module Network.NSQ.Parser
( message
, decode
, encode
) where
import Data.Word
import Data.Monoid
import Control.Applicative
import Data.Attoparsec.Binary
import Data.Attoparsec.ByteString hiding (count)
import Data.Int
import Prelude hiding (take)
import qualified Data.List as DL
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as C8
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Builder as BL
import qualified Data.Text.Encoding as T
import Network.NSQ.Types
import Network.NSQ.Identify
decode :: BS.ByteString -> Maybe Message
decode str = case parseOnly message str of
Left _ -> Nothing
Right r -> Just r
command :: BS.ByteString -> Message
command "_heartbeat_" = Heartbeat
command "OK" = OK
command "CLOSE_WAIT" = CloseWait
command x = CatchAllMessage (FTUnknown 99) x
errorCasting :: BS.ByteString -> ErrorType
errorCasting "E_INVALID" = Invalid
errorCasting "E_BAD_BODY" = BadBody
errorCasting "E_BAD_TOPIC" = BadTopic
errorCasting "E_BAD_MESSAGE" = BadMessage
errorCasting "E_PUB_FAILED" = PubFailed
errorCasting "E_MPUB_FAILED" = MPubFailed
errorCasting "E_FIN_FAILED" = FinFailed
errorCasting "E_REQ_FAILED" = ReqFailed
errorCasting "E_TOUCH_FAILED" = TouchFailed
errorCasting x = Unknown x
frameType :: Int32 -> FrameType
frameType 0 = FTResponse
frameType 1 = FTError
frameType 2 = FTMessage
frameType x = FTUnknown x
message :: Parser Message
message = do
size <- fromIntegral <$> anyWord32be
ft <- frameType <$> fromIntegral <$> anyWord32be
frame ft (size 4)
frame :: FrameType -> Int -> Parser Message
frame FTError size = Error <$> (errorCasting `fmap` take size)
frame FTResponse size = command <$> take size
frame FTMessage size = Message
<$> (fromIntegral <$> anyWord64be)
<*> anyWord16be
<*> take 16
<*> take (size 26)
frame ft size = CatchAllMessage ft <$> take size
sizedData :: BS.ByteString -> BL.Builder
sizedData dat = BL.word32BE (fromIntegral $ BS.length dat) <> BL.byteString dat
concatSizedData :: (Word32, Word32, BL.Builder) -> BS.ByteString -> (Word32, Word32, BL.Builder)
concatSizedData (totalSize, count, xs) dat = (
totalSize + 4 + fromIntegral (BS.length dat),
count + 1,
xs <> sizedData dat
)
encode :: Command -> BS.ByteString
encode Protocol = " V2"
encode NOP = "NOP\n"
encode Cls = "CLS\n"
encode (Identify identify) = BL.toStrict $ BL.toLazyByteString (
BL.byteString "IDENTIFY\n" <>
sizedData (BL.toStrict $ encodeMetadata identify)
)
encode (Sub topic channel ephemeral) = BL.toStrict $ BL.toLazyByteString (
BL.byteString "SUB " <>
BL.byteString (T.encodeUtf8 topic) <>
BL.byteString " " <>
BL.byteString (T.encodeUtf8 channel) <>
BL.byteString (if ephemeral then "#ephemeral" else "") <>
BL.byteString "\n"
)
encode (Pub topic dat) = BL.toStrict $ BL.toLazyByteString (
BL.byteString "PUB " <>
BL.byteString (T.encodeUtf8 topic) <>
BL.byteString "\n" <>
sizedData dat
)
encode (MPub topic dx) = BL.toStrict $ BL.toLazyByteString (
BL.byteString "MPUB " <>
BL.byteString (T.encodeUtf8 topic) <>
BL.byteString "\n" <>
BL.word32BE (totalSize + 4) <>
BL.word32BE totalCount <>
content
)
where
(totalSize, totalCount, content) = DL.foldl' concatSizedData (0, 0, mempty) dx
encode (Rdy count) = BL.toStrict $ BL.toLazyByteString (
BL.byteString "RDY " <>
BL.byteString (C8.pack $ show count) <>
BL.byteString "\n"
)
encode (Fin msg_id) = BL.toStrict $ BL.toLazyByteString (
BL.byteString "FIN " <>
BL.byteString msg_id <>
BL.byteString "\n"
)
encode (Touch msg_id) = BL.toStrict $ BL.toLazyByteString (
BL.byteString "TOUCH " <>
BL.byteString msg_id <>
BL.byteString "\n"
)
encode (Req msg_id timeout) = BL.toStrict $ BL.toLazyByteString (
BL.byteString "REQ " <>
BL.byteString msg_id <>
BL.byteString " " <>
BL.byteString (C8.pack $ show timeout) <>
BL.byteString "\n"
)
encode (Command m) = m