module Network.Kafka.Producer where import Prelude hiding ((!!)) import Control.Applicative import Control.Lens import Control.Monad.Trans (liftIO, lift) import Control.Monad.Trans.Either import Data.ByteString.Char8 (ByteString) import qualified Data.Digest.Murmur32 as Murmur32 import Data.List.Safe ((!!)) import Data.Monoid ((<>)) import System.IO import qualified Data.Map as M import System.Random (getStdRandom, randomR) import Network.Kafka import Network.Kafka.Protocol -- * Producing -- | Execute a produce request and get the raw preduce response. produce :: Handle -> ProduceRequest -> Kafka ProduceResponse produce handle request = makeRequest (ProduceRequest request) >>= doRequest' handle >>= expectResponse ExpectedProduce _ProduceResponse -- | Construct a produce request with explicit arguments. produceRequest :: RequiredAcks -> Timeout -> [(TopicAndPartition, MessageSet)] -> ProduceRequest produceRequest ra ti ts = ProduceReq (ra, ti, M.toList . M.unionsWith (<>) $ fmap f ts) where f (TopicAndPartition t p, i) = M.singleton t [(p, i)] -- | Send messages to partition calculated by 'partitionAndCollate'. produceMessages :: [TopicAndMessage] -> Kafka [ProduceResponse] produceMessages tams = do m <- fmap (fmap groupMessagesToSet) <$> partitionAndCollate tams mapM (uncurry send) $ fmap M.toList <$> M.toList m -- | Create a protocol message set from a list of messages. groupMessagesToSet :: [TopicAndMessage] -> MessageSet groupMessagesToSet xs = MessageSet $ msm <$> xs where msm = MessageSetMember (Offset (-1)) . _tamMessage -- | Group messages together with the leader they should be sent to. partitionAndCollate :: [TopicAndMessage] -> Kafka (M.Map Leader (M.Map TopicAndPartition [TopicAndMessage])) partitionAndCollate ks = recurse ks M.empty where recurse [] accum = return accum recurse (x:xs) accum = do topicPartitionsList <- brokerPartitionInfo $ _tamTopic x let maybeKey = x ^. tamMessage . messageKey . keyBytes pal <- case maybeKey of Nothing -> getRandPartition topicPartitionsList Just key -> return $ getPartitionByKey (_kafkaByteString key) topicPartitionsList let leader = maybe (Leader Nothing) _palLeader pal tp = TopicAndPartition <$> pal ^? folded . palTopic <*> pal ^? folded . palPartition b = M.singleton leader $ maybe M.empty (`M.singleton` [x]) tp accum' = M.unionWith (M.unionWith (<>)) accum b recurse xs accum' -- | Compute the partition for a record. This matches the way the official -- | clients compute the partition. getPartitionByKey :: ByteString -> [PartitionAndLeader] -> Maybe PartitionAndLeader getPartitionByKey key ps = let i = Murmur32.asWord32 $ Murmur32.hash32WithSeed 0x9747b28c key in ps !! i -- | Execute a produce request using the values in the state. send :: Leader -> [(TopicAndPartition, MessageSet)] -> Kafka ProduceResponse send l ts = do let s = kafkaClientState . stateBrokers . at l topicNames = map (_tapTopic . fst) ts broker <- findMetadataOrElse topicNames s (KafkaInvalidBroker l) requiredAcks <- use (kafkaClientState . stateRequiredAcks) requestTimeout <- use (kafkaClientState . stateRequestTimeout) withBrokerHandle broker $ \handle -> produce handle $ produceRequest requiredAcks requestTimeout ts -- | Find a leader and partition for the topic. brokerPartitionInfo :: TopicName -> Kafka [PartitionAndLeader] brokerPartitionInfo t = do let s = kafkaClientState . stateTopicMetadata . at t tmd <- findMetadataOrElse [t] s KafkaFailedToFetchMetadata return $ pal <$> tmd ^. partitionsMetadata where pal d = PartitionAndLeader t (d ^. partitionId) (d ^. partitionMetadataLeader) findMetadataOrElse :: [TopicName] -> Getting (Maybe a) KafkaClient (Maybe a) -> KafkaClientError -> Kafka a findMetadataOrElse ts s err = do maybeFound <- use s case maybeFound of Just x -> return x Nothing -> do updateMetadatas ts maybeFound' <- use s case maybeFound' of Just x -> return x Nothing -> lift $ left $ err getRandPartition :: [PartitionAndLeader] -> Kafka (Maybe PartitionAndLeader) getRandPartition ps = liftIO $ (ps' ^?) . element <$> getStdRandom (randomR (0, length ps' - 1)) where ps' = ps ^.. folded . filtered (has $ palLeader . leaderId . _Just) -- * Messages -- | Default: @1@ defaultMessageCrc :: Crc defaultMessageCrc = 1 -- | Default: @0@ defaultMessageMagicByte :: MagicByte defaultMessageMagicByte = 0 -- | Default: @Nothing@ defaultMessageKey :: Key defaultMessageKey = Key Nothing -- | Default: @0@ defaultMessageAttributes :: Attributes defaultMessageAttributes = 0 -- | Construct a message from a string of bytes using default attributes. makeMessage :: ByteString -> Message makeMessage m = Message (defaultMessageCrc, defaultMessageMagicByte, defaultMessageAttributes, defaultMessageKey, Value (Just (KBytes m)))