module Network.Kafka where
import Control.Applicative
import Control.Exception (bracket)
import Control.Lens
import Control.Monad (liftM)
import Control.Monad.Trans (liftIO, lift)
import Control.Monad.Trans.Either
import Control.Monad.Trans.State
import Data.ByteString.Char8 (ByteString)
import Data.Monoid ((<>))
import Data.Serialize.Get
import System.IO
import System.Random (getStdRandom, randomR)
import qualified Data.ByteString.Char8 as B
import qualified Data.Map as M
import qualified Network
import Network.Kafka.Protocol
-- | Client-side settings and bookkeeping carried through every request.
data KafkaState = KafkaState
  { _stateName :: KafkaString
    -- ^ Client name; 'makeRequest' sends it as the request's 'ClientId'.
  , _stateCorrelationId :: CorrelationId
    -- ^ Correlation id for the next request; 'makeRequest' increments it.
  , _stateRequiredAcks :: RequiredAcks
    -- ^ Acknowledgement setting that 'send' puts into produce requests.
  , _stateRequestTimeout :: Timeout
    -- ^ Timeout that 'send' puts into produce requests.
  , _stateWaitSize :: MinBytes
    -- ^ Minimum byte count that 'fetchRequest' puts into fetch requests.
  , _stateBufferSize :: MaxBytes
    -- ^ Per-partition maximum byte count used by 'fetchRequest'.
  , _stateWaitTime :: MaxWaitTime
    -- ^ Maximum wait time that 'fetchRequest' puts into fetch requests.
  , _stateBrokers :: M.Map Leader Broker
    -- ^ Brokers keyed by leader; refreshed by 'brokerPartitionInfo' and
    -- consulted by 'send'.
  }

makeLenses ''KafkaState
-- | The state threaded through a 'Kafka' computation: the client settings
-- plus the handle of the open broker connection.
data KafkaConsumer = KafkaConsumer
  { _consumerState :: KafkaState
    -- ^ Client settings and bookkeeping.
  , _consumerHandle :: Handle
    -- ^ Connection handle that 'doRequest' writes to and reads from.
  }

makeLenses ''KafkaConsumer
-- | The client monad: 'KafkaConsumer' state layered over
-- 'KafkaClientError' failures layered over 'IO'.
type Kafka = StateT KafkaConsumer (EitherT KafkaClientError IO)

-- | Host and port of a broker, as consumed by 'runKafka'.
type KafkaAddress = (Host, Port)

-- | Client identifier; 'defaultState' stores it as the state's name.
type KafkaClientId = KafkaString
-- | Failures that a 'Kafka' computation can end with.
data KafkaClientError
  = KafkaNoOffset
    -- ^ 'getLastOffset' found no offset for the partition in the response.
  | KafkaExpected KafkaExpectedResponse
    -- ^ 'expectResponse' received a response of a different kind.
  | KafkaDeserializationError String
    -- ^ 'doRequest' could not decode the bytes read from the broker.
  | KafkaInvalidBroker Leader
    -- ^ 'send' found no known broker for the given leader.
  deriving (Eq, Show)
-- | The kind of response a caller was waiting for; carried by
-- 'KafkaExpected' when something else arrives.
data KafkaExpectedResponse
  = ExpectedMetadata -- ^ Used by 'metadata'.
  | ExpectedFetch    -- ^ Used by 'fetch'.
  | ExpectedProduce  -- ^ Used by 'produce'.
  deriving (Eq, Show)
-- | A point in a partition's log to ask offsets for. 'protocolTime'
-- translates it into the wire-level 'Time'.
data KafkaTime
  = LatestTime     -- ^ The latest position.
  | EarliestTime   -- ^ The earliest position.
  | OtherTime Time -- ^ An explicit protocol time, passed through unchanged.
-- | A partition of a topic together with the leader reported for it in
-- broker metadata (see 'brokerPartitionInfo').
data PartitionAndLeader = PartitionAndLeader
  { _palTopic :: TopicName
    -- ^ Topic the partition belongs to.
  , _palPartition :: Partition
    -- ^ The partition.
  , _palLeader :: Leader
    -- ^ Leader taken from the partition's metadata.
  }

makeLenses ''PartitionAndLeader
-- | A topic paired with one of its partitions; used as a map key when
-- collating messages and building requests.
data TopicAndPartition = TopicAndPartition
  { _tapTopic :: TopicName
    -- ^ The topic.
  , _tapPartition :: Partition
    -- ^ The partition within that topic.
  }
  deriving (Eq, Ord, Show)
-- | A message paired with the topic it belongs to.
data TopicAndMessage = TopicAndMessage
  { _tamTopic :: TopicName
    -- ^ The topic.
  , _tamMessage :: Message
    -- ^ The message itself.
  }
  deriving (Eq, Show)

makeLenses ''TopicAndMessage
-- | Extract the payload bytes of a 'TopicAndMessage' by folding the
-- @payload@ optic over its message; the targets are combined monoidally.
tamPayload :: TopicAndMessage -> ByteString
tamPayload tam = foldOf payload (tam ^. tamMessage)
-- | Correlation id that a state built by 'defaultState' starts from.
defaultCorrelationId :: CorrelationId
defaultCorrelationId = 0

-- | Default 'RequiredAcks' installed by 'defaultState'.
defaultRequiredAcks :: RequiredAcks
defaultRequiredAcks = 1

-- | Default request 'Timeout' installed by 'defaultState'.
-- NOTE(review): presumably milliseconds -- confirm against the protocol.
defaultRequestTimeout :: Timeout
defaultRequestTimeout = 10000

-- | Default 'MinBytes' for fetch requests: zero.
defaultMinBytes :: MinBytes
defaultMinBytes = MinBytes 0

-- | Default 'MaxBytes' for fetch requests: 1024 * 1024.
defaultMaxBytes :: MaxBytes
defaultMaxBytes = 1024 * 1024

-- | Default 'MaxWaitTime' for fetch requests: zero.
defaultMaxWaitTime :: MaxWaitTime
defaultMaxWaitTime = 0
-- | Build a 'KafkaState' for the given client id, with every other field
-- set to its module-level default and no known brokers.
defaultState :: KafkaClientId -> KafkaState
defaultState cid = KafkaState
  { _stateName = cid
  , _stateCorrelationId = defaultCorrelationId
  , _stateRequiredAcks = defaultRequiredAcks
  , _stateRequestTimeout = defaultRequestTimeout
  , _stateWaitSize = defaultMinBytes
  , _stateBufferSize = defaultMaxBytes
  , _stateWaitTime = defaultMaxWaitTime
  , _stateBrokers = M.empty
  }
-- | Run a 'Kafka' action against the broker at the given address.
--
-- A connection is opened, the action is run with the supplied state and
-- the new handle, and the handle is closed afterwards ('bracket' also
-- closes it if an exception escapes). The final state is discarded; the
-- result is 'Left' when the action ends with a 'KafkaClientError'.
runKafka :: KafkaAddress -> KafkaState -> Kafka a -> IO (Either KafkaClientError a)
runKafka (h, p) s k = bracket connect hClose run
  where
    connect = Network.connectTo (h ^. hostString) (p ^. portId)
    run handle = runEitherT (evalStateT k (KafkaConsumer s handle))
-- | Wrap a 'RequestMessage' into a 'Request', stamping it with the
-- current correlation id and the client name from the state. The stored
-- correlation id is incremented, so consecutive requests get distinct ids.
makeRequest :: RequestMessage -> Kafka Request
makeRequest m = do
  -- '<<%=' updates the field and hands back the value it held before.
  corid <- consumerState . stateCorrelationId <<%= (+ 1)
  conid <- use (consumerState . stateName)
  return (Request (corid, ClientId conid, m))
-- | Send a request over the connection handle and read back the response.
--
-- The request bytes are written and flushed, a 4-byte big-endian length
-- prefix is read, and then that many bytes are read and decoded with
-- 'getResponse'. Any decoding failure (of the length prefix or of the
-- body) is reported as 'KafkaDeserializationError'.
doRequest :: Request -> Kafka Response
doRequest r = mapStateT (bimapEitherT KafkaDeserializationError id) exchange
  where
    -- The exchange runs with 'String' errors; they are wrapped above.
    exchange = do
      h <- use consumerHandle
      dataLength <- lift . EitherT $ sendAndReadLength h
      resp <- liftIO $ B.hGet h dataLength
      lift . hoistEither $ runGet (getResponse dataLength) resp

    -- Write the request, then decode the response's length prefix.
    sendAndReadLength h = do
      B.hPut h (requestBytes r)
      hFlush h
      runGet (fromIntegral <$> getWord32be) <$> B.hGet h 4
-- | Issue a metadata request and return the metadata response, failing
-- with @'KafkaExpected' 'ExpectedMetadata'@ if the broker answers with a
-- different kind of response.
metadata :: MetadataRequest -> Kafka MetadataResponse
metadata request = do
  req <- makeRequest (MetadataRequest request)
  resp <- doRequest req
  expectResponse ExpectedMetadata _MetadataResponse resp
-- | Project the expected payload out of a 'Response'.
--
-- The optic @p@ is applied to the response's message; its first target
-- is returned. If it has no target, the computation fails with
-- 'KafkaExpected' carrying the given expectation.
expectResponse :: KafkaExpectedResponse -> Getting (Leftmost b) ResponseMessage b -> Response -> Kafka b
expectResponse e p resp = lift $
  case firstOf (responseMessage . p) resp of
    Just b -> right b
    Nothing -> left (KafkaExpected e)
-- | Translate a 'KafkaTime' into the wire-level 'Time'.
--
-- Kafka's Offset API reserves two negative sentinels: @-1@ asks for the
-- latest offset and @-2@ for the earliest one. Positive values are read
-- as real timestamps, so the sentinels must be negative.
protocolTime :: KafkaTime -> Time
protocolTime LatestTime = Time (-1)
protocolTime EarliestTime = Time (-2)
protocolTime (OtherTime o) = o
-- | Group messages by the leader that should receive them and, within
-- each leader, by topic and partition.
--
-- For each message (in input order) the topic's partitions are looked up
-- via 'brokerPartitionInfo' and one is chosen with 'getPartition'. When no
-- partition could be chosen the message is dropped, but an (empty) entry
-- for @'Leader' 'Nothing'@ is still recorded. Messages that land in the
-- same partition keep their relative order.
partitionAndCollate :: [TopicAndMessage] -> Kafka (M.Map Leader (M.Map TopicAndPartition [TopicAndMessage]))
partitionAndCollate ks = foldlMOf folded collate M.empty ks
  where
    -- Route one message and merge it into the accumulated grouping.
    collate accum x = do
      candidates <- brokerPartitionInfo (_tamTopic x)
      chosen <- getPartition candidates
      let leader = maybe (Leader Nothing) _palLeader chosen
          byPartition = case chosen of
            Nothing -> M.empty
            Just q ->
              M.singleton (TopicAndPartition (q ^. palTopic) (q ^. palPartition)) [x]
      return $ M.unionWith (M.unionWith (<>)) accum (M.singleton leader byPartition)
-- | Pick, uniformly at random, one of the partitions that currently has
-- a leader.
--
-- Partitions whose leader id is 'Nothing' are ignored. Returns 'Nothing'
-- when no partition has a leader; in that case the global random
-- generator is left untouched.
getPartition :: [PartitionAndLeader] -> Kafka (Maybe PartitionAndLeader)
getPartition ps
  | null candidates = return Nothing
  | otherwise = liftIO $ do
      -- 'randomR' is inclusive on both ends, hence the @- 1@.
      i <- getStdRandom (randomR (0, length candidates - 1))
      return (candidates ^? element i)
  where
    candidates = ps ^.. folded . filtered (has $ palLeader . leaderId . _Just)
-- | Build a 'MessageSet' from a list of messages, numbering the members
-- with consecutive offsets starting at 0. The topic of each entry is
-- ignored.
groupMessagesToSet :: [TopicAndMessage] -> MessageSet
groupMessagesToSet xs = MessageSet (zipWith member [0..] xs)
  where
    member n tam = MessageSetMember (Offset n) (_tamMessage tam)
-- | Fetch metadata for a topic and list its partitions with their leaders.
--
-- As a side effect the broker table in the state is replaced with the
-- brokers from this response, keyed by @'Leader' ('Just' nodeId)@. If two
-- brokers share a node id, the one listed first wins.
brokerPartitionInfo :: TopicName -> Kafka [PartitionAndLeader]
brokerPartitionInfo t = do
  md <- metadata (MetadataReq [t])
  consumerState . stateBrokers .=
    foldrOf (metadataResponseFields . _1 . folded) addBroker M.empty md
  return
    [ PartitionAndLeader t (d ^. partitionId) (d ^. partitionMetadataLeader)
    | d <- md ^.. topicsMetadata . folded . partitionsMetadata . folded
    ]
  where
    addBroker b = M.insert (Leader (Just (b ^. brokerFields . _1 . nodeId))) b
-- | CRC value that 'makeMessage' places in new messages.
-- NOTE(review): presumably a placeholder recomputed at serialization
-- time -- confirm in "Network.Kafka.Protocol".
defaultMessageCrc :: Crc
defaultMessageCrc = 1

-- | Magic byte that 'makeMessage' places in new messages.
defaultMessageMagicByte :: MagicByte
defaultMessageMagicByte = 0

-- | Key that 'makeMessage' places in new messages: no key.
defaultMessageKey :: Key
defaultMessageKey = Key Nothing

-- | Attributes that 'makeMessage' places in new messages.
defaultMessageAttributes :: Attributes
defaultMessageAttributes = 0
-- | Wrap raw bytes into a 'Message' that uses the module's default CRC,
-- magic byte, attributes and (absent) key.
makeMessage :: ByteString -> Message
makeMessage m =
  Message
    ( defaultMessageCrc
    , defaultMessageMagicByte
    , defaultMessageAttributes
    , defaultMessageKey
    , body
    )
  where
    body = Value . Just . KBytes $ m
-- | Replica id that identifies the sender as an ordinary client rather
-- than a broker.
--
-- The Kafka protocol requires @-1@ here; non-negative values are node ids
-- of real brokers and would make the request look like replica traffic.
ordinaryConsumerId :: ReplicaId
ordinaryConsumerId = ReplicaId (-1)
-- | Build a 'FetchRequest' for a single partition of a single topic,
-- starting at the given offset. Wait time, minimum size and buffer size
-- are taken from the current state; the replica id is
-- 'ordinaryConsumerId'.
fetchRequest :: Offset -> Partition -> TopicName -> Kafka FetchRequest
fetchRequest o p topic = do
  st <- use consumerState
  let partitions = [(p, o, st ^. stateBufferSize)]
  return $
    FetchReq
      ( ordinaryConsumerId
      , st ^. stateWaitTime
      , st ^. stateWaitSize
      , [(topic, partitions)]
      )
-- | Issue a fetch request and return the fetch response, failing with
-- @'KafkaExpected' 'ExpectedFetch'@ if the broker answers with a different
-- kind of response.
fetch :: FetchRequest -> Kafka FetchResponse
fetch request = do
  req <- makeRequest (FetchRequest request)
  resp <- doRequest req
  expectResponse ExpectedFetch _FetchResponse resp
-- | Flatten a 'FetchResponse' into its messages, each tagged with the
-- topic it was returned under. Order follows the response.
fetchMessages :: FetchResponse -> [TopicAndMessage]
fetchMessages fr =
  [ TopicAndMessage (entry ^. _1) msg
  | entry <- fr ^.. fetchResponseFields . folded
  , msg <- entry ^.. _2 . folded . _4 . messageSetMembers . folded . setMessage
  ]
-- | Issue a produce request and return the produce response, failing with
-- @'KafkaExpected' 'ExpectedProduce'@ if the broker answers with a
-- different kind of response.
produce :: ProduceRequest -> Kafka ProduceResponse
produce request = do
  req <- makeRequest (ProduceRequest request)
  resp <- doRequest req
  expectResponse ExpectedProduce _ProduceResponse resp
-- | Build a 'ProduceRequest' from message sets keyed by topic and
-- partition. Entries for the same topic are merged into one list, kept in
-- input order; topics appear in ascending key order.
produceRequest :: RequiredAcks -> Timeout -> [(TopicAndPartition, MessageSet)] -> ProduceRequest
produceRequest ra ti ts = ProduceReq (ra, ti, M.toList byTopic)
  where
    byTopic =
      M.unionsWith (<>)
        [ M.singleton t [(p, i)] | (TopicAndPartition t p, i) <- ts ]
-- | Send a batch of messages. The messages are grouped per leader with
-- 'partitionAndCollate', each partition's messages are turned into a
-- 'MessageSet', and one produce request per leader is issued via 'send'.
-- Returns the responses in ascending leader-key order.
produceMessages :: [TopicAndMessage] -> Kafka [ProduceResponse]
produceMessages tams = do
  collated <- partitionAndCollate tams
  mapM sendToLeader (M.toList collated)
  where
    sendToLeader (leader, byPartition) =
      send leader
        [ (tp, groupMessagesToSet msgs) | (tp, msgs) <- M.toList byPartition ]
-- | Produce message sets to the broker registered for the given leader.
--
-- The broker is looked up in the state's broker table; if it is missing
-- the computation fails with 'KafkaInvalidBroker'. Otherwise a separate
-- connection to that broker is opened with 'runKafka', seeded with a copy
-- of the current state, and the produce request is run there. State
-- changes made inside that nested run are not propagated back; an error
-- from it is re-raised in the calling computation.
send :: Leader -> [(TopicAndPartition, MessageSet)] -> Kafka ProduceResponse
send l ts = do
  cstate <- use consumerState
  broker <- lift . maybe (left (KafkaInvalidBroker l)) right $
    cstate ^. stateBrokers . at l
  let address = (broker ^. brokerFields . _2, broker ^. brokerFields . _3)
      request =
        produceRequest (cstate ^. stateRequiredAcks) (cstate ^. stateRequestTimeout) ts
  outcome <- liftIO (runKafka address cstate (produce request))
  lift (hoistEither outcome)
-- | Per-partition parameters of an offset request (see 'offsetRequest').
data PartitionOffsetRequestInfo = PartitionOffsetRequestInfo
  { _kafkaTime :: KafkaTime
    -- ^ The point in the log to ask offsets for.
  , _maxNumOffsets :: MaxNumberOfOffsets
    -- ^ Upper bound on how many offsets to ask for.
  }
-- | Ask the broker for a single offset of a partition at the given time.
--
-- Fails with 'KafkaNoOffset' when the response is not an offset response
-- or holds no offset for the partition.
getLastOffset :: KafkaTime -> Partition -> TopicName -> Kafka Offset
getLastOffset m p t = do
  req <- makeRequest (offsetRequest [(TopicAndPartition t p, PartitionOffsetRequestInfo m 1)])
  resp <- doRequest req
  case resp ^? responseMessage . _OffsetResponse . offsetResponseOffset p of
    Just offset -> return offset
    Nothing -> lift (left KafkaNoOffset)
-- | Build an offset 'RequestMessage' from per-partition request
-- parameters. Entries for the same topic are merged into one list, kept
-- in input order; each 'KafkaTime' is converted with 'protocolTime'.
--
-- The replica id is @-1@, the value the Kafka protocol reserves for
-- ordinary (non-broker) clients.
offsetRequest :: [(TopicAndPartition, PartitionOffsetRequestInfo)] -> RequestMessage
offsetRequest ts =
  OffsetRequest $ OffsetReq (ReplicaId (-1), M.toList . M.unionsWith (<>) $ fmap f ts)
  where
    f (TopicAndPartition t p, i) = M.singleton t [g p i]
    g p (PartitionOffsetRequestInfo kt mno) = (p, protocolTime kt, mno)