module Data.Csv.Conduit (
CsvParseError(..)
, CsvStreamHaltParseError(..)
, CsvStreamRecordParseError(..)
, fromCsv
, fromCsvLiftError
, fromNamedCsv
, fromNamedCsvLiftError
, fromCsvStreamError
, fromNamedCsvStreamError
, toCsv
) where
import LocalPrelude
import Control.Monad.Trans (MonadTrans(..))
import Control.Monad.Error.Class (MonadError(..))
import Data.Bifunctor (first)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Conduit (ConduitT, await, yield)
import Data.Conduit.List (map, mapM)
import Data.Csv (FromNamedRecord, FromRecord, ToRecord, DecodeOptions, EncodeOptions, HasHeader, encodeWith)
import Data.Csv.Incremental (HeaderParser(..), Parser(..), decodeByNameWith, decodeWith)
import Data.Foldable (mapM_)
import qualified Data.Text as T
-- | Failure raised by the terminating decoders ('fromCsv', 'fromNamedCsv'
-- and their @LiftError@ variants).
data CsvParseError
  = CsvParseError BS.ByteString T.Text
    -- ^ The underlying parser (header or record parser) failed. Carries the
    -- 'BS.ByteString' and the message reported by the parser's failure
    -- constructor.
  | IncrementalError T.Text
    -- ^ A record in a parsed batch could not be decoded; carries the decode
    -- error message.
  deriving (Show, Eq)
-- | Failure that stops the non-terminating decoders ('fromCsvStreamError',
-- 'fromNamedCsvStreamError') altogether.
data CsvStreamHaltParseError
  = HaltingCsvParseError BS.ByteString T.Text
    -- ^ The underlying parser failed. Carries the 'BS.ByteString' and the
    -- message reported by the parser's failure constructor.
  deriving (Show, Eq)
-- | Per-record decode failure emitted downstream (as a 'Left') by the
-- non-terminating decoders instead of aborting the stream.
data CsvStreamRecordParseError
  = CsvStreamRecordParseError T.Text
    -- ^ The decode error message for the offending record.
  deriving (Show, Eq)
-- | Decode a stream of strict 'BS.ByteString' chunks into records, reporting
-- failures as a 'CsvParseError' thrown in @m@.
--
-- This is 'fromCsvLiftError' with the identity error mapping.
fromCsv
  :: (FromRecord a, MonadError CsvParseError m)
  => DecodeOptions
  -> HasHeader
  -> ConduitT BS.ByteString a m ()
fromCsv opts hasHeader = fromCsvLiftError id opts hasHeader
-- | Decode a stream of strict 'BS.ByteString' chunks into records.
--
-- The first argument converts a 'CsvParseError' into the error type @e@ of
-- the surrounding 'MonadError'; decoding failures are thrown through it.
-- The 'DecodeOptions' and 'HasHeader' arguments are handed to 'decodeWith'.
fromCsvLiftError
  :: (FromRecord a, MonadError e m)
  => (CsvParseError -> e)
  -> DecodeOptions
  -> HasHeader
  -> ConduitT BS.ByteString a m ()
fromCsvLiftError toErr opts hasHeader =
  terminatingStreamParser toErr (decodeWith opts hasHeader)
-- | Decode a stream of strict 'BS.ByteString' chunks into records by column
-- name, reporting failures as a 'CsvParseError' thrown in @m@.
--
-- This is 'fromNamedCsvLiftError' with the identity error mapping.
fromNamedCsv
  :: (FromNamedRecord a, MonadError CsvParseError m)
  => DecodeOptions
  -> ConduitT BS.ByteString a m ()
fromNamedCsv opts = fromNamedCsvLiftError id opts
-- | Decode a stream of strict 'BS.ByteString' chunks into records by column
-- name.
--
-- The first argument converts a 'CsvParseError' into the error type @e@ of
-- the surrounding 'MonadError'; decoding failures are thrown through it.
-- The 'DecodeOptions' are handed to 'decodeByNameWith'.
fromNamedCsvLiftError
  :: (FromNamedRecord a, MonadError e m)
  => (CsvParseError -> e)
  -> DecodeOptions
  -> ConduitT BS.ByteString a m ()
fromNamedCsvLiftError toErr opts =
  terminatingStreamHeaderParser toErr (decodeByNameWith opts)
-- | Decode a stream of strict 'BS.ByteString' chunks, emitting one 'Either'
-- per record: 'Right' for a decoded record, 'Left' with a
-- 'CsvStreamRecordParseError' for a record that failed to decode.
--
-- A failure of the parser itself is converted with the supplied function
-- and thrown in @m@. The 'DecodeOptions' and 'HasHeader' arguments are
-- handed to 'decodeWith'.
fromCsvStreamError
  :: (FromRecord a, MonadError e m)
  => DecodeOptions
  -> HasHeader
  -> (CsvStreamHaltParseError -> e)
  -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
fromCsvStreamError opts hasHeader toErr =
  streamParser toErr (decodeWith opts hasHeader)
-- | Decode a stream of strict 'BS.ByteString' chunks by column name,
-- emitting one 'Either' per record: 'Right' for a decoded record, 'Left'
-- with a 'CsvStreamRecordParseError' for a record that failed to decode.
--
-- A failure of the header or record parser itself is converted with the
-- supplied function and thrown in @m@. The 'DecodeOptions' are handed to
-- 'decodeByNameWith'.
fromNamedCsvStreamError
  :: (FromNamedRecord a, MonadError e m)
  => DecodeOptions
  -> (CsvStreamHaltParseError -> e)
  -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
fromNamedCsvStreamError opts toErr =
  streamHeaderParser toErr (decodeByNameWith opts)
-- | Encode each incoming record as CSV and emit it as a strict
-- 'BS.ByteString'.
--
-- Every record is encoded on its own (as a one-element list passed to
-- 'encodeWith'), so each input value produces exactly one output chunk.
toCsv
  :: (Monad m, ToRecord a)
  => EncodeOptions
  -> ConduitT a BS.ByteString m ()
toCsv opts = map encodeOne
  where
    -- Encode a single record and convert the lazy result to a strict chunk.
    encodeOne record = BSL.toStrict (encodeWith opts [record])
-- | Drive an incremental 'HeaderParser' from upstream chunks and, once the
-- header has been parsed, continue with 'streamParser' on the record parser
-- it produced.
--
-- * 'FailH': the failure is wrapped in 'HaltingCsvParseError', converted
--   with the supplied function and thrown in @m@.
-- * 'PartialH': one more chunk is awaited and fed to the continuation; if
--   upstream is exhausted the conduit simply finishes.
-- * 'DoneH': the parsed header is discarded and record streaming begins.
streamHeaderParser
  :: (MonadError e m)
  => (CsvStreamHaltParseError -> e)
  -> HeaderParser (Parser a)
  -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
streamHeaderParser toErr headerParser =
  case headerParser of
    FailH leftover msg ->
      lift (throwError (toErr (HaltingCsvParseError leftover (T.pack msg))))
    PartialH continue -> do
      next <- await
      case next of
        -- NOTE(review): the continuation is never told that input ended, so
        -- an empty or header-truncated input finishes silently here rather
        -- than reporting a failure -- confirm this is intended.
        Nothing -> return ()
        Just chunk -> streamHeaderParser toErr (continue chunk)
    DoneH _ recordParser -> streamParser toErr recordParser
-- | Drive an incremental record 'Parser' from upstream chunks, emitting one
-- 'Either' per record: 'Right' for a decoded record, 'Left' with a
-- 'CsvStreamRecordParseError' for a record that failed to decode.
--
-- * 'Fail': the failure is wrapped in 'HaltingCsvParseError', converted
--   with the supplied function and thrown in @m@.
-- * 'Many': the batch is emitted, then the next chunk is fed to the
--   continuation.
-- * 'Done': the final batch is emitted and the conduit finishes.
streamParser
  :: (MonadError e m)
  => (CsvStreamHaltParseError -> e)
  -> Parser a
  -> ConduitT BS.ByteString (Either CsvStreamRecordParseError a) m ()
streamParser f (Fail rest errMsg) =
  lift . throwError . f $ HaltingCsvParseError rest (T.pack errMsg)
streamParser f (Many rs p) = do
  mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs
  more <- await
  -- When upstream is exhausted the parser must still be told so, by feeding
  -- it an empty chunk (cassava's end-of-input signal). Returning here
  -- instead would silently drop whatever the parser still holds: a final
  -- record without a trailing newline, or a pending parse failure. This
  -- mirrors what 'terminatingStreamParser' does.
  streamParser f . p $ fromMaybe BS.empty more
streamParser _ (Done rs) =
  mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs
-- | Drive an incremental 'HeaderParser' from upstream chunks and, once the
-- header has been parsed, continue with 'terminatingStreamParser' on the
-- record parser it produced.
--
-- * 'FailH': the failure is wrapped in 'CsvParseError', converted with the
--   supplied function and thrown in @m@.
-- * 'PartialH': one more chunk is awaited and fed to the continuation; if
--   upstream is exhausted the conduit simply finishes.
-- * 'DoneH': the parsed header is discarded and record streaming begins.
terminatingStreamHeaderParser
  :: (Monad m, MonadError e m)
  => (CsvParseError -> e)
  -> HeaderParser (Parser a)
  -> ConduitT BS.ByteString a m ()
terminatingStreamHeaderParser toErr headerParser =
  case headerParser of
    FailH leftover msg ->
      lift (throwError (toErr (CsvParseError leftover (T.pack msg))))
    PartialH continue -> do
      next <- await
      case next of
        -- NOTE(review): the continuation is never told that input ended, so
        -- an empty or header-truncated input finishes silently here rather
        -- than reporting a failure -- confirm this is intended.
        Nothing -> return ()
        Just chunk -> terminatingStreamHeaderParser toErr (continue chunk)
    DoneH _ recordParser -> terminatingStreamParser toErr recordParser
-- | Drive an incremental record 'Parser' from upstream chunks, emitting
-- decoded records and aborting on the first failure.
--
-- * 'Fail': the failure is wrapped in 'CsvParseError', converted with the
--   supplied function and thrown in @m@.
-- * 'Many': if any record in the batch failed to decode, the first such
--   error is thrown as an 'IncrementalError' and nothing from that batch is
--   emitted. Otherwise the batch is emitted and the next chunk is fed to
--   the continuation; an empty chunk is fed once upstream is exhausted
--   (cassava's end-of-input signal) so the parser can finish.
-- * 'Done': handled like 'Many', except that the conduit finishes after
--   emitting the batch.
terminatingStreamParser
  :: (Monad m, MonadError e m)
  => (CsvParseError -> e)
  -> Parser a
  -> ConduitT BS.ByteString a m ()
terminatingStreamParser f (Fail rest errMsg) =
  lift . throwError . f . CsvParseError rest . T.pack $ errMsg
terminatingStreamParser f (Many ers p) =
  case sequence ers of
    -- Throw right away. The error used to be raised from inside a
    -- conduit that only runs its action once another chunk arrives, so a
    -- decode error in the last batch before end of input was silently
    -- dropped and the stream appeared to succeed.
    Left errMsg -> lift . throwError . f . IncrementalError . T.pack $ errMsg
    Right rs -> do
      mapM_ yield rs
      more <- await
      terminatingStreamParser f . p $ fromMaybe BS.empty more
terminatingStreamParser f (Done rs) =
  either
    (lift . throwError . f . IncrementalError . T.pack)
    (mapM_ yield)
    (sequence rs)