module Data.Csv.Conduit (
CsvParseError(..)
, CsvStreamHaltParseError(..)
, CsvStreamRecordParseError(..)
, fromCsv
, fromCsvLiftError
, fromNamedCsv
, fromNamedCsvLiftError
, fromCsvStreamError
, fromNamedCsvStreamError
, toCsv
) where
import LocalPrelude
import Control.Monad.Trans (MonadTrans(..))
import Control.Monad.Error.Class (MonadError(..))
import Data.Bifunctor (first)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Conduit (Conduit, await, yield)
import Data.Conduit.List (map, mapM)
import Data.Csv (FromNamedRecord, FromRecord, ToRecord, DecodeOptions, EncodeOptions, HasHeader, encodeWith)
import Data.Csv.Incremental (HeaderParser(..), Parser(..), decodeByNameWith, decodeWith)
import Data.Foldable (mapM_)
import qualified Data.Text as T
data CsvParseError =
CsvParseError BS.ByteString T.Text
| IncrementalError T.Text
deriving (Show, Eq)
data CsvStreamHaltParseError =
HaltingCsvParseError BS.ByteString T.Text
deriving (Show, Eq)
data CsvStreamRecordParseError =
CsvStreamRecordParseError T.Text
deriving (Show, Eq)
fromCsv :: (FromRecord a, MonadError CsvParseError m) => DecodeOptions -> HasHeader -> Conduit BS.ByteString m a
fromCsv = fromCsvLiftError id
fromCsvLiftError :: (FromRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> HasHeader -> Conduit BS.ByteString m a
fromCsvLiftError f opts h = terminatingStreamParser f $ decodeWith opts h
fromNamedCsv :: (FromNamedRecord a, MonadError CsvParseError m) => DecodeOptions -> Conduit BS.ByteString m a
fromNamedCsv = fromNamedCsvLiftError id
fromNamedCsvLiftError :: (FromNamedRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> Conduit BS.ByteString m a
fromNamedCsvLiftError f opts = terminatingStreamHeaderParser f $ decodeByNameWith opts
fromCsvStreamError :: (FromRecord a, MonadError e m) => DecodeOptions -> HasHeader -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
fromCsvStreamError opts h f = streamParser f $ decodeWith opts h
fromNamedCsvStreamError :: (FromNamedRecord a, MonadError e m) => DecodeOptions -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
fromNamedCsvStreamError opts f = streamHeaderParser f $ decodeByNameWith opts
toCsv :: (Monad m, ToRecord a) => EncodeOptions -> Conduit a m BS.ByteString
toCsv opts = map $ BSL.toStrict . encodeWith opts . pure
streamHeaderParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> HeaderParser (Parser a) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
streamHeaderParser f (FailH rest errMsg) = lift . throwError . f $ HaltingCsvParseError rest (T.pack errMsg)
streamHeaderParser f (PartialH p) = await >>= maybe (return ()) (streamHeaderParser f . p)
streamHeaderParser f (DoneH _ p) = streamParser f p
streamParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> Parser a -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
streamParser f (Fail rest errMsg) = lift . throwError . f $ HaltingCsvParseError rest (T.pack errMsg)
streamParser f (Many rs p) = do
mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs
more <- await
maybe (return ()) (streamParser f . p) more
streamParser _ (Done rs) = mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs
terminatingStreamHeaderParser
:: (Monad m, MonadError e m)
=> (CsvParseError -> e)
-> HeaderParser (Parser a)
-> Conduit BS.ByteString m a
terminatingStreamHeaderParser f (FailH rest errMsg) = lift . throwError . f . CsvParseError rest . T.pack $ errMsg
terminatingStreamHeaderParser f (PartialH p) = await >>= maybe (return ()) (terminatingStreamHeaderParser f . p)
terminatingStreamHeaderParser f (DoneH _ p) = terminatingStreamParser f p
terminatingStreamParser
:: (Monad m, MonadError e m)
=> (CsvParseError -> e)
-> Parser a
-> Conduit BS.ByteString m a
terminatingStreamParser f (Fail rest errMsg) = lift . throwError . f . CsvParseError rest . T.pack $ errMsg
terminatingStreamParser f (Many ers p) =
let
errorHandler :: (Monad m, MonadError e m) => (CsvParseError -> e) -> String -> Conduit BS.ByteString m a
errorHandler f' = mapM . const . throwError . f' . IncrementalError . T.pack
safeResultHandler
:: (Monad m)
=> (BS.ByteString -> Parser a)
-> (Parser a -> Conduit BS.ByteString m a)
-> [a]
-> Conduit BS.ByteString m a
safeResultHandler p' f' rs = do
mapM_ yield rs
await >>= f' . p' . fromMaybe BS.empty
in
either (errorHandler f) (safeResultHandler p (terminatingStreamParser f)) (sequence ers)
terminatingStreamParser f (Done rs) = either (lift . throwError . f . IncrementalError . T.pack) (mapM_ yield) (sequence rs)