module Kafka.Internal.Request ( Produce(..) , putProduceRequest , putMultiProduceRequest , Fetch(..) , putFetchRequest , putMultiFetchRequest , Offsets(..) , putOffsetsRequest ) where import Control.Applicative import Data.ByteString (ByteString) import Data.Foldable (Foldable) import qualified Data.ByteString as B import qualified Data.Foldable as Fold import qualified Data.Serialize as C import Kafka.Internal.Types -- | 'Put's the items in the given Foldable in-order and prepends the result -- with a 16-bit integer containing the total number of items that were put. putWithCountPrefix :: Foldable f => f a -> C.Putter a -> C.Put putWithCountPrefix xs put = let (count, p) = Fold.foldr' go (0, pure ()) xs in C.putWord16be count *> p where go a (c, p) = (c + 1, put a *> p) -- | Prepends the given Put with a 32-bit integer containing the total size of -- the given Put. putWithLengthPrefix :: C.Put -> C.Put putWithLengthPrefix p = do C.putWord32be $ fromIntegral (B.length bs) C.putByteString bs where bs = C.runPut p -- | Encodes a request with the given 'RequestType'. -- -- More specifically, this prepends the given put with the request type and -- the size of the request. encodeRequest :: RequestType -> C.Put -> C.Put encodeRequest reqType p = putWithLengthPrefix $ C.put reqType >> p -- | A request to send messages down a Kafka topic-partition pair. -- -- Produce requests do not have a corresponding response. There is no way of -- knowing in Kafka 0.7 if a message was successfully @Produce@d. data Produce = Produce { produceTopic :: {-# UNPACK #-} !Topic -- ^ Kafka topic to which the messages will be sent. , producePartition :: {-# UNPACK #-} !Partition -- ^ Partition of the topic. , produceMessages :: [ByteString] -- ^ List of message payloads. -- -- For those concerned with low-leveld details: These messages will be -- compressed using compression. } deriving (Show, Read, Eq) encodeProduce :: Produce -> C.Put encodeProduce (Produce topic partition messages) = do C.put topic C.put partition putWithLengthPrefix (C.put (MessageSet messages)) -- | @Put@s the given single @Produce@ request. putProduceRequest :: Produce -> C.Put putProduceRequest = encodeRequest ProduceRequestType . encodeProduce -- | @Put@s the given @MultiProduce@ request. putMultiProduceRequest :: [Produce] -> C.Put putMultiProduceRequest reqs = encodeRequest MultiProduceRequestType $ putWithCountPrefix reqs encodeProduce -- | A request to fetch messages from a particular Kafka topic-partition pair. -- -- 'Kafka.FetchResponse' contains responses for this kind of request. data Fetch = Fetch { fetchTopic :: {-# UNPACK #-} !Topic -- ^ Kafka topic from which messages will be fetched. , fetchPartition :: {-# UNPACK #-} !Partition -- ^ Partition of the topic. , fetchOffset :: {-# UNPACK #-} !Offset -- ^ Offset at which the fetch will start. -- -- Kafka offloads the responsiblity of knowing this to the client. That -- means that if an offset is specified here that is not a real message -- start, Kafka will spit out garbage. -- -- Use 'Kafka.offsets' to find valid offsets. , fetchSize :: {-# UNPACK #-} !Size -- ^ Maximum size of the returned messages. -- -- Note, this is /not/ the number of messages. This is the maximum -- combined size of the returned /compressed/ messages. } deriving (Show, Read, Eq) encodeFetch :: Fetch -> C.Put encodeFetch (Fetch topic partition offset maxSize) = do C.put topic C.put partition C.put offset C.put maxSize -- | @Put@s the given single @Fetch@ request. putFetchRequest :: Fetch -> C.Put putFetchRequest = encodeRequest FetchRequestType . encodeFetch -- | @Put@s the given @MultiFetch@ request. putMultiFetchRequest :: [Fetch] -> C.Put putMultiFetchRequest reqs = encodeRequest MultiFetchRequestType $ putWithCountPrefix reqs encodeFetch -- | A request to retrieve offset information from Kafka. -- -- The response for this kind of request is a list of 'Offset's. data Offsets = Offsets { offsetsTopic :: {-# UNPACK #-} !Topic -- ^ Kafka topic from which offsets will be retrieved. , offsetsPartition :: {-# UNPACK #-} !Partition -- ^ Partition of the topic. , offsetsTime :: !OffsetsTime -- ^ Time around which offsets will be retrieved. -- -- If you provide a time for this, keep in mind that the response will not -- contain the precise offset that occurred around that time. It will return -- up to @offsetsCount@ offsets in descending, each being the first offset -- of every segment file for the specified partition with a modified time -- less than the specified time, and possibly a "high water mark" for the -- last segment of the partition (if it was modified before the specified -- time) which specifies the offset at which the next message to that -- partition will be written. , offsetsCount :: {-# UNPACK #-} !Count -- ^ Maximum number of offsets that will be retrieved. } deriving (Show, Read, Eq) -- | @Put@s the given @Offsets@ request. putOffsetsRequest :: Offsets -> C.Put putOffsetsRequest (Offsets topic partition time maxNumber) = encodeRequest OffsetsRequestType $ do C.put topic C.put partition C.put time C.put maxNumber