{-# LANGUAGE FlexibleContexts #-}
-------------------------------------------------------------------
-- |
-- Module       : Data.Csv.Conduit
-- Copyright    : (C) 2014
-- License      : BSD-style (see the file etc/LICENSE.md)
-- Maintainer   : Dom De Re
--
-- Conduit interface for cassava
--
-------------------------------------------------------------------
module Data.Csv.Conduit (
  -- * Types
    CsvParseError(..)
  , CsvStreamHaltParseError(..)
  , CsvStreamRecordParseError(..)
  -- * Conduits
  , fromCsv
  , fromCsvLiftError
  , fromNamedCsv
  , fromNamedCsvLiftError
  , fromCsvStreamError
  , fromCsvStreamErrorNoThrow
  , fromNamedCsvStreamError
  , fromNamedCsvStreamErrorNoThrow
  , toCsv
  ) where

import LocalPrelude

import Control.Monad.Trans (MonadTrans(..))
import Control.Monad.Error.Class (MonadError(..))
import Data.Bifunctor (first)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Conduit (ConduitT, await, yield)
import Data.Conduit.List (map, mapM)
import Data.Csv (FromNamedRecord, FromRecord, ToRecord, DecodeOptions, EncodeOptions, HasHeader, encodeWith)
import Data.Csv.Incremental (HeaderParser(..), Parser(..), decodeByNameWith, decodeWith)
import Data.Foldable (mapM_)
import qualified Data.Text as T

data CsvParseError =
    CsvParseError BS.ByteString T.Text
  | IncrementalError T.Text
    deriving (Int -> CsvParseError -> ShowS
[CsvParseError] -> ShowS
CsvParseError -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [CsvParseError] -> ShowS
$cshowList :: [CsvParseError] -> ShowS
show :: CsvParseError -> String
$cshow :: CsvParseError -> String
showsPrec :: Int -> CsvParseError -> ShowS
$cshowsPrec :: Int -> CsvParseError -> ShowS
Show, CsvParseError -> CsvParseError -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: CsvParseError -> CsvParseError -> Bool
$c/= :: CsvParseError -> CsvParseError -> Bool
== :: CsvParseError -> CsvParseError -> Bool
$c== :: CsvParseError -> CsvParseError -> Bool
Eq)

-- | When you want to include errors in the stream, this error type represents errors that halt the stream.
-- They do not appear inside the conduit and will instead get returned from running the conduit.
--
data CsvStreamHaltParseError =
  HaltingCsvParseError BS.ByteString T.Text -- ^ the remaining bytestring that was read in but not parsed yet, and the stringy error msg describing the fail.
  deriving (Int -> CsvStreamHaltParseError -> ShowS
[CsvStreamHaltParseError] -> ShowS
CsvStreamHaltParseError -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [CsvStreamHaltParseError] -> ShowS
$cshowList :: [CsvStreamHaltParseError] -> ShowS
show :: CsvStreamHaltParseError -> String
$cshow :: CsvStreamHaltParseError -> String
showsPrec :: Int -> CsvStreamHaltParseError -> ShowS
$cshowsPrec :: Int -> CsvStreamHaltParseError -> ShowS
Show, CsvStreamHaltParseError -> CsvStreamHaltParseError -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: CsvStreamHaltParseError -> CsvStreamHaltParseError -> Bool
$c/= :: CsvStreamHaltParseError -> CsvStreamHaltParseError -> Bool
== :: CsvStreamHaltParseError -> CsvStreamHaltParseError -> Bool
$c== :: CsvStreamHaltParseError -> CsvStreamHaltParseError -> Bool
Eq)

-- | When you want to include errors in the stream, these are the errors that can be included in the stream,
-- they are usually problems restricted to individual records, and streaming can resume from the next record
-- you just have to decide on something sensible to do with the per record errors.
--
data CsvStreamRecordParseError =
  CsvStreamRecordParseError T.Text -- ^ The stringy error describing why this record could not be parsed.
  deriving (Int -> CsvStreamRecordParseError -> ShowS
[CsvStreamRecordParseError] -> ShowS
CsvStreamRecordParseError -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [CsvStreamRecordParseError] -> ShowS
$cshowList :: [CsvStreamRecordParseError] -> ShowS
show :: CsvStreamRecordParseError -> String
$cshow :: CsvStreamRecordParseError -> String
showsPrec :: Int -> CsvStreamRecordParseError -> ShowS
$cshowsPrec :: Int -> CsvStreamRecordParseError -> ShowS
Show, CsvStreamRecordParseError -> CsvStreamRecordParseError -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: CsvStreamRecordParseError -> CsvStreamRecordParseError -> Bool
$c/= :: CsvStreamRecordParseError -> CsvStreamRecordParseError -> Bool
== :: CsvStreamRecordParseError -> CsvStreamRecordParseError -> Bool
$c== :: CsvStreamRecordParseError -> CsvStreamRecordParseError -> Bool
Eq)

-- |
-- Streams parsed records, Errors are not received in the stream but instead after the pipeline is executed,
-- If you want to handle errors as they come and resume, see `fromCsvStreamError`
--
fromCsv :: (FromRecord a, MonadError CsvParseError m) => DecodeOptions -> HasHeader -> ConduitT BS.ByteString a m ()
fromCsv :: forall a (m :: * -> *).
(FromRecord a, MonadError CsvParseError m) =>
DecodeOptions -> HasHeader -> ConduitT ByteString a m ()
fromCsv = forall a e (m :: * -> *).
(FromRecord a, MonadError e m) =>
(CsvParseError -> e)
-> DecodeOptions -> HasHeader -> ConduitT ByteString a m ()
fromCsvLiftError forall a. a -> a
id

-- |
-- Sometimes your pipeline will involve an error type other than `CsvParseError`, in which case if you provide
-- a function to project it into your custom error type, you can use this instead of `fromCsv`
--
fromCsvLiftError :: (FromRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> HasHeader -> ConduitT BS.ByteString a m ()
fromCsvLiftError :: forall a e (m :: * -> *).
(FromRecord a, MonadError e m) =>
(CsvParseError -> e)
-> DecodeOptions -> HasHeader -> ConduitT ByteString a m ()
fromCsvLiftError CsvParseError -> e
f DecodeOptions
opts HasHeader
h = {-# SCC fromCsvLiftError_p #-} forall (m :: * -> *) e a.
(Monad m, MonadError e m) =>
(CsvParseError -> e) -> Parser a -> ConduitT ByteString a m ()
terminatingStreamParser CsvParseError -> e
f forall a b. (a -> b) -> a -> b
$ forall a. FromRecord a => DecodeOptions -> HasHeader -> Parser a
decodeWith DecodeOptions
opts HasHeader
h


-- |
-- Parses an instance of `FromNamedRecord`, this conduit drops the Header
--
-- Errors are not seen in the pipeline but rather at the end after executing the pipeline, if you want to handle the errors
-- as they occur, try `fromNamedCsvStreamError` instead.
--
fromNamedCsv :: (FromNamedRecord a, MonadError CsvParseError m) => DecodeOptions -> ConduitT BS.ByteString a m ()
fromNamedCsv :: forall a (m :: * -> *).
(FromNamedRecord a, MonadError CsvParseError m) =>
DecodeOptions -> ConduitT ByteString a m ()
fromNamedCsv = forall a e (m :: * -> *).
(FromNamedRecord a, MonadError e m) =>
(CsvParseError -> e) -> DecodeOptions -> ConduitT ByteString a m ()
fromNamedCsvLiftError forall a. a -> a
id

-- |
-- Sometimes your pipeline will involve an error type other than `CsvParseError`, in which case if you provide
-- a function to project it into your custom error type, you can use this instead of `fromCsv`
--
fromNamedCsvLiftError :: (FromNamedRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> ConduitT BS.ByteString a m ()
fromNamedCsvLiftError :: forall a e (m :: * -> *).
(FromNamedRecord a, MonadError e m) =>
(CsvParseError -> e) -> DecodeOptions -> ConduitT ByteString a m ()
fromNamedCsvLiftError CsvParseError -> e
f DecodeOptions
opts = {-# SCC fromNamedCsv_p #-} forall (m :: * -> *) e a.
(Monad m, MonadError e m) =>
(CsvParseError -> e)
-> HeaderParser (Parser a) -> ConduitT ByteString a m ()
terminatingStreamHeaderParser CsvParseError -> e
f forall a b. (a -> b) -> a -> b
$ forall a.
FromNamedRecord a =>
DecodeOptions -> HeaderParser (Parser a)
decodeByNameWith DecodeOptions
opts

-- |
-- Same as 'fromCsv' but allows for errors to be handled in the pipeline instead
--
fromCsvStreamError :: (FromRecord a, MonadError e m) => DecodeOptions -> HasHeader -> (CsvStreamHaltParseError -> e) -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
fromCsvStreamError :: forall a e (m :: * -> *).
(FromRecord a, MonadError e m) =>
DecodeOptions
-> HasHeader
-> (CsvStreamHaltParseError -> e)
-> ConduitT ByteString (Either CsvStreamRecordParseError a) m ()
fromCsvStreamError DecodeOptions
opts HasHeader
h CsvStreamHaltParseError -> e
f = {-# SCC fromCsvStreamError_p #-} forall e (m :: * -> *) a.
MonadError e m =>
(CsvStreamHaltParseError -> e)
-> Parser a
-> ConduitT ByteString (Either CsvStreamRecordParseError a) m ()
streamParser CsvStreamHaltParseError -> e
f forall a b. (a -> b) -> a -> b
$ forall a. FromRecord a => DecodeOptions -> HasHeader -> Parser a
decodeWith DecodeOptions
opts HasHeader
h

-- |
-- Same as 'fromCsvStreamError' but allows for halting errors to be handled by the pipeline as well.
--
fromCsvStreamErrorNoThrow :: (Monad m, FromRecord a) => DecodeOptions -> HasHeader -> ConduitT BS.ByteString (Either (Either CsvStreamHaltParseError CsvStreamRecordParseError) a) m ()
fromCsvStreamErrorNoThrow :: forall (m :: * -> *) a.
(Monad m, FromRecord a) =>
DecodeOptions
-> HasHeader
-> ConduitT
     ByteString
     (Either
        (Either CsvStreamHaltParseError CsvStreamRecordParseError) a)
     m
     ()
fromCsvStreamErrorNoThrow DecodeOptions
opts HasHeader
h = {-# SCC fromCsvStreamErrorNoThrow_p #-} forall (m :: * -> *) a.
Monad m =>
Parser a
-> ConduitT
     ByteString
     (Either
        (Either CsvStreamHaltParseError CsvStreamRecordParseError) a)
     m
     ()
streamParserNoThrow forall a b. (a -> b) -> a -> b
$ forall a. FromRecord a => DecodeOptions -> HasHeader -> Parser a
decodeWith DecodeOptions
opts HasHeader
h

-- |
-- Like `fromNamedCsvStream` but allows for errors to be handled in the pipeline itself.
--
fromNamedCsvStreamError :: (FromNamedRecord a, MonadError e m) => DecodeOptions -> (CsvStreamHaltParseError -> e) -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
fromNamedCsvStreamError :: forall a e (m :: * -> *).
(FromNamedRecord a, MonadError e m) =>
DecodeOptions
-> (CsvStreamHaltParseError -> e)
-> ConduitT ByteString (Either CsvStreamRecordParseError a) m ()
fromNamedCsvStreamError DecodeOptions
opts CsvStreamHaltParseError -> e
f = {-# SCC fromCsvStreamError_p #-} forall e (m :: * -> *) a.
MonadError e m =>
(CsvStreamHaltParseError -> e)
-> HeaderParser (Parser a)
-> ConduitT ByteString (Either CsvStreamRecordParseError a) m ()
streamHeaderParser CsvStreamHaltParseError -> e
f forall a b. (a -> b) -> a -> b
$ forall a.
FromNamedRecord a =>
DecodeOptions -> HeaderParser (Parser a)
decodeByNameWith DecodeOptions
opts

-- |
-- Like `fromNamedCsvStreamErrorNoThrow` but allows for errors to be handled in the pipeline itself.
--
fromNamedCsvStreamErrorNoThrow :: (Monad m, FromNamedRecord a) => DecodeOptions -> ConduitT BS.ByteString (Either (Either CsvStreamHaltParseError CsvStreamRecordParseError) a) m ()
fromNamedCsvStreamErrorNoThrow :: forall (m :: * -> *) a.
(Monad m, FromNamedRecord a) =>
DecodeOptions
-> ConduitT
     ByteString
     (Either
        (Either CsvStreamHaltParseError CsvStreamRecordParseError) a)
     m
     ()
fromNamedCsvStreamErrorNoThrow DecodeOptions
opts = {-# SCC fromCsvStreamErrorNoThrow_p #-} forall (m :: * -> *) a.
Monad m =>
HeaderParser (Parser a)
-> ConduitT
     ByteString
     (Either
        (Either CsvStreamHaltParseError CsvStreamRecordParseError) a)
     m
     ()
streamHeaderParserNoThrow forall a b. (a -> b) -> a -> b
$ forall a.
FromNamedRecord a =>
DecodeOptions -> HeaderParser (Parser a)
decodeByNameWith DecodeOptions
opts

-- |
-- Streams from csv to text, does not create headers...
--
toCsv :: (Monad m, ToRecord a) => EncodeOptions -> ConduitT a BS.ByteString m ()
toCsv :: forall (m :: * -> *) a.
(Monad m, ToRecord a) =>
EncodeOptions -> ConduitT a ByteString m ()
toCsv EncodeOptions
opts = {-# SCC toCsv_p #-} forall (m :: * -> *) a b. Monad m => (a -> b) -> ConduitT a b m ()
map forall a b. (a -> b) -> a -> b
$ ByteString -> ByteString
BSL.toStrict forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. ToRecord a => EncodeOptions -> [a] -> ByteString
encodeWith EncodeOptions
opts forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a. Applicative f => a -> f a
pure

-- helpers

streamHeaderParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> HeaderParser (Parser a) -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
streamHeaderParser :: forall e (m :: * -> *) a.
MonadError e m =>
(CsvStreamHaltParseError -> e)
-> HeaderParser (Parser a)
-> ConduitT ByteString (Either CsvStreamRecordParseError a) m ()
streamHeaderParser CsvStreamHaltParseError -> e
f (FailH ByteString
rest String
errMsg)  = {-# SCC streamHeaderParser_FailH_p #-} forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError forall b c a. (b -> c) -> (a -> b) -> a -> c
. CsvStreamHaltParseError -> e
f forall a b. (a -> b) -> a -> b
$ ByteString -> Text -> CsvStreamHaltParseError
HaltingCsvParseError ByteString
rest (String -> Text
T.pack String
errMsg)
streamHeaderParser CsvStreamHaltParseError -> e
f (PartialH ByteString -> HeaderParser (Parser a)
p)         = {-# SCC streamHeaderParser_PartialH_p #-} forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (m :: * -> *) a. Monad m => a -> m a
return ()) (forall e (m :: * -> *) a.
MonadError e m =>
(CsvStreamHaltParseError -> e)
-> HeaderParser (Parser a)
-> ConduitT ByteString (Either CsvStreamRecordParseError a) m ()
streamHeaderParser CsvStreamHaltParseError -> e
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> HeaderParser (Parser a)
p)
streamHeaderParser CsvStreamHaltParseError -> e
f (DoneH Header
_ Parser a
p)          = {-# SCC streamHeaderParser_DoneH_p #-} forall e (m :: * -> *) a.
MonadError e m =>
(CsvStreamHaltParseError -> e)
-> Parser a
-> ConduitT ByteString (Either CsvStreamRecordParseError a) m ()
streamParser CsvStreamHaltParseError -> e
f Parser a
p

streamHeaderParserNoThrow :: (Monad m) => HeaderParser (Parser a) -> ConduitT BS.ByteString (Either (Either CsvStreamHaltParseError CsvStreamRecordParseError) a) m ()
streamHeaderParserNoThrow :: forall (m :: * -> *) a.
Monad m =>
HeaderParser (Parser a)
-> ConduitT
     ByteString
     (Either
        (Either CsvStreamHaltParseError CsvStreamRecordParseError) a)
     m
     ()
streamHeaderParserNoThrow (FailH ByteString
rest String
errMsg)  = {-# SCC streamHeaderParserNoThrow_FailH_p #-} forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ ByteString -> Text -> CsvStreamHaltParseError
HaltingCsvParseError ByteString
rest (String -> Text
T.pack String
errMsg)
streamHeaderParserNoThrow (PartialH ByteString -> HeaderParser (Parser a)
p)         = {-# SCC streamHeaderParserNoThrow_PartialH_p #-} forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (m :: * -> *) a. Monad m => a -> m a
return ()) (forall (m :: * -> *) a.
Monad m =>
HeaderParser (Parser a)
-> ConduitT
     ByteString
     (Either
        (Either CsvStreamHaltParseError CsvStreamRecordParseError) a)
     m
     ()
streamHeaderParserNoThrow forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> HeaderParser (Parser a)
p)
streamHeaderParserNoThrow (DoneH Header
_ Parser a
p)          = {-# SCC streamHeaderParserNoThrow_DoneH_p #-} forall (m :: * -> *) a.
Monad m =>
Parser a
-> ConduitT
     ByteString
     (Either
        (Either CsvStreamHaltParseError CsvStreamRecordParseError) a)
     m
     ()
streamParserNoThrow Parser a
p

streamParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> Parser a -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
streamParser :: forall e (m :: * -> *) a.
MonadError e m =>
(CsvStreamHaltParseError -> e)
-> Parser a
-> ConduitT ByteString (Either CsvStreamRecordParseError a) m ()
streamParser CsvStreamHaltParseError -> e
f (Fail ByteString
rest String
errMsg)   = {-# SCC streamParser_Fail_p #-} forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError forall b c a. (b -> c) -> (a -> b) -> a -> c
. CsvStreamHaltParseError -> e
f forall a b. (a -> b) -> a -> b
$ ByteString -> Text -> CsvStreamHaltParseError
HaltingCsvParseError ByteString
rest (String -> Text
T.pack String
errMsg)
streamParser CsvStreamHaltParseError -> e
f (Many [Either String a]
rs ByteString -> Parser a
p)          = {-# SCC streamParser_Many_p #-} do
  -- send the results down the stream..
  forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (Text -> CsvStreamRecordParseError
CsvStreamRecordParseError forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack)) [Either String a]
rs
  -- wait for more..
  Maybe ByteString
more <- forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await
  forall e (m :: * -> *) a.
MonadError e m =>
(CsvStreamHaltParseError -> e)
-> Parser a
-> ConduitT ByteString (Either CsvStreamRecordParseError a) m ()
streamParser CsvStreamHaltParseError -> e
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Parser a
p forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a -> a
fromMaybe ByteString
BS.empty Maybe ByteString
more
streamParser CsvStreamHaltParseError -> e
_ (Done [Either String a]
rs)            = {-# SCC streamParser_Done_p #-} forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (Text -> CsvStreamRecordParseError
CsvStreamRecordParseError forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack)) [Either String a]
rs

streamParserNoThrow :: (Monad m) => Parser a -> ConduitT BS.ByteString (Either (Either CsvStreamHaltParseError CsvStreamRecordParseError) a) m ()
streamParserNoThrow :: forall (m :: * -> *) a.
Monad m =>
Parser a
-> ConduitT
     ByteString
     (Either
        (Either CsvStreamHaltParseError CsvStreamRecordParseError) a)
     m
     ()
streamParserNoThrow (Fail ByteString
rest String
errMsg)   = {-# SCC streamParserNoThrow_Fail_p #-} forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ ByteString -> Text -> CsvStreamHaltParseError
HaltingCsvParseError ByteString
rest (String -> Text
T.pack String
errMsg)
streamParserNoThrow (Many [Either String a]
rs ByteString -> Parser a
p)          = {-# SCC streamParserNoThrow_Many_p #-} do
  -- send the results down the stream..
  forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (forall a b. b -> Either a b
Right forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> CsvStreamRecordParseError
CsvStreamRecordParseError forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack)) [Either String a]
rs
  -- wait for more..
  Maybe ByteString
more <- forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await
  forall (m :: * -> *) a.
Monad m =>
Parser a
-> ConduitT
     ByteString
     (Either
        (Either CsvStreamHaltParseError CsvStreamRecordParseError) a)
     m
     ()
streamParserNoThrow forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Parser a
p forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a -> a
fromMaybe ByteString
BS.empty Maybe ByteString
more
streamParserNoThrow (Done [Either String a]
rs)            = {-# SCC streamParserNoThrow_Done_p #-} forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (forall a b. b -> Either a b
Right forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> CsvStreamRecordParseError
CsvStreamRecordParseError forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack)) [Either String a]
rs

terminatingStreamHeaderParser
  :: (Monad m, MonadError e m)
  => (CsvParseError -> e)
  -> HeaderParser (Parser a)
  -> ConduitT BS.ByteString a m ()
terminatingStreamHeaderParser :: forall (m :: * -> *) e a.
(Monad m, MonadError e m) =>
(CsvParseError -> e)
-> HeaderParser (Parser a) -> ConduitT ByteString a m ()
terminatingStreamHeaderParser CsvParseError -> e
f (FailH ByteString
rest String
errMsg)   = {-# SCC terminatingStreamHeaderParser_FailH_p #-} forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError forall b c a. (b -> c) -> (a -> b) -> a -> c
. CsvParseError -> e
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text -> CsvParseError
CsvParseError ByteString
rest forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ String
errMsg
terminatingStreamHeaderParser CsvParseError -> e
f (PartialH ByteString -> HeaderParser (Parser a)
p)          = {-# SCC terminatingStreamHeaderParser_PartialH_p #-} forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall b a. b -> (a -> b) -> Maybe a -> b
maybe (forall (m :: * -> *) a. Monad m => a -> m a
return ()) (forall (m :: * -> *) e a.
(Monad m, MonadError e m) =>
(CsvParseError -> e)
-> HeaderParser (Parser a) -> ConduitT ByteString a m ()
terminatingStreamHeaderParser CsvParseError -> e
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> HeaderParser (Parser a)
p)
terminatingStreamHeaderParser CsvParseError -> e
f (DoneH Header
_ Parser a
p)           = {-# SCC terminatingStreamHeaderParser_DoneH_p #-} forall (m :: * -> *) e a.
(Monad m, MonadError e m) =>
(CsvParseError -> e) -> Parser a -> ConduitT ByteString a m ()
terminatingStreamParser CsvParseError -> e
f Parser a
p

terminatingStreamParser
  :: (Monad m, MonadError e m)
  => (CsvParseError -> e)
  -> Parser a
  -> ConduitT BS.ByteString a m ()
terminatingStreamParser :: forall (m :: * -> *) e a.
(Monad m, MonadError e m) =>
(CsvParseError -> e) -> Parser a -> ConduitT ByteString a m ()
terminatingStreamParser CsvParseError -> e
f (Fail ByteString
rest String
errMsg)    = {-# SCC terminatingStreamParser_Fail_p #-} forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError forall b c a. (b -> c) -> (a -> b) -> a -> c
. CsvParseError -> e
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Text -> CsvParseError
CsvParseError ByteString
rest forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack forall a b. (a -> b) -> a -> b
$ String
errMsg
terminatingStreamParser CsvParseError -> e
f (Many [Either String a]
ers ByteString -> Parser a
p)          = {-# SCC terminatingStreamParser_Many_p #-}
  let
    errorHandler :: (Monad m, MonadError e m) => (CsvParseError -> e) -> String -> ConduitT BS.ByteString a m ()
    errorHandler :: forall (m :: * -> *) e a.
(Monad m, MonadError e m) =>
(CsvParseError -> e) -> String -> ConduitT ByteString a m ()
errorHandler CsvParseError -> e
f' = forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> ConduitT a b m ()
mapM forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. a -> b -> a
const forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError forall b c a. (b -> c) -> (a -> b) -> a -> c
. CsvParseError -> e
f' forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> CsvParseError
IncrementalError forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack

    safeResultHandler
      :: (Monad m)
      => (BS.ByteString -> Parser a)
      -> (Parser a -> ConduitT BS.ByteString a m ())
      -> [a]
      -> ConduitT BS.ByteString a m ()
    safeResultHandler :: forall (m :: * -> *) a.
Monad m =>
(ByteString -> Parser a)
-> (Parser a -> ConduitT ByteString a m ())
-> [a]
-> ConduitT ByteString a m ()
safeResultHandler ByteString -> Parser a
p' Parser a -> ConduitT ByteString a m ()
f' [a]
rs = do
      forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield [a]
rs
      -- wait for more..
      forall (m :: * -> *) i o. Monad m => ConduitT i o m (Maybe i)
await forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Parser a -> ConduitT ByteString a m ()
f' forall b c a. (b -> c) -> (a -> b) -> a -> c
. ByteString -> Parser a
p' forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. a -> Maybe a -> a
fromMaybe ByteString
BS.empty
  in
    -- send the results down the stream..
    forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (forall (m :: * -> *) e a.
(Monad m, MonadError e m) =>
(CsvParseError -> e) -> String -> ConduitT ByteString a m ()
errorHandler CsvParseError -> e
f) (forall (m :: * -> *) a.
Monad m =>
(ByteString -> Parser a)
-> (Parser a -> ConduitT ByteString a m ())
-> [a]
-> ConduitT ByteString a m ()
safeResultHandler ByteString -> Parser a
p (forall (m :: * -> *) e a.
(Monad m, MonadError e m) =>
(CsvParseError -> e) -> Parser a -> ConduitT ByteString a m ()
terminatingStreamParser CsvParseError -> e
f)) (forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence [Either String a]
ers)
terminatingStreamParser CsvParseError -> e
f (Done [Either String a]
rs) = {-# SCC terminatingStreamParser_Done_p #-} forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall e (m :: * -> *) a. MonadError e m => e -> m a
throwError forall b c a. (b -> c) -> (a -> b) -> a -> c
. CsvParseError -> e
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. Text -> CsvParseError
IncrementalError forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> Text
T.pack) (forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ forall (m :: * -> *) o i. Monad m => o -> ConduitT i o m ()
yield) (forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
sequence [Either String a]
rs)