{-# LANGUAGE FlexibleContexts #-}
module Data.Csv.Conduit (
CsvParseError(..)
, CsvStreamHaltParseError(..)
, CsvStreamRecordParseError(..)
, fromCsv
, fromCsvLiftError
, fromNamedCsv
, fromNamedCsvLiftError
, fromCsvStreamError
, fromNamedCsvStreamError
, toCsv
) where
import LocalPrelude
import Control.Monad.Trans (MonadTrans(..))
import Control.Monad.Error.Class (MonadError(..))
import Data.Bifunctor (first)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Conduit (ConduitT, await, yield)
import Data.Conduit.List (map, mapM)
import Data.Csv (FromNamedRecord, FromRecord, ToRecord, DecodeOptions, EncodeOptions, HasHeader, encodeWith)
import Data.Csv.Incremental (HeaderParser(..), Parser(..), decodeByNameWith, decodeWith)
import Data.Foldable (mapM_)
import qualified Data.Text as T
data CsvParseError =
CsvParseError BS.ByteString T.Text
| IncrementalError T.Text
deriving (Show, Eq)
data CsvStreamHaltParseError =
HaltingCsvParseError BS.ByteString T.Text
deriving (Show, Eq)
data CsvStreamRecordParseError =
CsvStreamRecordParseError T.Text
deriving (Show, Eq)
fromCsv :: (FromRecord a, MonadError CsvParseError m) => DecodeOptions -> HasHeader -> ConduitT BS.ByteString a m ()
fromCsv = fromCsvLiftError id
fromCsvLiftError :: (FromRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> HasHeader -> ConduitT BS.ByteString a m ()
fromCsvLiftError f opts h = {-# SCC fromCsvLiftError_p #-} terminatingStreamParser f $ decodeWith opts h
fromNamedCsv :: (FromNamedRecord a, MonadError CsvParseError m) => DecodeOptions -> ConduitT BS.ByteString a m ()
fromNamedCsv = fromNamedCsvLiftError id
fromNamedCsvLiftError :: (FromNamedRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> ConduitT BS.ByteString a m ()
fromNamedCsvLiftError f opts = {-# SCC fromNamedCsv_p #-} terminatingStreamHeaderParser f $ decodeByNameWith opts
fromCsvStreamError :: (FromRecord a, MonadError e m) => DecodeOptions -> HasHeader -> (CsvStreamHaltParseError -> e) -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
fromCsvStreamError opts h f = {-# SCC fromCsvStreamError_p #-} streamParser f $ decodeWith opts h
fromNamedCsvStreamError :: (FromNamedRecord a, MonadError e m) => DecodeOptions -> (CsvStreamHaltParseError -> e) -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
fromNamedCsvStreamError opts f = {-# SCC fromCsvStreamError_p #-} streamHeaderParser f $ decodeByNameWith opts
toCsv :: (Monad m, ToRecord a) => EncodeOptions -> ConduitT a BS.ByteString m ()
toCsv opts = {-# SCC toCsv_p #-} map $ BSL.toStrict . encodeWith opts . pure
streamHeaderParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> HeaderParser (Parser a) -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
streamHeaderParser f (FailH rest errMsg) = {-# SCC streamHeaderParser_FailH_p #-} lift . throwError . f $ HaltingCsvParseError rest (T.pack errMsg)
streamHeaderParser f (PartialH p) = {-# SCC streamHeaderParser_PartialH_p #-} await >>= maybe (return ()) (streamHeaderParser f . p)
streamHeaderParser f (DoneH _ p) = {-# SCC streamHeaderParser_DoneH_p #-} streamParser f p
streamParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> Parser a -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
streamParser f (Fail rest errMsg) = {-# SCC streamParser_Fail_p #-} lift . throwError . f $ HaltingCsvParseError rest (T.pack errMsg)
streamParser f (Many rs p) = {-# SCC streamParser_Many_p #-} do
mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs
more <- await
maybe (return ()) (streamParser f . p) more
streamParser _ (Done rs) = {-# SCC streamParser_Done_p #-} mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs
terminatingStreamHeaderParser
:: (Monad m, MonadError e m)
=> (CsvParseError -> e)
-> HeaderParser (Parser a)
-> ConduitT BS.ByteString a m ()
terminatingStreamHeaderParser f (FailH rest errMsg) = {-# SCC terminatingStreamHeaderParser_FailH_p #-} lift . throwError . f . CsvParseError rest . T.pack $ errMsg
terminatingStreamHeaderParser f (PartialH p) = {-# SCC terminatingStreamHeaderParser_PartialH_p #-} await >>= maybe (return ()) (terminatingStreamHeaderParser f . p)
terminatingStreamHeaderParser f (DoneH _ p) = {-# SCC terminatingStreamHeaderParser_DoneH_p #-} terminatingStreamParser f p
terminatingStreamParser
:: (Monad m, MonadError e m)
=> (CsvParseError -> e)
-> Parser a
-> ConduitT BS.ByteString a m ()
terminatingStreamParser f (Fail rest errMsg) = {-# SCC terminatingStreamParser_Fail_p #-} lift . throwError . f . CsvParseError rest . T.pack $ errMsg
terminatingStreamParser f (Many ers p) = {-# SCC terminatingStreamParser_Many_p #-}
let
errorHandler :: (Monad m, MonadError e m) => (CsvParseError -> e) -> String -> ConduitT BS.ByteString a m ()
errorHandler f' = mapM . const . throwError . f' . IncrementalError . T.pack
safeResultHandler
:: (Monad m)
=> (BS.ByteString -> Parser a)
-> (Parser a -> ConduitT BS.ByteString a m ())
-> [a]
-> ConduitT BS.ByteString a m ()
safeResultHandler p' f' rs = do
mapM_ yield rs
await >>= f' . p' . fromMaybe BS.empty
in
either (errorHandler f) (safeResultHandler p (terminatingStreamParser f)) (sequence ers)
terminatingStreamParser f (Done rs) = {-# SCC terminatingStreamParser_Done_p #-} either (lift . throwError . f . IncrementalError . T.pack) (mapM_ yield) (sequence rs)