-- | Byte-level transport used by the rest of the package.
--
-- Exports the 'Transport' class (send / receive of 'ByteString's), a helper
-- that keeps receiving until a requested number of bytes has arrived
-- ('recvExactly'), and functions for opening ('connect', 'withConnection')
-- and closing ('close') IPv4 stream sockets.
module Kafka.Internal.Transport
( Transport(..)
, recvExactly
, Socket
, withConnection
, connect
, close
) where
import Control.Applicative
import Data.ByteString (ByteString)
import Network.Socket (Socket, close)
import qualified Control.Exception as E
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import qualified Network.Socket as N
import qualified Network.Socket.ByteString as S
-- | Anything that can carry raw bytes in both directions.
--
-- The class only fixes the shape of the two operations; what a short or
-- empty read means is up to the instance. 'recvExactly' in this module
-- treats an empty result from 'recv' as "no more data".
class Transport t where
  -- | Write the given bytes to the transport.
  send :: t -> ByteString -> IO ()

  -- | Read from the transport. The 'Int' is the number of bytes requested;
  -- callers in this module cope with receiving fewer.
  recv :: t -> Int -> IO ByteString
-- | A network 'Socket' is a 'Transport': sending goes through
-- 'S.sendAll' and receiving through 'S.recv'.
instance Transport Socket where
  send sock payload = S.sendAll sock payload
  recv sock maxBytes = S.recv sock maxBytes
-- | Keep calling 'recv' until @size@ bytes have been collected, then return
-- them as a single 'ByteString'.
--
-- * If @size@ is zero or negative, nothing is read and the result is empty.
-- * If 'recv' returns an empty chunk before @size@ bytes have arrived, the
--   loop stops and the bytes gathered so far are returned, so the result
--   may be shorter than @size@. Callers that need the full length must
--   check 'B.length' on the result.
--   (For the 'Socket' instance an empty chunk presumably means the peer
--   closed the connection -- confirm against the @network@ documentation.)
recvExactly :: Transport t => t -> Int -> IO ByteString
recvExactly t size = B.concat . reverse <$> loop [] 0
  where
    -- Chunks are consed onto the front (newest first) and reversed once at
    -- the end, which keeps each step O(1).
    loop chunks bytesRead
      | bytesRead >= size = return chunks
      | otherwise = do
          -- Request only the bytes still missing so we never consume data
          -- that belongs to whatever follows on the transport.
          chunk <- recv t (size - bytesRead)
          if B.null chunk
            then return chunks
            -- Force the running total so no chain of (+) thunks builds up.
            else loop (chunk : chunks) $! bytesRead + B.length chunk
-- | Resolve @host@ and a numeric @port@ to an address for an IPv4 stream
-- socket.
--
-- The lookup is restricted by the hints to 'N.AF_INET' / 'N.Stream', and
-- 'N.AI_NUMERICSERV' is set because the service is always passed as a
-- decimal port number. Only the first result from the resolver is used.
--
-- Throws an 'IOError' if the resolver yields no addresses (in addition to
-- whatever 'N.getAddrInfo' itself throws on lookup failure).
getAddrInfo :: ByteString -> Int -> IO N.AddrInfo
getAddrInfo host port = do
  results <- N.getAddrInfo (Just hints) (Just hostName) (Just $ show port)
  -- Match on the list instead of using the partial 'head', so an empty
  -- result produces a descriptive error at the point of failure.
  case results of
    (addrInfo : _) -> return addrInfo
    [] -> ioError . userError $
      "getAddrInfo: no address found for " ++ hostName ++ ":" ++ show port
  where
    hostName = B8.unpack host
    hints = N.defaultHints
      { N.addrFamily = N.AF_INET
      , N.addrFlags = [N.AI_NUMERICSERV]
      , N.addrSocketType = N.Stream
      }
-- | Open an IPv4 stream socket and connect it to @host@:@port@.
--
-- The address is resolved first via 'getAddrInfo'. The socket is created
-- and connected under 'E.bracketOnError', so it is closed if connecting
-- fails or if an exception (including an asynchronous one) arrives at any
-- point after the socket has been created. On success the connected socket
-- is returned and the caller is responsible for closing it.
connect :: ByteString -> Int -> IO Socket
connect host port = do
  addrInfo <- getAddrInfo host port
  E.bracketOnError
    (N.socket N.AF_INET N.Stream N.defaultProtocol)
    N.close
    (\sock -> do
        N.connect sock (N.addrAddress addrInfo)
        return sock)
-- | Run an action against a freshly connected socket.
--
-- The socket is opened with 'connect' and handed to the action; 'E.bracket'
-- ensures 'close' runs afterwards whether the action returns or throws.
withConnection :: ByteString -> Int -> (Socket -> IO a) -> IO a
withConnection host port action = E.bracket acquire close action
  where
    acquire = connect host port