{-# LANGUAGE TupleSections #-}
{-# options_ghc -Wno-unused-imports #-}
-- | Streaming interface for JSONL-encoded files, based on @conduit@
--
-- The JSONL (JSON Lines) format is specified at <https://jsonlines.org/>.
module JSONL.Conduit (
  -- * Encode
  jsonToLBSC
  -- ** I/O
  , sinkFileC
  , appendFileC
  -- * Decode
  , jsonFromLBSC
  -- ** I/O
  , sourceFileC
  ) where

import Data.Void (Void)

import Control.Monad.IO.Class (MonadIO(..))
import System.IO (IOMode(..), Handle, openBinaryFile)

  -- aeson
import Data.Aeson (ToJSON(..), FromJSON(..), eitherDecode' )
-- bytestring
import qualified Data.ByteString as BS (ByteString, null)
import qualified Data.ByteString.Builder as BBS (toLazyByteString, Builder)
import qualified Data.ByteString.Internal as BS (c2w)
import qualified Data.ByteString.Char8 as BS8 (span, drop, putStrLn, putStr)
import qualified Data.ByteString.Lazy as LBS (ByteString, drop, span, toStrict, fromStrict)
-- conduit
import qualified Conduit as C (ConduitT, runConduit, sourceFile, sinkFile, await, yield, mapC, unfoldC, foldMapC, foldlC, printC, sinkIOHandle)
import Conduit ( (.|) , MonadResource)
-- jsonl
import JSONL (jsonLine)

-- | Consume a stream of JSON-encodable values and return their JSONL
-- rendering as a single lazy 'LBS.ByteString'.
--
-- This is 'jsonToBuilderC' with the accumulated builder materialised, so the
-- result only becomes available once upstream is exhausted.
jsonToLBSC :: (ToJSON a, Monad m) => C.ConduitT a o m LBS.ByteString
jsonToLBSC = fmap BBS.toLazyByteString jsonToBuilderC

-- | Consume a stream of JSON-encodable values and concatenate their
-- 'jsonLine' renderings, in stream order, into a single 'BBS.Builder'.
jsonToBuilderC :: (ToJSON a, Monad m) => C.ConduitT a o m BBS.Builder
jsonToBuilderC = C.foldlC (\acc x -> acc <> jsonLine x) mempty

-- | Write a stream of JSON-encodable values to a JSONL file.
--
-- Each value is rendered with 'encodeJSONL' and the resulting bytes are
-- written through 'C.sinkFile'. Use 'appendFileC' to add records to an
-- existing file instead.
sinkFileC :: (ToJSON a, MonadResource m) =>
             FilePath -- ^ path of JSONL file to be created
          -> C.ConduitT a o m ()
sinkFileC fpath = encoder .| C.sinkFile fpath
  where
    -- one strict ByteString per incoming record
    encoder = C.mapC encodeJSONL

-- | Like 'sinkFileC', but the file is opened in 'AppendMode', so the handle
-- is positioned at the end of the file and new records follow the existing
-- contents.
--
-- @since 0.1.1
appendFileC :: (ToJSON a, MonadResource m) =>
               FilePath -- ^ path of JSONL file to append to
            -> C.ConduitT a o m ()
appendFileC fpath = C.mapC encodeJSONL .| C.sinkIOHandle openForAppend
  where
    -- binary mode, so the encoded bytes are written without translation
    openForAppend = openBinaryFile fpath AppendMode

-- | Render one value as a strict 'BS.ByteString', using 'jsonLine' for the
-- encoding.
encodeJSONL :: ToJSON a => a -> BS.ByteString
encodeJSONL x = LBS.toStrict (BBS.toLazyByteString (jsonLine x))

-- | Read a JSONL file and stream the decoded records.
--
-- NB: errors are not reported. The stream ends silently at the first line
-- that cannot be decoded (for example a blank line or malformed JSON), and
-- no later records are produced.
sourceFileC :: (MonadResource m, FromJSON a) =>
               FilePath -- ^ path of JSONL file to be read
            -> C.ConduitT () a m ()
sourceFileC = (.| parseChunk) . C.sourceFile

-- | Decode a stream of strict 'BS.ByteString' chunks into JSONL records.
--
-- Chunk boundaries need not coincide with line boundaries, so a record may
-- be split across several chunks. Input is therefore buffered until a
-- newline-terminated line is available, and only then decoded. When upstream
-- is exhausted, any leftover unterminated line is decoded as the final
-- record.
--
-- Decoding stops silently at the first line that fails to decode (e.g. a
-- blank line or malformed JSON); nothing further is awaited from upstream.
parseChunk :: (Monad m, FromJSON a) => C.ConduitT BS.ByteString a m ()
parseChunk = go mempty
  where
    -- @buf@ holds input received from upstream but not yet decoded.
    go buf
      | hasNewline buf =
          -- a complete line is buffered: decode it and keep the remainder
          case chopDecode buf of
            Left _ -> pure ()
            Right (y, rest) -> C.yield y >> go rest
      | otherwise = do
          -- only a partial line (or nothing) is buffered: fetch more input
          -- instead of trying to decode a record that may be cut in half
          mchunk <- C.await
          case mchunk of
            Just chunk -> go (buf <> chunk)
            Nothing
              | BS.null buf -> pure ()
              | otherwise ->
                  -- final record without a trailing newline
                  either (const (pure ())) (C.yield . fst) (chopDecode buf)
    hasNewline = not . BS.null . snd . BS8.span (/= '\n')

-- | Split the first line off a strict buffer (via 'chopBS8') and decode it.
--
-- On success, returns the decoded value together with the input that follows
-- the line's newline. Otherwise, returns the decoder's error message when the
-- line cannot be decoded as @a@.
chopDecode :: FromJSON a =>
              BS.ByteString -> Either String (a, BS.ByteString)
chopDecode buf = do
  let (line, rest) = chopBS8 buf
  value <- eitherDecode' (LBS.fromStrict line)
  pure (value, rest)



-- | Stream the JSONL records contained in a lazy 'LBS.ByteString'.
--
-- The input is consumed one line at a time. The stream ends silently at the
-- first line that fails to decode, which includes the empty remainder left
-- after a trailing newline.
jsonFromLBSC :: (FromJSON a, Monad m) => LBS.ByteString -> C.ConduitT Void a m ()
jsonFromLBSC = C.unfoldC step
  where
    -- decode the first line; on success, continue with the rest of the input
    step input =
      let (line, rest) = chopLBS input
      in either (const Nothing) (Just . (, rest)) (eitherDecode' line)


-- * utilities

-- | Split a strict buffer at its first newline.
--
-- Returns the bytes before the newline and the bytes after it; the newline
-- itself is dropped. Without a newline, the whole input is the first
-- component and the second is empty.
chopBS8 :: BS.ByteString -> (BS.ByteString, BS.ByteString)
chopBS8 bs =
  let (line, remainder) = BS8.span (/= '\n') bs
  in (line, BS8.drop 1 remainder)

-- | Split a lazy buffer at its first newline byte.
--
-- Returns the bytes before the newline and the bytes after it; the newline
-- itself is dropped. Without a newline, the whole input is the first
-- component and the second is empty.
chopLBS :: LBS.ByteString -> (LBS.ByteString, LBS.ByteString)
chopLBS input =
  let newline = BS.c2w '\n'
      (line, remainder) = LBS.span (/= newline) input
  in (line, LBS.drop 1 remainder)