module Kafka.Internal.Response
( Response
, FetchResponse(..)
, getFetchResponse
, getMultiFetchResponse
, getOffsetsResponse
) where
import Control.Applicative
import Control.Monad
import Data.ByteString (ByteString)
import Data.Monoid
import qualified Data.Serialize as C
import Kafka.Internal.Request
import Kafka.Internal.Types
type Response a = Either Error a
data FetchResponse = FetchResponse {
fetchMessages :: [ByteString]
, fetchNewOffset :: !Offset
} deriving (Show, Read, Eq)
getFetchResponse :: Fetch -> C.Get FetchResponse
getFetchResponse Fetch{fetchOffset = oldOffset} = do
startBytes <- C.remaining
messages <- fromMessageSet . mconcat <$> many C.get
endBytes <- C.remaining
let newOffset = oldOffset + fromIntegral (startBytes endBytes)
return $ FetchResponse messages $! newOffset
getMultiFetchResponse :: [Fetch] -> C.Get [FetchResponse]
getMultiFetchResponse requests = forM requests $ \request -> do
size <- fromIntegral <$> C.getWord32be
C.skip 2
isolate' (size 2) (getFetchResponse request)
getOffsetsResponse :: C.Get [Offset]
getOffsetsResponse = do
numOffsets <- C.getWord32be
replicateM (fromIntegral numOffsets) C.get
isolate' :: Int -> C.Get a -> C.Get a
isolate' n m = do
s <- C.getBytes n
either fail return $ C.runGet m s