{-# LANGUAGE TupleSections #-}
{-# options_ghc -Wno-unused-imports #-}
-- | Streaming interface for JSONL-encoded files, based on @conduit@
--
-- The JSONL (JSON Lines) format : https://jsonlines.org/
module JSONL.Conduit (
  -- * Encode
  jsonToLBSC
  -- ** I/O
  , sinkFileC
  , appendFileC
  -- * Decode
  , jsonFromLBSC
  , jsonFromLBSCE
  -- ** I/O
  , sourceFileC
  , sourceFileCE
  -- * Tokenize only
  , sourceFileC_
  ) where

import Data.Void (Void)

import Control.Monad.IO.Class (MonadIO(..))
import System.IO (IOMode(..), Handle, openBinaryFile)

  -- aeson
import Data.Aeson (ToJSON(..), FromJSON(..), eitherDecode' )
-- bytestring
import qualified Data.ByteString as BS (ByteString, null)
import qualified Data.ByteString.Builder as BBS (toLazyByteString, Builder)
import qualified Data.ByteString.Internal as BS (c2w)
import qualified Data.ByteString.Char8 as BS8 (span, drop, putStrLn, putStr)
import qualified Data.ByteString.Lazy as LBS (ByteString, null, drop, span, toStrict, fromStrict)
-- conduit
import qualified Conduit as C (ConduitT, runConduit, sourceFile, sinkFile, await, yield, mapC, unfoldC, foldMapC, foldlC, printC, sinkIOHandle)
import Conduit ( (.|) , MonadResource)
-- jsonl
import JSONL (jsonLine)

-- | Consume a stream of JSON-encodable values and return their JSONL
-- rendering as a single lazy `LBS.ByteString`.
--
-- The result is obtained by running 'jsonToBuilderC' and converting the
-- accumulated builder.
jsonToLBSC :: (ToJSON a, Monad m) => C.ConduitT a o m LBS.ByteString
jsonToLBSC = fmap BBS.toLazyByteString jsonToBuilderC

-- | Consume a stream of JSON-encodable values and accumulate their JSONL
-- rendering into a single `BBS.Builder`.
--
-- Each incoming value is rendered with 'jsonLine' and the results are
-- combined monoidally, in stream order.
jsonToBuilderC :: (ToJSON a, Monad m) => C.ConduitT a o m BBS.Builder
jsonToBuilderC = C.foldMapC (\record -> jsonLine record)

-- | Write a stream of JSON-encodable values to a JSONL file.
--
-- Every incoming value is rendered with 'encodeJSONL' and the resulting
-- bytes are handed to 'C.sinkFile'.
sinkFileC :: (ToJSON a, MonadResource m) =>
             FilePath -- ^ path of JSONL file to be created
          -> C.ConduitT a o m ()
sinkFileC fpath = encoder .| writer
  where
    -- one strict bytestring per record
    encoder = C.mapC encodeJSONL
    writer = C.sinkFile fpath

-- | Like `sinkFileC`, but the file is opened in `AppendMode`, so the handle
-- is positioned at the end of the file and new records are added after the
-- existing contents.
--
-- @since 0.1.1
appendFileC :: (ToJSON a, MonadResource m) =>
               FilePath -- ^ path of the JSONL file to append to
            -> C.ConduitT a o m ()
appendFileC fpath = C.mapC encodeJSONL .| C.sinkIOHandle openForAppend
  where
    -- the handle is acquired by the sink itself, in binary mode
    openForAppend = openBinaryFile fpath AppendMode

-- | Render a single value as one JSONL line, as a strict bytestring.
--
-- The line is produced by 'jsonLine'; the builder is run and the lazy
-- result is copied into a strict `BS.ByteString`.
encodeJSONL :: ToJSON a => a -> BS.ByteString
encodeJSONL record = LBS.toStrict (BBS.toLazyByteString (jsonLine record))

-- | Read a JSONL file and stream the decoded records
--
-- NB : decoding errors are not reported. The stream ends silently at the
-- first record that fails to decode.
sourceFileC :: (MonadResource m, FromJSON a) =>
               FilePath -- ^ path of JSONL file to be read
            -> C.ConduitT () a m ()
sourceFileC fpath = rawBytes .| parseChunk
  where
    -- raw file contents, as strict bytestring chunks
    rawBytes = C.sourceFile fpath

-- | Incrementally decode a stream of raw byte chunks into JSONL records.
--
-- Upstream chunks may be cut at arbitrary byte offsets, so a record can
-- straddle two or more chunks. Bytes are therefore buffered until the buffer
-- contains a complete, newline-terminated line; only then is the line
-- decoded. When upstream is exhausted, any leftover bytes are treated as a
-- final line lacking its trailing newline.
--
-- The stream ends silently at the first line that fails to decode.
parseChunk :: (Monad m, FromJSON a) => C.ConduitT BS.ByteString a m ()
parseChunk = go mempty
  where
    -- True iff the buffer holds at least one complete line
    hasNewline buf = not (BS.null (snd (BS8.span (/= '\n') buf)))
    -- decode the first line of the buffer, then carry on with the remainder
    progress buf = case chopDecode buf of
      Left _ -> pure () -- decoding failure : stop streaming
      Right (y, rest) -> do
        C.yield y
        go rest
    go buf
      | hasNewline buf = progress buf
      | otherwise = do
          mchunk <- C.await -- current line is incomplete : get data from upstream
          case mchunk of
            Just chunk -> go (buf <> chunk)
            Nothing
              | BS.null buf -> pure ()
              | otherwise -> progress buf -- last line has no trailing newline

-- | Read a JSONL file and stream the decoded records
--
-- NB : a record that fails to decode is emitted as a 'Left' carrying the
-- decoder's error message, after which the stream ends. Successfully decoded
-- records are emitted as 'Right' values.
sourceFileCE :: (MonadResource m, FromJSON a) =>
                FilePath -- ^ path of JSONL file to be read
             -> C.ConduitT () (Either String a) m ()
sourceFileCE fpath = rawBytes .| parseChunkE
  where
    -- raw file contents, as strict bytestring chunks
    rawBytes = C.sourceFile fpath

-- | Incrementally decode a stream of raw byte chunks into JSONL records,
-- reporting a decoding failure as a 'Left' value.
--
-- Upstream chunks may be cut at arbitrary byte offsets, so a record can
-- straddle two or more chunks. Bytes are therefore buffered until the buffer
-- contains a complete, newline-terminated line; only then is the line
-- decoded. When upstream is exhausted, any leftover bytes are treated as a
-- final line lacking its trailing newline.
--
-- NB : the stream ends right after the first 'Left' is emitted.
parseChunkE :: (Monad m, FromJSON a) => C.ConduitT BS.ByteString (Either String a) m ()
parseChunkE = go mempty
  where
    -- True iff the buffer holds at least one complete line
    hasNewline buf = not (BS.null (snd (BS8.span (/= '\n') buf)))
    -- decode the first line of the buffer, then carry on with the remainder
    progress buf = case chopDecode buf of
      Left e -> C.yield (Left e) -- report the failure, then stop streaming
      Right (y, rest) -> do
        C.yield (Right y)
        go rest
    go buf
      | hasNewline buf = progress buf
      | otherwise = do
          mchunk <- C.await -- current line is incomplete : get data from upstream
          case mchunk of
            Just chunk -> go (buf <> chunk)
            Nothing
              | BS.null buf -> pure ()
              | otherwise -> progress buf -- last line has no trailing newline


-- | Stream the contents of a file split at newline characters. The outgoing
-- elements are guaranteed not to contain newline characters.
--
-- NB : no JSON parsing is done here, only string copies.
sourceFileC_ :: MonadResource m =>
                FilePath -- ^ path of JSONL file to be read
             -> C.ConduitT () LBS.ByteString m ()
sourceFileC_ fpath = rawBytes .| toLazyLines
  where
    -- raw file contents, as strict bytestring chunks
    rawBytes = C.sourceFile fpath

-- | Split a stream of raw byte chunks into lines, without the newline
-- characters.
--
-- Upstream chunks may be cut at arbitrary byte offsets, so a line can
-- straddle two or more chunks. Bytes are buffered until the buffer contains a
-- newline, so that every element emitted is a whole line rather than a
-- fragment of one. When upstream is exhausted, any leftover bytes are
-- emitted as a final line lacking its trailing newline.
--
-- Empty lines are emitted as empty bytestrings.
toLazyLines :: (Monad m) => C.ConduitT BS.ByteString LBS.ByteString m ()
toLazyLines = go mempty
  where
    -- True iff the buffer holds at least one complete line
    hasNewline buf = not (BS.null (snd (BS8.span (/= '\n') buf)))
    -- emit the first line of the buffer, then carry on with the remainder
    emitLine buf = do
      let (line, rest) = chop buf
      C.yield line
      go rest
    go buf
      | hasNewline buf = emitLine buf
      | otherwise = do
          mchunk <- C.await -- current line is incomplete : get data from upstream
          case mchunk of
            Just chunk -> go (buf <> chunk)
            Nothing
              | BS.null buf -> pure ()
              | otherwise -> C.yield (LBS.fromStrict buf) -- unterminated last line


-- | Split a strict buffer with 'chopBS8' and return the first part as a lazy
-- bytestring, together with the (strict) remainder.
chop :: BS.ByteString -> (LBS.ByteString, BS.ByteString)
chop buf =
  let (line, remainder) = chopBS8 buf
  in (LBS.fromStrict line, remainder)

-- | Split a strict buffer with 'chopBS8' and JSON-decode the first part.
--
-- On success the decoded value is returned together with the remainder of
-- the buffer; on failure only the decoder's error message is returned.
chopDecode :: FromJSON a =>
              BS.ByteString -> Either String (a, BS.ByteString)
chopDecode buf =
  case eitherDecode' (LBS.fromStrict line) of
    Left err -> Left err
    Right val -> Right (val, remainder)
  where
    (line, remainder) = chopBS8 buf



-- | Source a `LBS.ByteString` for JSONL records
--
-- The input is split into lines with 'chopLBS' and each line is decoded in
-- turn. The input type of the conduit is left polymorphic, so the result can
-- be used directly as a source (e.g. with input type @()@).
--
-- NB in case of a decoding error the stream is stopped
jsonFromLBSC :: (FromJSON a, Monad m) => LBS.ByteString -> C.ConduitT i a m ()
jsonFromLBSC = C.unfoldC mk
  where
    -- decode the next line; 'Nothing' ends the stream
    mk lbs = case eitherDecode' line of
      Right x -> Just (x, rest)
      Left _ -> Nothing
      where
        (line, rest) = chopLBS lbs

-- | Like 'jsonFromLBSC' but decoding errors are passed downstream in 'Left'
-- values and do not stop the stream.
--
-- The stream ends when the next line, as delimited by 'chopLBS', is empty.
jsonFromLBSCE :: (FromJSON a, Monad m) => LBS.ByteString -> C.ConduitT i (Either String a) m ()
jsonFromLBSCE = C.unfoldC step
  where
    step input
      | LBS.null line = Nothing
      | otherwise = either failed decoded (eitherDecode' line)
      where
        (line, rest) = chopLBS input
        -- in both cases streaming continues with the rest of the input
        failed err = Just (Left err, rest)
        decoded val = Just (Right val, rest)


-- * utilities

-- | Split a strict bytestring at its first newline character.
--
-- The first component is everything before the newline; the second is
-- everything after it (the newline itself is dropped). If there is no
-- newline, the whole input is returned as the first component and the second
-- one is empty.
chopBS8 :: BS.ByteString -> (BS.ByteString, BS.ByteString)
chopBS8 bs =
  let notNewline c = c /= '\n'
      (line, remainder) = BS8.span notNewline bs
  in (line, BS8.drop 1 remainder)

-- | Split a lazy bytestring at its first newline byte.
--
-- The first component is everything before the newline; the second is
-- everything after it (the newline itself is dropped). If there is no
-- newline, the whole input is returned as the first component and the second
-- one is empty.
chopLBS :: LBS.ByteString -> (LBS.ByteString, LBS.ByteString)
chopLBS lbs =
  let newline = BS.c2w '\n'
      (line, remainder) = LBS.span (\w -> w /= newline) lbs
  in (line, LBS.drop 1 remainder)