-- | Parsers for the responses to Kafka requests.
module Kafka.Internal.Response
    ( Response
    , FetchResponse(..)
    , getFetchResponse
    , getMultiFetchResponse
    , getOffsetsResponse
    ) where

import Control.Applicative
import Control.Monad
import Data.ByteString     (ByteString)
import Data.Monoid

import qualified Data.Serialize as C

import Kafka.Internal.Request
import Kafka.Internal.Types

-- | A response from Kafka can either be a failure or the value that was
-- expected.
type Response a = Either Error a

-- | Result of a single Kafka 'Kafka.Fetch'.
data FetchResponse = FetchResponse
    { fetchMessages  :: [ByteString]
    -- ^ List of messages returned in the response for the 'Kafka.Fetch'
    -- request.
    , fetchNewOffset :: {-# UNPACK #-} !Offset
    -- ^ New offset at which the next 'Fetch' request should start reading in
    -- the same topic and partition to access the messages that follow the
    -- messages returned in this response.
    } deriving (Show, Read, Eq)

-- | Parses a single @FetchResponse@ for the given @Fetch@ request.
--
-- As many elements as possible are decoded with 'C.get', combined with
-- 'mconcat' and flattened with 'fromMessageSet'. The new offset is the
-- request's offset advanced by the number of bytes that were actually
-- consumed by this parser; input that is left over is not counted.
getFetchResponse :: Fetch -> C.Get FetchResponse
getFetchResponse Fetch{fetchOffset = oldOffset} = do
    startBytes <- C.remaining
    messages   <- fromMessageSet . mconcat <$> many C.get
    endBytes   <- C.remaining
    -- Only the bytes consumed above move the offset forward.
    let newOffset = oldOffset + fromIntegral (startBytes - endBytes)
    -- Force the offset before storing it in the strict, unpacked field.
    return $ FetchResponse messages $! newOffset

-- | Parses @FetchResponse@s for each of the given @Fetch@ requests.
--
-- Every request is expected to have one chunk in the input, in request
-- order: a 32-bit big-endian size, followed by a two byte error code and
-- the payload. The size includes the two bytes of the error code, so the
-- payload handed to 'getFetchResponse' is @size - 2@ bytes long.
--
-- The parse fails if a chunk declares a size smaller than two bytes, since
-- such a chunk cannot even hold the error code.
getMultiFetchResponse :: [Fetch] -> C.Get [FetchResponse]
getMultiFetchResponse requests =
    forM requests $ \request -> do
        size <- fromIntegral <$> C.getWord32be
        -- Reject malformed sizes up front; otherwise @size - 2@ would turn
        -- into a negative length for 'C.getBytes'.
        when (size < 2) $
            fail $ "getMultiFetchResponse: response size " ++ show size
                ++ " is too small to hold the error code"
        -- NOTE(review): the per-response error code is discarded here;
        -- presumably errors are reported elsewhere -- confirm.
        C.skip 2 -- skip error
        isolate' (size - 2) (getFetchResponse request)

-- | Parses the response for an @Offsets@ request.
--
-- The response is a 32-bit big-endian count followed by that many offsets.
getOffsetsResponse :: C.Get [Offset]
getOffsetsResponse =
    C.getWord32be >>= \numOffsets ->
        replicateM (fromIntegral numOffsets) C.get

-- | A version of 'C.isolate' that does not fail the parse because of
-- unconsumed data.
--
-- Exactly @n@ bytes are taken from the input and the given parser is run on
-- them in isolation. A failure of the inner parser fails the outer parse
-- with the same message; bytes the inner parser leaves behind are dropped.
isolate' :: Int -> C.Get a -> C.Get a
isolate' n m = do
    chunk <- C.getBytes n
    case C.runGet m chunk of
        Left err     -> fail err
        Right result -> return result