module Network.Kafka where
import Control.Applicative
import Control.Exception (bracket)
import Control.Lens
import Control.Monad (liftM)
import Control.Monad.Trans (liftIO, lift)
import Control.Monad.Trans.Either
import Control.Monad.Trans.State
import Data.ByteString.Char8 (ByteString)
import Data.Monoid ((<>))
import qualified Data.Pool as Pool
import Data.Serialize.Get
import System.IO
import qualified Data.ByteString.Char8 as B
import qualified Data.Map as M
import qualified Network
import Network.Kafka.Protocol
-- | Client configuration, request counter and cached cluster information
-- carried (inside 'KafkaClient') through a 'Kafka' computation.
data KafkaState = KafkaState {
    _stateName :: KafkaString
    -- ^ Client name; 'makeRequest' sends it as the request's 'ClientId'.
  , _stateRequiredAcks :: RequiredAcks
    -- ^ Required-acks setting. Not read by the code visible in this module
    -- (presumably intended for produce requests -- TODO confirm).
  , _stateRequestTimeout :: Timeout
    -- ^ Request timeout. Not read by the code visible in this module
    -- (presumably intended for produce requests -- TODO confirm).
  , _stateWaitSize :: MinBytes
    -- ^ Sent by 'fetchRequest' as the fetch's minimum byte count.
  , _stateBufferSize :: MaxBytes
    -- ^ Sent by 'fetchRequest' as the per-partition maximum byte count.
  , _stateWaitTime :: MaxWaitTime
    -- ^ Sent by 'fetchRequest' as the fetch's maximum wait time.
  , _stateCorrelationId :: CorrelationId
    -- ^ Correlation id for the next request; 'makeRequest' uses it and then
    -- increments it.
  , _stateBrokers :: M.Map Leader Broker
    -- ^ Known brokers keyed by node id (wrapped as a 'Leader'); filled by
    -- 'updateMetadatas'.
  , _stateConnections :: M.Map Broker (Pool.Pool Handle)
    -- ^ One connection pool per broker, created lazily by 'withBrokerHandle'.
  , _stateTopicMetadata :: M.Map TopicName TopicMetadata
    -- ^ Topic metadata keyed by topic name; filled by 'updateMetadatas'.
  }
-- Generates lenses named after the fields without the leading underscore
-- (stateName, stateCorrelationId, stateConnections, ...).
makeLenses ''KafkaState
-- | Everything a 'Kafka' computation carries: the client state plus the
-- handle that 'runKafka' opened to the bootstrap address.
data KafkaClient = KafkaClient { _kafkaClientState :: KafkaState
                                 -- ^ Configuration, correlation counter and caches.
                               , _kafkaClientHandle :: Handle
                                 -- ^ Main connection; 'doRequest' and 'metadata'
                                 -- send over it.
                               }
-- Generates the kafkaClientState and kafkaClientHandle lenses.
makeLenses ''KafkaClient
-- | The client monad. State over 'KafkaClient' is layered on top of
-- 'EitherT', so a 'KafkaClientError' aborts the computation and the state
-- at that point is discarded; only the error reaches the caller of
-- 'runKafka'.
type Kafka = StateT KafkaClient (EitherT KafkaClientError IO)
-- | Bootstrap address (host and port) that 'runKafka' connects to.
type KafkaAddress = (Host, Port)
-- | Client identifier; stored as '_stateName' by 'defaultState'.
type KafkaClientId = KafkaString
-- | Errors a 'Kafka' computation can fail with.
data KafkaClientError =
    -- | 'getLastOffset' found no offset for the partition in the response.
    KafkaNoOffset
    -- | The broker answered with a different kind of response than the one
    -- 'expectResponse' was asked for.
  | KafkaExpected KafkaExpectedResponse
    -- | A response (its length prefix or its body) could not be decoded;
    -- carries the decoder's message. Raised by 'doRequest''.
  | KafkaDeserializationError String
    -- | Not raised by the code visible in this module; presumably reports a
    -- leader with no known broker -- TODO confirm.
  | KafkaInvalidBroker Leader
    -- | Not raised by the code visible in this module -- TODO confirm where
    -- it is used.
  | KafkaFailedToFetchMetadata
  deriving (Eq, Show)
-- | Which response kind 'expectResponse' was looking for when it failed.
data KafkaExpectedResponse = ExpectedMetadata
                           | ExpectedFetch
                           | ExpectedProduce
  deriving (Eq, Show)
-- | Point in time used when asking for a partition offset. 'protocolTime'
-- turns it into the wire-level 'Time': the two named cases are special
-- sentinels, while 'OtherTime' passes an explicit 'Time' through unchanged.
data KafkaTime =
    LatestTime
  | EarliestTime
  | OtherTime Time
-- | A topic partition together with its leader. Not constructed by the code
-- visible in this module.
data PartitionAndLeader = PartitionAndLeader { _palTopic :: TopicName
                                             , _palPartition :: Partition
                                             , _palLeader :: Leader
                                             }
  deriving (Show)
-- Generates the palTopic, palPartition and palLeader lenses.
makeLenses ''PartitionAndLeader
-- | A (topic, partition) pair. 'getLastOffset' builds one to tell
-- 'offsetRequest' which partition to query.
data TopicAndPartition = TopicAndPartition { _tapTopic :: TopicName
                                           , _tapPartition :: Partition
                                           }
  deriving (Eq, Ord, Show)
-- | A message taken from a fetch response, tagged with the topic it came
-- from. Produced by 'fetchMessages'.
data TopicAndMessage = TopicAndMessage { _tamTopic :: TopicName
                                       , _tamMessage :: Message
                                       }
  deriving (Eq, Show)
-- Generates the tamTopic and tamMessage lenses.
makeLenses ''TopicAndMessage
-- | The raw payload bytes of the message inside a 'TopicAndMessage'.
tamPayload :: TopicAndMessage -> ByteString
tamPayload tm = tm ^. tamMessage . payload
-- | First correlation id handed out; 'makeRequest' increments from here.
defaultCorrelationId :: CorrelationId
defaultCorrelationId = 0
-- | Default for '_stateRequiredAcks'.
defaultRequiredAcks :: RequiredAcks
defaultRequiredAcks = 1
-- | Default for '_stateRequestTimeout' (10000; unit not visible here --
-- presumably milliseconds, TODO confirm).
defaultRequestTimeout :: Timeout
defaultRequestTimeout = 10000
-- | Default for '_stateWaitSize', the fetch's minimum byte count.
defaultMinBytes :: MinBytes
defaultMinBytes = MinBytes 0
-- | Default for '_stateBufferSize', the fetch's per-partition byte limit
-- (1024 * 1024).
defaultMaxBytes :: MaxBytes
defaultMaxBytes = 1024 * 1024
-- | Default for '_stateWaitTime', the fetch's maximum wait time.
defaultMaxWaitTime :: MaxWaitTime
defaultMaxWaitTime = 0
-- | Initial 'KafkaState' for a client with the given id.
--
-- Every tunable starts at its @default*@ value, and the correlation counter
-- starts at 'defaultCorrelationId'. The broker, connection-pool and
-- topic-metadata caches all start empty.
defaultState :: KafkaClientId -> KafkaState
defaultState cid = KafkaState
  { _stateName           = cid
  , _stateRequiredAcks   = defaultRequiredAcks
  , _stateRequestTimeout = defaultRequestTimeout
  , _stateWaitSize       = defaultMinBytes
  , _stateBufferSize     = defaultMaxBytes
  , _stateWaitTime       = defaultMaxWaitTime
  , _stateCorrelationId  = defaultCorrelationId
  , _stateBrokers        = M.empty
  , _stateConnections    = M.empty
  , _stateTopicMetadata  = M.empty
  }
-- | Connect to the given address and run a 'Kafka' computation over that
-- connection, starting from the given state.
--
-- The connection is closed afterwards; because 'bracket' is used, it is
-- also closed if an exception escapes. The final state is discarded, and
-- only the result or the 'KafkaClientError' is returned.
--
-- NOTE(review): connection pools that 'withBrokerHandle' created during the
-- run live in that discarded state and are not destroyed here.
runKafka :: KafkaAddress -> KafkaState -> Kafka a -> IO (Either KafkaClientError a)
runKafka (addrHost, addrPort) s k = bracket acquire hClose run
  where
    acquire = Network.connectTo (addrHost ^. hostString) (addrPort ^. portId)
    run hdl = runEitherT (evalStateT k (KafkaClient s hdl))
-- | Wrap a request payload in a 'Request' envelope.
--
-- The envelope carries the current correlation id and the client's name
-- (as its 'ClientId'). The stored correlation id is then bumped by one, so
-- the next request gets a fresh id.
makeRequest :: RequestMessage -> Kafka Request
makeRequest m = do
  before <- use kafkaClientState
  kafkaClientState . stateCorrelationId += 1
  return $ Request (before ^. stateCorrelationId, ClientId (before ^. stateName), m)
-- | Send a request over the client's main handle (the one opened by
-- 'runKafka') and return the decoded response.
doRequest :: Request -> Kafka Response
doRequest r = use kafkaClientHandle >>= \h -> doRequest' h r
-- | Send a request over the given handle and read back one response frame.
--
-- The serialised request is written and the handle flushed. Then a 4-byte
-- big-endian length is read, followed by that many bytes, which are decoded
-- with 'getResponse'.
--
-- Any decoding failure, of the length prefix or of the body, becomes a
-- 'KafkaDeserializationError'. If the handle reaches EOF early, 'B.hGet'
-- returns fewer bytes and the short read shows up as such a decoding error.
-- The client state is neither read nor modified.
doRequest' :: Handle -> Request -> Kafka Response
doRequest' h r = lift . bimapEitherT KafkaDeserializationError id $ do
  frameLength <- EitherT $ do
    B.hPut h (requestBytes r)
    hFlush h
    runGet (fromIntegral <$> getWord32be) <$> B.hGet h 4
  frame <- liftIO (B.hGet h frameLength)
  hoistEither (runGet (getResponse frameLength) frame)
-- | Send a metadata request over the client's main handle.
metadata :: MetadataRequest -> Kafka MetadataResponse
metadata request = flip metadata' request =<< use kafkaClientHandle
-- | Send a metadata request over the given handle and return the metadata
-- response. Fails with @KafkaExpected ExpectedMetadata@ if the broker
-- answers with some other kind of response.
metadata' :: Handle -> MetadataRequest -> Kafka MetadataResponse
metadata' hdl request = do
  req <- makeRequest (MetadataRequest request)
  resp <- doRequest' hdl req
  expectResponse ExpectedMetadata _MetadataResponse resp
-- | Pull the expected payload out of a response.
--
-- The given optic is applied to the response's 'ResponseMessage'. If it
-- matches nothing, the computation fails with 'KafkaExpected', tagged with
-- the response kind that was expected.
expectResponse :: KafkaExpectedResponse -> Getting (Leftmost b) ResponseMessage b -> Response -> Kafka b
expectResponse e p resp =
  case firstOf (responseMessage . p) resp of
    Just b  -> return b
    Nothing -> lift (left (KafkaExpected e))
-- | Encode a 'KafkaTime' as the 'Time' field of an offset request.
--
-- The Kafka offset API reserves two negative sentinels:
--
-- * @-1@ asks for the latest offset (the offset the next message will get).
-- * @-2@ asks for the earliest offset still available.
--
-- Any other value is treated as an explicit timestamp. A positive value
-- here (as the previous @1@ / @2@ were) would be taken as a real timestamp
-- and yield the wrong offsets.
protocolTime :: KafkaTime -> Time
protocolTime LatestTime    = Time (-1)
protocolTime EarliestTime  = Time (-2)
protocolTime (OtherTime o) = o
-- | Replica id sent by an ordinary (non-broker) client, e.g. in the fetch
-- requests built by 'fetchRequest'.
--
-- The Kafka protocol reserves @-1@ for this. A non-negative value identifies
-- the sender as the broker replica with that node id.
ordinaryConsumerId :: ReplicaId
ordinaryConsumerId = ReplicaId (-1)
-- | Build a fetch request for one partition of one topic, starting at the
-- given offset.
--
-- The maximum wait time, minimum bytes and per-partition byte limit are
-- read from the client state. The replica id is 'ordinaryConsumerId'.
fetchRequest :: Offset -> Partition -> TopicName -> Kafka FetchRequest
fetchRequest o p topic =
  build <$> use (kafkaClientState . stateWaitTime)
        <*> use (kafkaClientState . stateWaitSize)
        <*> use (kafkaClientState . stateBufferSize)
  where
    build wt ws bs = FetchReq (ordinaryConsumerId, wt, ws, [(topic, [(p, o, bs)])])
-- | Send a fetch request over the client's main handle. Fails with
-- @KafkaExpected ExpectedFetch@ if the broker answers with some other kind
-- of response.
fetch :: FetchRequest -> Kafka FetchResponse
fetch request = do
  resp <- doRequest =<< makeRequest (FetchRequest request)
  expectResponse ExpectedFetch _FetchResponse resp
-- | Flatten a fetch response into its messages, each tagged with its topic.
-- Messages keep the order in which they appear in the response.
fetchMessages :: FetchResponse -> [TopicAndMessage]
fetchMessages fr =
  [ TopicAndMessage (entry ^. _1) msg
  | entry <- fr ^.. fetchResponseFields . folded
  , msg <- entry ^.. _2 . folded . _4 . messageSetMembers . folded . setMessage
  ]
-- | Fetch metadata for the given topics over the main handle and merge it
-- into the client's caches.
--
-- Brokers are keyed by node id (wrapped as a 'Leader'); topic metadata is
-- keyed by topic name. For each key, an existing entry is replaced and
-- keys not in the response are kept. If the response lists the same key
-- twice, the first occurrence wins.
updateMetadatas :: [TopicName] -> Kafka ()
updateMetadatas ts = do
  md <- metadata (MetadataReq ts)
  let brokers = md ^.. metadataResponseBrokers . folded
      tmds    = md ^.. topicsMetadata . folded
  kafkaClientState . stateBrokers %= \known -> foldr insertBroker known brokers
  kafkaClientState . stateTopicMetadata %= \known -> foldr insertTopic known tmds
  where
    insertBroker :: Broker -> M.Map Leader Broker -> M.Map Leader Broker
    insertBroker b = M.insert (Leader (Just (b ^. brokerNode . nodeId))) b
    insertTopic :: TopicMetadata -> M.Map TopicName TopicMetadata -> M.Map TopicName TopicMetadata
    insertTopic tm = M.insert (tm ^. topicMetadataName) tm
-- | Refresh the cached metadata for a single topic.
updateMetadata :: TopicName -> Kafka ()
updateMetadata = updateMetadatas . pure
-- | Refresh metadata by sending an empty topic list. Presumably the broker
-- then reports every topic; TODO confirm against the metadata API docs.
updateAllMetadata :: Kafka ()
updateAllMetadata = updateMetadatas mempty
-- | Run an action with a handle to the given broker, taken from that
-- broker's connection pool.
--
-- The pool is created on first use and cached in '_stateConnections'. It
-- has 1 stripe and holds at most 1 handle, and handles idle for 10 seconds
-- are closed. New handles connect to the broker's own host and port.
withBrokerHandle :: Broker -> (Handle -> Kafka a) -> Kafka a
withBrokerHandle broker f = do
  cached <- use (kafkaClientState . stateConnections . at broker)
  pool <- maybe newBrokerPool return cached
  Pool.withResource pool f
  where
    newBrokerPool = do
      created <- liftIO $ Pool.createPool openBrokerHandle hClose 1 10 1
      kafkaClientState . stateConnections . at broker ?= created
      return created
    openBrokerHandle =
      Network.connectTo (broker ^. brokerHost . hostString) (broker ^. brokerPort . portId)
-- | Per-partition parameters of an offset request, as consumed by
-- 'offsetRequest'.
data PartitionOffsetRequestInfo =
  PartitionOffsetRequestInfo {
      _kafkaTime :: KafkaTime
      -- ^ Point in time to query; encoded with 'protocolTime'.
    , _maxNumOffsets :: MaxNumberOfOffsets
      -- ^ Upper bound on the number of offsets returned ('getLastOffset'
      -- asks for 1).
    }
-- | Look up one offset of a partition at the given 'KafkaTime'.
--
-- Sends an offset request for that single partition (asking for at most
-- 1 offset) over the client's main handle, not the partition leader's. It
-- returns the first offset reported for the partition, or fails with
-- 'KafkaNoOffset' if the response contains none.
getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset
getLastOffset m p t = do
  resp <- doRequest =<< makeRequest (offsetRequest [(TopicAndPartition t p, PartitionOffsetRequestInfo m 1)])
  case firstOf (responseMessage . _OffsetResponse . offsetResponseOffset p) resp of
    Just off -> return off
    Nothing  -> lift (left KafkaNoOffset)
-- | Build one offset request covering a batch of partitions.
--
-- Entries are grouped by topic, and topics are emitted in ascending key
-- order (via 'M.toList'). Within a topic, partitions keep their input
-- order, because 'M.unionsWith' appends later lists after earlier ones.
-- Each partition's 'KafkaTime' is encoded with 'protocolTime'.
--
-- The replica id is @-1@, the value the Kafka protocol reserves for
-- ordinary clients. The previous @1@ would have identified the sender as
-- broker replica 1.
offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> RequestMessage
offsetRequest ts =
  OffsetRequest $ OffsetReq (ReplicaId (-1), M.toList . M.unionsWith (<>) $ fmap perTopic ts)
  where
    perTopic (TopicAndPartition t p, info) = M.singleton t [partitionEntry p info]
    partitionEntry p (PartitionOffsetRequestInfo kt mno) = (p, protocolTime kt, mno)