-- | Conduits for writing and reading newline-delimited JSON, i.e. streams
-- that carry one JSON value per line.
--
-- The serializers terminate every value with CR LF. The parsers split the
-- input on LF and strip every CR byte from it before parsing.
module Data.Conduit.JSON.NewlineDelimited
(
-- * Serialization
serializer
, valueSerializer
-- * Parsing into 'A.FromJSON' types
, parser
, maybeParser
, eitherParser
-- * Parsing into raw JSON values
, valueParser
, maybeValueParser
, eitherValueParser
, resultValueParser
) where
import qualified Data.Aeson as A
import qualified Data.Aeson.Types as A
import qualified Data.Aeson.Encode as A
import Data.Attoparsec.ByteString
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as B (toChunks)
import qualified Data.ByteString.Builder as B
import Data.Conduit
import Data.Monoid
import Data.Word (Word8)
-- | Byte value of the ASCII carriage return character (CR, 13).
carriage :: Word8
carriage = fromIntegral (fromEnum '\r')

-- | Byte value of the ASCII line feed character (LF, 10); the line separator
-- used by the parsers in this module.
newline :: Word8
newline = fromIntegral (fromEnum '\n')
-- | Serialize a stream of 'A.ToJSON' values as newline-delimited JSON.
--
-- Every input is converted with 'A.toJSON' and then handed to
-- 'valueSerializer', which produces the actual bytes.
serializer :: (Monad m, A.ToJSON a) => Conduit a m B.ByteString
serializer = mapInput A.toJSON discardLeftover valueSerializer
  where
    -- Leftovers of the inner conduit are never mapped back to the outer
    -- input type; they are simply discarded.
    discardLeftover _ = Nothing
-- | Serialize a stream of JSON 'A.Value's as newline-delimited JSON.
--
-- Each value is encoded and followed by a CR LF pair. The encoded line is
-- emitted downstream as the strict chunks of the resulting lazy bytestring,
-- so a single value may produce more than one output chunk.
valueSerializer :: Monad m => Conduit A.Value m B.ByteString
valueSerializer = awaitForever emitLine
  where
    -- Line terminator appended after every encoded value.
    lineEnd = B.word8 carriage <> B.word8 newline

    emitLine value =
      mapM_ yield (B.toChunks (B.toLazyByteString encoded))
      where
        encoded = A.encodeToByteStringBuilder value <> lineEnd
-- | Parse newline-delimited JSON into values of any 'A.FromJSON' type.
--
-- Built on 'maybeValueParser': every line that either is not valid JSON or
-- cannot be converted with 'A.parseJSON' is dropped without notice. Use
-- 'maybeParser' or 'eitherParser' to observe such failures.
parser :: (Monad m, A.FromJSON a) => Conduit B.ByteString m a
parser = mapOutputMaybe fromParsed maybeValueParser
  where
    fromParsed parsed = parsed >>= A.parseMaybe A.parseJSON
-- | Parse newline-delimited JSON into values of any 'A.FromJSON' type,
-- emitting one output per result of 'maybeValueParser'.
--
-- The output is 'Nothing' when the line is not valid JSON or when
-- 'A.parseJSON' rejects the parsed value.
maybeParser :: (Monad m, A.FromJSON a) => Conduit B.ByteString m (Maybe a)
maybeParser = mapOutput fromParsed maybeValueParser
  where
    fromParsed parsed = parsed >>= A.parseMaybe A.parseJSON
-- | Parse newline-delimited JSON into values of any 'A.FromJSON' type,
-- emitting one output per result of 'eitherValueParser'.
--
-- The output is 'Left' with an error message when the line is not valid
-- JSON or when 'A.parseJSON' rejects the parsed value.
eitherParser :: (Monad m, A.FromJSON a) => Conduit B.ByteString m (Either String a)
eitherParser = mapOutput fromParsed eitherValueParser
  where
    fromParsed parsed = parsed >>= A.parseEither A.parseJSON
-- | Parse newline-delimited JSON into raw 'A.Value's.
--
-- Only successfully parsed values are emitted; every other parse result is
-- dropped without notice.
valueParser :: Monad m => Conduit B.ByteString m A.Value
valueParser = mapOutputMaybe id maybeValueParser
-- | Parse newline-delimited JSON into raw 'A.Value's, emitting one output
-- per result of 'resultValueParser'.
--
-- Each result is converted with 'maybeResult', which gives 'Nothing' for
-- anything other than a successful parse.
maybeValueParser :: Monad m => Conduit B.ByteString m (Maybe A.Value)
maybeValueParser = maybeResult `mapOutput` resultValueParser
-- | Parse newline-delimited JSON into raw 'A.Value's, emitting one output
-- per result of 'resultValueParser'.
--
-- Each result is converted with 'eitherResult', which gives 'Left' with an
-- error message for anything other than a successful parse.
eitherValueParser :: Monad m => Conduit B.ByteString m (Either String A.Value)
eitherValueParser = mapOutput toEither resultValueParser
  where
    toEither result = eitherResult result
-- | Split the incoming byte stream into lines and run the 'A.json' parser
-- over each line, emitting one attoparsec 'Result' per line.
--
-- * Lines are separated by LF. Every CR byte is removed from the input
--   first, so CR LF line endings are accepted as well.
-- * A line may be spread over any number of input chunks, and one chunk may
--   contain any number of lines.
-- * Every emitted result is finalized by feeding the parser an empty
--   string, which is attoparsec's end-of-input signal. A terminated line
--   therefore never comes out as 'Partial'.
-- * An empty line produces a failed result.
-- * When upstream is exhausted, a last line without a trailing LF is
--   emitted too, provided at least one byte of it was received.
resultValueParser :: Monad m => Conduit B.ByteString m (Result A.Value)
resultValueParser = go Nothing
  where
    -- 'pending' is the parse of the bytes received so far for the current,
    -- not yet terminated line; 'Nothing' means no byte of it has arrived.
    go pending = do
      next <- await
      case next of
        -- Upstream is done: flush a final line that had no trailing LF.
        Nothing -> maybe (return ()) (yield . finish) pending
        Just chunk ->
          case B.split newline (B.filter (/= carriage) chunk) of
            -- 'B.split' gives [] for an empty chunk: nothing to process.
            [] -> go pending
            (piece : pieces) -> emit (extend pending piece) pieces

    -- Every piece that is followed by another piece was terminated by an
    -- LF, so the line built up in 'current' is complete and is emitted.
    -- The very last piece is unterminated and becomes the pending state.
    emit current [] = go current
    emit current (piece : pieces) = do
      yield (finish (maybe (parse A.json B.empty) id current))
      emit (extend Nothing piece) pieces

    -- Add more bytes to the line being parsed. Empty pieces are skipped
    -- because feeding an empty string would signal end of input too early.
    extend pending piece
      | B.null piece = pending
      | otherwise = Just (maybe (parse A.json piece) (`feed` piece) pending)

    -- Signal end of input so that a 'Partial' result becomes final.
    finish result = feed result B.empty