-- | Conduits that decode a stream of strict 'BS.ByteString' chunks into CSV
-- records using cassava's incremental parser, and encode records back into
-- CSV bytes.
--
-- Each decoder comes in two flavours: one that raises a 'CsvParseError' in
-- the underlying 'MonadError' monad ('fromCsv', 'fromNamedCsv'), and one
-- that emits failures in-stream as 'Left' values ('fromCsvStreamError',
-- 'fromNamedCsvStreamError').
module Data.Csv.Conduit (
CsvParseError(..)
, fromCsv
, fromNamedCsv
, fromCsvStreamError
, fromNamedCsvStreamError
, toCsv
) where
import LocalPrelude
import Control.Monad.Error.Class ( MonadError(..) )
import Data.Bifunctor ( first )
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Conduit ( Conduit, await, yield )
import Data.Conduit.List ( map, mapM )
import Data.Csv ( FromNamedRecord, FromRecord, ToRecord, DecodeOptions, EncodeOptions, HasHeader, encodeWith )
import Data.Csv.Incremental ( HeaderParser(..), Parser(..), decodeByNameWith, decodeWith )
import Data.Foldable ( mapM_ )
-- | The ways in which decoding a CSV stream can fail.
data CsvParseError
    = CsvParseError BS.ByteString String
    -- ^ The incremental parser itself failed. Built from the two fields of
    -- cassava's @Fail@ / @FailH@ results: the bytes handed back by the parser
    -- and its error message.
    | IncrementalError String
    -- ^ A single record could not be converted; carries the message from the
    -- @Left@ that the parser produced for that record.
-- | Decode a stream of CSV bytes into records by position.
--
-- The 'HasHeader' argument is passed straight to cassava's 'decodeWith'.
-- Failures are raised in the underlying monad as a 'CsvParseError' via
-- 'throwError'.
fromCsv
    :: (Show a, Monad m, FromRecord a, MonadError CsvParseError m)
    => DecodeOptions
    -> HasHeader
    -> Conduit BS.ByteString m a
fromCsv decodeOpts hasHeader =
    terminatingStreamParser (decodeWith decodeOpts hasHeader)
-- | Decode a stream of CSV bytes into records by column name, using
-- cassava's 'decodeByNameWith' (which parses a header first).
--
-- Failures are raised in the underlying monad as a 'CsvParseError' via
-- 'throwError'.
fromNamedCsv
    :: (Show a, Monad m, FromNamedRecord a, MonadError CsvParseError m)
    => DecodeOptions
    -> Conduit BS.ByteString m a
fromNamedCsv decodeOpts =
    terminatingStreamHeaderParser (decodeByNameWith decodeOpts)
-- | Like 'fromCsv', but failures are emitted in-stream as 'Left' values
-- instead of being raised, so no 'MonadError' constraint is needed.
fromCsvStreamError
    :: (Monad m, FromRecord a)
    => DecodeOptions
    -> HasHeader
    -> Conduit BS.ByteString m (Either CsvParseError a)
fromCsvStreamError decodeOpts hasHeader =
    streamParser (decodeWith decodeOpts hasHeader)
-- | Like 'fromNamedCsv', but failures are emitted in-stream as 'Left'
-- values instead of being raised, so no 'MonadError' constraint is needed.
fromNamedCsvStreamError
    :: (Monad m, FromNamedRecord a)
    => DecodeOptions
    -> Conduit BS.ByteString m (Either CsvParseError a)
fromNamedCsvStreamError decodeOpts =
    streamHeaderParser (decodeByNameWith decodeOpts)
-- | Encode each incoming record as its own strict 'BS.ByteString' chunk.
--
-- Every record is passed to 'encodeWith' as a singleton list and the lazy
-- result is made strict, so one input value produces exactly one output
-- chunk.
toCsv :: (Monad m, ToRecord a) => EncodeOptions -> Conduit a m BS.ByteString
toCsv encodeOpts = map encodeRecord
  where
    encodeRecord record = BSL.toStrict (encodeWith encodeOpts [record])
-- | Run a cassava header parser over the incoming chunks and, once the
-- header has been read, hand over to 'streamParser' for the records.
--
-- Failures are emitted in-stream as a single 'Left' 'CsvParseError', after
-- which the conduit stops.
--
-- When upstream is exhausted while the header is still incomplete, the
-- parser is fed an empty chunk. That is how cassava's incremental API is
-- told that input has ended, so a truncated or missing header surfaces as a
-- 'Left' instead of being silently ignored.
streamHeaderParser :: (Monad m) => HeaderParser (Parser a) -> Conduit BS.ByteString m (Either CsvParseError a)
streamHeaderParser (FailH rest errMsg) = yield $ Left $ CsvParseError rest errMsg
streamHeaderParser (PartialH p) = feedNext
  where
    -- End of upstream: signal end of input with an empty chunk.
    feedNext = await >>= maybe (streamHeaderParser (p BS.empty)) feedChunk
    -- An empty chunk from upstream must not reach the parser, which would
    -- read it as end of input; skip it and wait for real data.
    feedChunk chunk =
        if BS.null chunk
            then feedNext
            else streamHeaderParser (p chunk)
streamHeaderParser (DoneH _ p) = streamParser p
-- | Run a cassava incremental record 'Parser' over the incoming chunks,
-- yielding one 'Either' per record.
--
-- Per-record conversion failures are yielded as @Left (IncrementalError …)@
-- and parsing continues. A failure of the parser itself is yielded as
-- @Left (CsvParseError …)@ and ends the conduit.
--
-- When upstream is exhausted the parser is fed an empty chunk, which is how
-- cassava's incremental API is told that input has ended. Without this, a
-- final record that is not followed by a newline would stay buffered inside
-- the parser and never be emitted.
streamParser :: (Monad m) => Parser a -> Conduit BS.ByteString m (Either CsvParseError a)
streamParser (Fail rest errMsg) = yield $ Left $ CsvParseError rest errMsg
streamParser (Many rs p) = do
    mapM_ (yield . first IncrementalError) rs
    feedNext
  where
    -- End of upstream: signal end of input with an empty chunk so the
    -- parser can flush whatever it is still holding.
    feedNext = await >>= maybe (streamParser (p BS.empty)) feedChunk
    -- An empty chunk from upstream must not reach the parser, which would
    -- read it as end of input; skip it and wait for real data.
    feedChunk chunk =
        if BS.null chunk
            then feedNext
            else streamParser (p chunk)
streamParser (Done rs) = mapM_ (yield . first IncrementalError) rs
-- | Run a cassava header parser over the incoming chunks and, once the
-- header has been read, hand over to 'terminatingStreamParser' for the
-- records.
--
-- A header failure is raised as a 'CsvParseError' via 'throwError'. The
-- error is thrown directly in the conduit monad (conduit's 'MonadError'
-- instance lifts it into @m@), so it fires even when no further input
-- arrives. Wrapping the throw in @Data.Conduit.List.mapM@ would defer it
-- until the next chunk and lose it entirely at end of input.
--
-- When upstream is exhausted while the header is still incomplete, the
-- parser is fed an empty chunk. That is how cassava's incremental API is
-- told that input has ended, so a truncated or missing header is reported
-- rather than silently ignored.
terminatingStreamHeaderParser
    :: (Show a, Monad m, MonadError CsvParseError m)
    => HeaderParser (Parser a)
    -> Conduit BS.ByteString m a
terminatingStreamHeaderParser (FailH rest errMsg) =
    throwError $ CsvParseError rest errMsg
terminatingStreamHeaderParser (PartialH p) = feedNext
  where
    -- End of upstream: signal end of input with an empty chunk.
    feedNext = await >>= maybe (terminatingStreamHeaderParser (p BS.empty)) feedChunk
    -- An empty chunk from upstream must not reach the parser, which would
    -- read it as end of input; skip it and wait for real data.
    feedChunk chunk =
        if BS.null chunk
            then feedNext
            else terminatingStreamHeaderParser (p chunk)
terminatingStreamHeaderParser (DoneH _ p) = terminatingStreamParser p
-- | Run a cassava incremental record 'Parser' over the incoming chunks,
-- yielding each decoded record and aborting on the first failure.
--
-- Each batch of results is checked as a whole with 'sequence'. If any record
-- in the batch failed to convert, the first such failure is raised as an
-- 'IncrementalError' and none of the batch is yielded. A failure of the
-- parser itself is raised as a 'CsvParseError'.
--
-- Errors are thrown directly in the conduit monad (conduit's 'MonadError'
-- instance lifts them into @m@), so they fire even when no further input
-- arrives. Wrapping the throw in @Data.Conduit.List.mapM@ would defer it
-- until the next chunk and lose it entirely at end of input.
--
-- When upstream is exhausted the parser is fed an empty chunk, which is how
-- cassava's incremental API is told that input has ended. Without this, a
-- final record that is not followed by a newline would stay buffered inside
-- the parser and never be emitted.
terminatingStreamParser
    :: (Show a, Monad m, MonadError CsvParseError m)
    => Parser a
    -> Conduit BS.ByteString m a
terminatingStreamParser (Fail rest errMsg) =
    throwError $ CsvParseError rest errMsg
terminatingStreamParser (Many ers p) =
    either (throwError . IncrementalError) yieldThenFeed (sequence ers)
  where
    -- Emit the batch, then pull more input for the parser.
    yieldThenFeed rs = do
        mapM_ yield rs
        feedNext
    -- End of upstream: signal end of input with an empty chunk so the
    -- parser can flush whatever it is still holding.
    feedNext = await >>= maybe (terminatingStreamParser (p BS.empty)) feedChunk
    -- An empty chunk from upstream must not reach the parser, which would
    -- read it as end of input; skip it and wait for real data.
    feedChunk chunk =
        if BS.null chunk
            then feedNext
            else terminatingStreamParser (p chunk)
terminatingStreamParser (Done rs) =
    either (throwError . IncrementalError) (mapM_ yield) (sequence rs)