{-# LANGUAGE FlexibleInstances          #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverlappingInstances       #-}
module Kafka.Internal.Types
    ( Error(..)
    , Compression(..)
    , OffsetsTime(..)
    , RequestType(..)

    , Topic(..)
    , Offset(..)
    , Partition(..)
    , Size(..)
    , Count(..)

    , Message(..)
    , MessageSet(..)
    ) where

import Control.Applicative
import Data.ByteString       (ByteString)
import Data.ByteString.Lazy  (fromStrict, toStrict)
import Data.Digest.CRC32     (crc32)
import Data.DList            (DList)
import Data.Monoid
import Data.String
import Data.Time             (UTCTime)
import Data.Time.Clock.POSIX
import Data.Word

import qualified Codec.Compression.GZip   as GZip
import qualified Codec.Compression.Snappy as Snappy
import qualified Data.ByteString          as B
import qualified Data.DList               as DList
import qualified Data.Serialize           as C

-- | Different errors returned by Kafka.
--
-- The numeric code mentioned for each constructor is the 16-bit value used
-- by the 'C.Serialize' instance for @Maybe Error@ (code @0@ is 'Nothing').
data Error
    = UnknownError
    -- ^ Unknown error (code @-1@).
    | OffsetOutOfRangeError
    -- ^ Offset requested is invalid or no longer available on the server
    -- (code @1@).
    | InvalidMessageError
    -- ^ A message failed to match its checksum (code @2@).
    | WrongPartitionError
    -- ^ The requested partition doesn't exist (code @3@).
    | InvalidFetchSizeError
    -- ^ The maximum size requested for fetching is smaller than the message
    -- being fetched (code @4@).
  deriving (Show, Read, Eq, Ord)

instance C.Serialize (Maybe Error) where
    -- A note about extensions:
    -- We need FlexibleInstances to allow GHC to let us declare an instance
    -- for the type @Maybe Error@. We also need OverlappingInstances to tell
    -- it to pick this (more specific) instance of Maybe Error over the
    -- generic one provided by cereal.

    -- The error is written as a single big-endian 16-bit code.
    put = C.putWord16be . toCode
      where
        toCode :: Maybe Error -> Word16
        toCode Nothing                      = 0
        toCode (Just OffsetOutOfRangeError) = 1
        toCode (Just InvalidMessageError)   = 2
        toCode (Just WrongPartitionError)   = 3
        toCode (Just InvalidFetchSizeError) = 4
        toCode (Just UnknownError)          = 0xFFFF -- -1 as an unsigned word

    -- Any code outside the known set is reported as a parse failure.
    get = C.getWord16be >>= fromCode
      where
        fromCode :: Word16 -> C.Get (Maybe Error)
        fromCode 0      = return Nothing
        fromCode 1      = return (Just OffsetOutOfRangeError)
        fromCode 2      = return (Just InvalidMessageError)
        fromCode 3      = return (Just WrongPartitionError)
        fromCode 4      = return (Just InvalidFetchSizeError)
        fromCode 0xFFFF = return (Just UnknownError) -- -1 as an unsigned word
        fromCode code   = fail $ "Unknown error code: " ++ show code

-- | Methods of compression supported by Kafka.
data Compression
    = NoCompression
    -- ^ The message is uncompressed.
    | GzipCompression
    -- ^ The message is compressed using @gzip@ compression and may contain
    -- other messages in it.
    | SnappyCompression
    -- ^ The message is compressed using @snappy@ compression and may contain
    -- other messages in it.
  deriving (Show, Read, Eq, Ord)

instance C.Serialize Compression where
    -- The compression scheme is written as a single byte.
    put = C.putWord8 . toCode
      where
        toCode :: Compression -> Word8
        toCode NoCompression     = 0
        toCode GzipCompression   = 1
        toCode SnappyCompression = 2

    get = do
        code <- C.getWord8
        case code of
            0 -> return NoCompression
            1 -> return GzipCompression
            2 -> return SnappyCompression
            _ -> fail $ "Invalid compression code: " ++ show code

-- | Different times for which offsets may be retrieved using
-- 'Kafka.offsets'.
data OffsetsTime
    = OffsetsLatest
    -- ^ Retrieve the latest offsets.
    | OffsetsEarliest
    -- ^ Retrieve the earliest offsets.
    | OffsetsBefore !UTCTime
    -- ^ Retrieve offsets before the given time.
    --
    -- Keep in mind that the response will not contain the precise offset that
    -- occurred around this time. It will return up to the specified count of
    -- offsets in descending order, each being the first offset of every
    -- segment file for the specified partition with a modified time less than
    -- this time, and possibly a "high water mark" for the last segment of the
    -- partition (if it was modified before this time) which specifies the
    -- offset at which the next message to that partition will be written.
  deriving (Show, Read, Eq, Ord)

instance C.Serialize OffsetsTime where
    -- The time is written as a big-endian 64-bit word: the two largest values
    -- (-1 and -2 when read as signed) are reserved for the latest and the
    -- earliest offsets, anything else is a POSIX time in milliseconds.
    --
    -- Kafka doesn't care about anything beyond milliseconds while UTCTime can
    -- be more precise. We will lose some amount of precision if the same
    -- UTCTime is serialized and then deserialized.
    put = C.putWord64be . toWire
      where
        toWire :: OffsetsTime -> Word64
        toWire OffsetsLatest     = maxBound
        toWire OffsetsEarliest   = maxBound - 1
        toWire (OffsetsBefore t) = round (utcTimeToPOSIXSeconds t * 1000)

    get = fromWire <$> C.getWord64be
      where
        fromWire :: Word64 -> OffsetsTime
        fromWire w
            | w == maxBound     = OffsetsLatest
            | w == maxBound - 1 = OffsetsEarliest
            | otherwise         =
                OffsetsBefore (posixSecondsToUTCTime (realToFrac w / 1000))

-- | The different request types supported by Kafka.
--
-- Each request type is serialized as a big-endian 16-bit code.
data RequestType
    = ProduceRequestType
    -- ^ Code @0@.
    | FetchRequestType
    -- ^ Code @1@.
    | MultiFetchRequestType
    -- ^ Code @2@.
    | MultiProduceRequestType
    -- ^ Code @3@.
    | OffsetsRequestType
    -- ^ Code @4@.
  deriving (Show, Read, Eq, Ord)

instance C.Serialize RequestType where
    put = C.putWord16be . toCode
      where
        toCode :: RequestType -> Word16
        toCode ProduceRequestType      = 0
        toCode FetchRequestType        = 1
        toCode MultiFetchRequestType   = 2
        toCode MultiProduceRequestType = 3
        toCode OffsetsRequestType      = 4

    get = do
        code <- C.getWord16be
        case code of
            0 -> return ProduceRequestType
            1 -> return FetchRequestType
            2 -> return MultiFetchRequestType
            3 -> return MultiProduceRequestType
            4 -> return OffsetsRequestType
            _ -> fail $ "Unknown request type: " ++ show code

-- | Represents a Kafka topic.
--
-- This is an instance of 'IsString' so a literal string may be used to create
-- a Topic with the @OverloadedStrings@ extension.
newtype Topic = Topic ByteString
    deriving (Show, Read, Eq, Ord, IsString)

instance C.Serialize Topic where
    -- A topic is written as a big-endian 16-bit length followed by that many
    -- bytes of topic name.
    put (Topic topic) = do
        C.putWord16be (fromIntegral $ B.length topic)
        C.putByteString topic
    get = do
        topicLength <- fromIntegral <$> C.getWord16be
        Topic <$> C.getByteString topicLength

-- | Represents an Offset in Kafka.
--
-- This is an instance of 'Num' so a literal number may be used to create an
-- Offset.
newtype Offset = Offset Word64
    deriving (Show, Read, Eq, Ord, Num)

instance C.Serialize Offset where
    -- Serialized as a big-endian 64-bit word.
    put (Offset o) = C.putWord64be o
    get = Offset <$> C.getWord64be

-- | Represents a Kafka topic partition.
--
-- This is an instance of 'Num' so a literal number may be used to create a
-- Partition.
newtype Partition = Partition Word32
    deriving (Show, Read, Eq, Ord, Num)

instance C.Serialize Partition where
    -- Serialized as a big-endian 32-bit word.
    put (Partition p) = C.putWord32be p
    get = Partition <$> C.getWord32be

-- | Represents a size.
--
-- This is an instance of 'Num' so a literal number may be used to create a
-- Size.
newtype Size = Size Word32
    deriving (Show, Read, Eq, Ord, Num)

instance C.Serialize Size where
    -- Serialized as a big-endian 32-bit word.
    put (Size s) = C.putWord32be s
    get = Size <$> C.getWord32be

-- | Represents a Count.
--
-- This is an instance of 'Num' so a literal number may be used to create a
-- Count.
newtype Count = Count Word32
    deriving (Show, Read, Eq, Ord, Num)

instance C.Serialize Count where
    -- Serialized as a big-endian 32-bit word.
    put (Count s) = C.putWord32be s
    get = Count <$> C.getWord32be

-- | Represents a Message being sent through Kafka.
data Message = Message {
    messageCompression :: !Compression
  -- ^ Compression used for the message.
  --
  -- If this is anything but 'NoCompression', this message contains other
  -- messages in it.
  , messagePayload     :: !ByteString
  -- ^ Message payload.
  --
  -- If the message is using any compression scheme, the payload contains
  -- other messages in the same format.
  } deriving (Show, Read, Eq, Ord)

instance C.Serialize Message where
    -- Layout written by 'put' (all integers big-endian):
    --
    -- > length (4 bytes) | magic (1 byte) | compression (1 byte)
    -- >                  | CRC32 of payload (4 bytes) | payload
    --
    -- The length counts everything after the length field itself. Messages
    -- are always written with magic @1@.
    put (Message compression payload) = do
        C.putWord32be (fromIntegral messageLength)
        C.putWord8    1 -- Magic
        C.put         compression
        C.putWord32be (crc32 payload)
        C.putByteString payload
      where
        messageLength = B.length payload
                      + 4 -- checksum
                      + 1 -- compression
                      + 1 -- magic

    -- Reads a message written with magic @0@ (no compression byte, the
    -- message is treated as uncompressed) or magic @1@ (compression byte
    -- present). Fails on an unknown magic, on a declared length that is too
    -- small to hold the header, and on a checksum mismatch.
    get = do
        messageLength <- C.getWord32be
        magic         <- C.getWord8
        compression   <- case magic of
            0 -> return NoCompression
            1 -> C.get
            _ -> fail $ "Unknown magic code: " ++ show magic
        checksum <- C.getWord32be
        let headerLength = 4                  -- checksum
                         + 1                  -- magic
                         + fromIntegral magic -- compression (1 byte if magic is 1)
                         :: Word32
        -- The subtraction below is done on unsigned words: without this
        -- check a declared length shorter than the header would wrap around
        -- to a payload size of roughly 4 GiB instead of being rejected.
        if messageLength < headerLength
            then fail $ "Message length " ++ show messageLength
                     ++ " is smaller than the message header ("
                     ++ show headerLength ++ " bytes)."
            else do
                payload <- C.getByteString
                               (fromIntegral (messageLength - headerLength))
                if crc32 payload == checksum
                    then return (Message compression payload)
                    else fail "Checksum did not match."
                    -- TODO: This should probably be InvalidMessageError

-- | Represents a collection of message payloads.
--
-- These are compressed into a single message using Snappy compression when
-- being sent.
newtype MessageSet = MessageSet { fromMessageSet :: [ByteString] }
    deriving (Show, Read, Eq, Monoid)

instance C.Serialize MessageSet where
    -- Every payload is wrapped in an uncompressed 'Message'; the
    -- concatenation of those messages is Snappy-compressed and sent as the
    -- payload of a single outer 'Message'.
    put (MessageSet messages) = C.put (Message SnappyCompression payload)
      where
        payload = Snappy.compress . C.runPut $
                    mapM_ (C.put . Message NoCompression) messages

    -- Reads one outer 'Message' and flattens it: an uncompressed message
    -- contributes its payload, a compressed message is decompressed and the
    -- messages found inside it are flattened recursively.
    get = MessageSet . DList.toList <$> (C.get >>= readMessages)
      where
        readMessages :: Message -> C.Get (DList ByteString)
        readMessages (Message NoCompression payload) =
            return (DList.singleton payload)
        readMessages (Message compression payload) = do
            -- NOTE(review): 'many' stops at the first inner message that
            -- fails to parse, so a malformed inner message (and anything
            -- following it) appears to be dropped without an error --
            -- confirm that this leniency is intended.
            messages <- either fail return $
                            C.runGet (many C.get) decompressedPayload
            mconcat <$> mapM readMessages messages
          where
            decompressedPayload = decompress payload
            decompress = case compression of
                NoCompression     -> id
                SnappyCompression -> Snappy.decompress
                GzipCompression   -> toStrict . GZip.decompress . fromStrict