module Data.Csv.Conduit (
CsvParseError(..)
, CsvStreamHaltParseError(..)
, CsvStreamRecordParseError(..)
, fromCsv
, fromCsvLiftError
, fromNamedCsv
, fromNamedCsvLiftError
, fromCsvStreamError
, fromNamedCsvStreamError
, toCsv
) where
import LocalPrelude
import Control.Monad.Trans ( MonadTrans(..) )
import Control.Monad.Error.Class ( MonadError(..) )
import Data.Bifunctor ( first )
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Conduit ( Conduit, await, yield )
import Data.Conduit.List ( map, mapM )
import Data.Csv ( FromNamedRecord, FromRecord, ToRecord, DecodeOptions, EncodeOptions, HasHeader, encodeWith )
import Data.Csv.Incremental ( HeaderParser(..), Parser(..), decodeByNameWith, decodeWith )
import Data.Foldable ( mapM_ )
import qualified Data.Text as T
data CsvParseError =
CsvParseError BS.ByteString T.Text
| IncrementalError T.Text
deriving (Show, Eq)
data CsvStreamHaltParseError = HaltingCsvParseError BS.ByteString T.Text
deriving (Show, Eq)
data CsvStreamRecordParseError =
CsvStreamRecordParseError T.Text
deriving (Show, Eq)
fromCsv :: (FromRecord a, MonadError CsvParseError m) => DecodeOptions -> HasHeader -> Conduit BS.ByteString m a
fromCsv = fromCsvLiftError id
fromCsvLiftError :: (FromRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> HasHeader -> Conduit BS.ByteString m a
fromCsvLiftError f opts h = terminatingStreamParser f $ decodeWith opts h
fromNamedCsv :: (FromNamedRecord a, MonadError CsvParseError m) => DecodeOptions -> Conduit BS.ByteString m a
fromNamedCsv = fromNamedCsvLiftError id
fromNamedCsvLiftError :: (FromNamedRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> Conduit BS.ByteString m a
fromNamedCsvLiftError f opts = terminatingStreamHeaderParser f $ decodeByNameWith opts
fromCsvStreamError :: (FromRecord a, MonadError e m) => DecodeOptions -> HasHeader -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
fromCsvStreamError opts h f = streamParser f $ decodeWith opts h
fromNamedCsvStreamError :: (FromNamedRecord a, MonadError e m) => DecodeOptions -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
fromNamedCsvStreamError opts f = streamHeaderParser f $ decodeByNameWith opts
toCsv :: (Monad m, ToRecord a) => EncodeOptions -> Conduit a m BS.ByteString
toCsv opts = map $ BSL.toStrict . encodeWith opts . pure
streamHeaderParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> HeaderParser (Parser a) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
streamHeaderParser f (FailH rest errMsg) = lift . throwError . f $ HaltingCsvParseError rest (T.pack errMsg)
streamHeaderParser f (PartialH p) = await >>= maybe (return ()) (streamHeaderParser f . p)
streamHeaderParser f (DoneH _ p) = streamParser f p
streamParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> Parser a -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
streamParser f (Fail rest errMsg) = lift . throwError . f $ HaltingCsvParseError rest (T.pack errMsg)
streamParser f (Many rs p) = do
mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs
more <- await
maybe (return ()) (streamParser f . p) more
streamParser _ (Done rs) = mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs
terminatingStreamHeaderParser
:: (Monad m, MonadError e m)
=> (CsvParseError -> e)
-> HeaderParser (Parser a)
-> Conduit BS.ByteString m a
terminatingStreamHeaderParser f (FailH rest errMsg) = lift . throwError . f . CsvParseError rest . T.pack $ errMsg
terminatingStreamHeaderParser f (PartialH p) = await >>= maybe (return ()) (terminatingStreamHeaderParser f . p)
terminatingStreamHeaderParser f (DoneH _ p) = terminatingStreamParser f p
terminatingStreamParser
:: (Monad m, MonadError e m)
=> (CsvParseError -> e)
-> Parser a
-> Conduit BS.ByteString m a
terminatingStreamParser f (Fail rest errMsg) = lift . throwError . f . CsvParseError rest . T.pack $ errMsg
terminatingStreamParser f (Many ers p) =
let
errorHandler :: (Monad m, MonadError e m) => (CsvParseError -> e) -> String -> Conduit BS.ByteString m a
errorHandler f' = mapM . const . throwError . f' . IncrementalError . T.pack
safeResultHandler
:: (Monad m)
=> (BS.ByteString -> Parser a)
-> (Parser a -> Conduit BS.ByteString m a)
-> [a]
-> Conduit BS.ByteString m a
safeResultHandler p' f' rs = do
mapM_ yield rs
await >>= maybe (return ()) (f' . p')
in
either (errorHandler f) (safeResultHandler p (terminatingStreamParser f)) (sequence ers)
terminatingStreamParser f (Done rs) = either (lift . throwError . f . IncrementalError . T.pack) (mapM_ yield) (sequence rs)