module Data.Csv.Conduit (
CsvParseError(..)
, fromCsv
, fromCsvStreamError
, toCsv
) where
import LocalPrelude
import Control.Monad.Error.Class ( MonadError(..) )
import Data.Bifunctor ( first )
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Conduit ( Conduit, await, yield )
import Data.Conduit.List ( map, mapM )
import Data.Csv ( FromRecord(..), ToRecord(..), DecodeOptions, EncodeOptions, HasHeader, encodeWith )
import Data.Csv.Incremental ( Parser(..), decodeWith )
import Data.Foldable ( mapM_ )
data CsvParseError =
CsvParseError BS.ByteString String
| IncrementalError String
fromCsv :: (Show a, Monad m, FromRecord a, MonadError CsvParseError m) => DecodeOptions -> HasHeader -> Conduit BS.ByteString m a
fromCsv opts h = terminatingStreamParser $ decodeWith opts h
fromCsvStreamError :: (Monad m, FromRecord a) => DecodeOptions -> HasHeader -> Conduit BS.ByteString m (Either CsvParseError a)
fromCsvStreamError opts h = streamParser $ decodeWith opts h
toCsv :: (Monad m, ToRecord a) => EncodeOptions -> Conduit a m BS.ByteString
toCsv opts = map $ BSL.toStrict . encodeWith opts . pure
streamParser :: (Monad m) => Parser a -> Conduit BS.ByteString m (Either CsvParseError a)
streamParser (Fail rest errMsg) = yield $ Left $ CsvParseError rest errMsg
streamParser (Many rs p) = do
mapM_ (yield . first IncrementalError) rs
more <- await
maybe (return ()) (streamParser . p) more
streamParser (Done rs) = mapM_ (yield . first IncrementalError) rs
terminatingStreamParser
:: (Show a, Monad m, MonadError CsvParseError m)
=> Parser a
-> Conduit BS.ByteString m a
terminatingStreamParser (Fail rest errMsg) = mapM $ const $ throwError $ CsvParseError rest errMsg
terminatingStreamParser (Many ers p) =
let
errorHandler :: (Monad m, MonadError CsvParseError m) => String -> Conduit BS.ByteString m a
errorHandler = mapM . const . throwError . IncrementalError
safeResultHandler
:: (Show a, Monad m, MonadError CsvParseError m)
=> (BS.ByteString -> Parser a)
-> (Parser a -> Conduit BS.ByteString m a)
-> [a]
-> Conduit BS.ByteString m a
safeResultHandler p' f rs = do
mapM_ yield rs
await >>= maybe (return ()) (f . p')
in
either errorHandler (safeResultHandler p terminatingStreamParser) (sequence ers)
terminatingStreamParser (Done rs) = either (mapM . const . throwError . IncrementalError) (mapM_ yield) (sequence rs)