module Data.Csv.Conduit (
CsvParseError(..)
, fromCsv
, fromCsvLiftError
, fromNamedCsv
, fromNamedCsvLiftError
, fromCsvStreamError
, fromNamedCsvStreamError
, toCsv
) where
import LocalPrelude
import Control.Monad.Error.Class ( MonadError(..) )
import Data.Bifunctor ( first )
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Conduit ( Conduit, await, yield )
import Data.Conduit.List ( map, mapM )
import Data.Csv ( FromNamedRecord, FromRecord, ToRecord, DecodeOptions, EncodeOptions, HasHeader, encodeWith )
import Data.Csv.Incremental ( HeaderParser(..), Parser(..), decodeByNameWith, decodeWith )
import Data.Foldable ( mapM_ )
data CsvParseError =
CsvParseError BS.ByteString String
| IncrementalError String
fromCsv :: (FromRecord a, MonadError CsvParseError m) => DecodeOptions -> HasHeader -> Conduit BS.ByteString m a
fromCsv = fromCsvLiftError id
fromCsvLiftError :: (FromRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> HasHeader -> Conduit BS.ByteString m a
fromCsvLiftError f opts h = terminatingStreamParser f $ decodeWith opts h
fromNamedCsv :: (FromNamedRecord a, MonadError CsvParseError m) => DecodeOptions -> Conduit BS.ByteString m a
fromNamedCsv = fromNamedCsvLiftError id
fromNamedCsvLiftError :: (FromNamedRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> Conduit BS.ByteString m a
fromNamedCsvLiftError f opts = terminatingStreamHeaderParser f $ decodeByNameWith opts
fromCsvStreamError :: (Monad m, FromRecord a) => DecodeOptions -> HasHeader -> Conduit BS.ByteString m (Either CsvParseError a)
fromCsvStreamError opts h = streamParser $ decodeWith opts h
fromNamedCsvStreamError :: (Monad m, FromNamedRecord a) => DecodeOptions -> Conduit BS.ByteString m (Either CsvParseError a)
fromNamedCsvStreamError opts = streamHeaderParser $ decodeByNameWith opts
toCsv :: (Monad m, ToRecord a) => EncodeOptions -> Conduit a m BS.ByteString
toCsv opts = map $ BSL.toStrict . encodeWith opts . pure
streamHeaderParser :: (Monad m) => HeaderParser (Parser a) -> Conduit BS.ByteString m (Either CsvParseError a)
streamHeaderParser (FailH rest errMsg) = yield $ Left $ CsvParseError rest errMsg
streamHeaderParser (PartialH p) = await >>= maybe (return ()) (streamHeaderParser . p)
streamHeaderParser (DoneH _ p) = streamParser p
streamParser :: (Monad m) => Parser a -> Conduit BS.ByteString m (Either CsvParseError a)
streamParser (Fail rest errMsg) = yield $ Left $ CsvParseError rest errMsg
streamParser (Many rs p) = do
mapM_ (yield . first IncrementalError) rs
more <- await
maybe (return ()) (streamParser . p) more
streamParser (Done rs) = mapM_ (yield . first IncrementalError) rs
terminatingStreamHeaderParser
:: (Monad m, MonadError e m)
=> (CsvParseError -> e)
-> HeaderParser (Parser a)
-> Conduit BS.ByteString m a
terminatingStreamHeaderParser f (FailH rest errMsg) = mapM $ const $ throwError $ f $ CsvParseError rest errMsg
terminatingStreamHeaderParser f (PartialH p) = await >>= maybe (return ()) (terminatingStreamHeaderParser f . p)
terminatingStreamHeaderParser f (DoneH _ p) = terminatingStreamParser f p
terminatingStreamParser
:: (Monad m, MonadError e m)
=> (CsvParseError -> e)
-> Parser a
-> Conduit BS.ByteString m a
terminatingStreamParser f (Fail rest errMsg) = mapM $ const $ throwError $ f $ CsvParseError rest errMsg
terminatingStreamParser f (Many ers p) =
let
errorHandler :: (Monad m, MonadError e m) => (CsvParseError -> e) -> String -> Conduit BS.ByteString m a
errorHandler f' = mapM . const . throwError . f' . IncrementalError
safeResultHandler
:: (Monad m)
=> (BS.ByteString -> Parser a)
-> (Parser a -> Conduit BS.ByteString m a)
-> [a]
-> Conduit BS.ByteString m a
safeResultHandler p' f' rs = do
mapM_ yield rs
await >>= maybe (return ()) (f' . p')
in
either (errorHandler f) (safeResultHandler p (terminatingStreamParser f)) (sequence ers)
terminatingStreamParser f (Done rs) = either (mapM . const . throwError . f . IncrementalError) (mapM_ yield) (sequence rs)