{-# LANGUAGE FlexibleContexts #-} ------------------------------------------------------------------- -- | -- Module : Data.Csv.Conduit -- Copyright : (C) 2014 -- License : BSD-style (see the file etc/LICENSE.md) -- Maintainer : Dom De Re -- -- Conduit interface for cassava -- ------------------------------------------------------------------- module Data.Csv.Conduit ( -- * Types CsvParseError(..) , CsvStreamHaltParseError(..) , CsvStreamRecordParseError(..) -- * Conduits , fromCsv , fromCsvLiftError , fromNamedCsv , fromNamedCsvLiftError , fromCsvStreamError , fromNamedCsvStreamError , toCsv ) where import LocalPrelude import Control.Monad.Trans (MonadTrans(..)) import Control.Monad.Error.Class (MonadError(..)) import Data.Bifunctor (first) import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BSL import Data.Conduit (Conduit, await, yield) import Data.Conduit.List (map, mapM) import Data.Csv (FromNamedRecord, FromRecord, ToRecord, DecodeOptions, EncodeOptions, HasHeader, encodeWith) import Data.Csv.Incremental (HeaderParser(..), Parser(..), decodeByNameWith, decodeWith) import Data.Foldable (mapM_) import qualified Data.Text as T data CsvParseError = CsvParseError BS.ByteString T.Text | IncrementalError T.Text deriving (Show, Eq) -- | When you want to include errors in the stream, this error type represents errors that halt the stream. -- They do not appear inside the conduit and will instead get returned from running the conduit. -- data CsvStreamHaltParseError = HaltingCsvParseError BS.ByteString T.Text -- ^ the remaining bytestring that was read in but not parsed yet, and the stringy error msg describing the fail. deriving (Show, Eq) -- | When you want to include errors in the stream, these are the errors that can be included in the stream, -- they are usually problems restricted to individual records, and streaming can resume from the next record -- you just have to decide on something sensible to do with the per record errors. -- data CsvStreamRecordParseError = CsvStreamRecordParseError T.Text -- ^ The stringy error describing why this record could not be parsed. deriving (Show, Eq) -- | -- Streams parsed records, Errors are not received in the stream but instead after the pipeline is executed, -- If you want to handle errors as they come and resume, see `fromCsvStreamError` -- fromCsv :: (FromRecord a, MonadError CsvParseError m) => DecodeOptions -> HasHeader -> Conduit BS.ByteString m a fromCsv = fromCsvLiftError id -- | -- Sometimes your pipeline will involve an error type other than `CsvParseError`, in which case if you provide -- a function to project it into your custom error type, you can use this instead of `fromCsv` -- fromCsvLiftError :: (FromRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> HasHeader -> Conduit BS.ByteString m a fromCsvLiftError f opts h = {-# SCC fromCsvLiftError_p #-} terminatingStreamParser f $ decodeWith opts h -- | -- Parses an instance of `FromNamedRecord`, this conduit drops the Header -- -- Errors are not seen in the pipeline but rather at the end after executing the pipeline, if you want to handle the errors -- as they occur, try `fromNamedCsvStreamError` instead. -- fromNamedCsv :: (FromNamedRecord a, MonadError CsvParseError m) => DecodeOptions -> Conduit BS.ByteString m a fromNamedCsv = fromNamedCsvLiftError id -- | -- Sometimes your pipeline will involve an error type other than `CsvParseError`, in which case if you provide -- a function to project it into your custom error type, you can use this instead of `fromCsv` -- fromNamedCsvLiftError :: (FromNamedRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> Conduit BS.ByteString m a fromNamedCsvLiftError f opts = {-# SCC fromNamedCsv_p #-} terminatingStreamHeaderParser f $ decodeByNameWith opts -- | -- Same as `fromCsv` but allows for errors to be handled in the pipeline instead -- fromCsvStreamError :: (FromRecord a, MonadError e m) => DecodeOptions -> HasHeader -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a) fromCsvStreamError opts h f = {-# SCC fromCsvStreamError_p #-} streamParser f $ decodeWith opts h -- | -- Like `fromNamedCsvStream` but allows for errors to be handled in the pipeline itself. -- fromNamedCsvStreamError :: (FromNamedRecord a, MonadError e m) => DecodeOptions -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a) fromNamedCsvStreamError opts f = {-# SCC fromCsvStreamError_p #-} streamHeaderParser f $ decodeByNameWith opts -- | -- Streams from csv to text, does not create headers... -- toCsv :: (Monad m, ToRecord a) => EncodeOptions -> Conduit a m BS.ByteString toCsv opts = {-# SCC toCsv_p #-} map $ BSL.toStrict . encodeWith opts . pure -- helpers streamHeaderParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> HeaderParser (Parser a) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a) streamHeaderParser f (FailH rest errMsg) = {-# SCC streamHeaderParser_FailH_p #-} lift . throwError . f $ HaltingCsvParseError rest (T.pack errMsg) streamHeaderParser f (PartialH p) = {-# SCC streamHeaderParser_PartialH_p #-} await >>= maybe (return ()) (streamHeaderParser f . p) streamHeaderParser f (DoneH _ p) = {-# SCC streamHeaderParser_DoneH_p #-} streamParser f p streamParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> Parser a -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a) streamParser f (Fail rest errMsg) = {-# SCC streamParser_Fail_p #-} lift . throwError . f $ HaltingCsvParseError rest (T.pack errMsg) streamParser f (Many rs p) = {-# SCC streamParser_Many_p #-} do -- send the results down the stream.. mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs -- wait for more.. more <- await maybe (return ()) (streamParser f . p) more streamParser _ (Done rs) = {-# SCC streamParser_Done_p #-} mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs terminatingStreamHeaderParser :: (Monad m, MonadError e m) => (CsvParseError -> e) -> HeaderParser (Parser a) -> Conduit BS.ByteString m a terminatingStreamHeaderParser f (FailH rest errMsg) = {-# SCC terminatingStreamHeaderParser_FailH_p #-} lift . throwError . f . CsvParseError rest . T.pack $ errMsg terminatingStreamHeaderParser f (PartialH p) = {-# SCC terminatingStreamHeaderParser_PartialH_p #-} await >>= maybe (return ()) (terminatingStreamHeaderParser f . p) terminatingStreamHeaderParser f (DoneH _ p) = {-# SCC terminatingStreamHeaderParser_DoneH_p #-} terminatingStreamParser f p terminatingStreamParser :: (Monad m, MonadError e m) => (CsvParseError -> e) -> Parser a -> Conduit BS.ByteString m a terminatingStreamParser f (Fail rest errMsg) = {-# SCC terminatingStreamParser_Fail_p #-} lift . throwError . f . CsvParseError rest . T.pack $ errMsg terminatingStreamParser f (Many ers p) = {-# SCC terminatingStreamParser_Many_p #-} let errorHandler :: (Monad m, MonadError e m) => (CsvParseError -> e) -> String -> Conduit BS.ByteString m a errorHandler f' = mapM . const . throwError . f' . IncrementalError . T.pack safeResultHandler :: (Monad m) => (BS.ByteString -> Parser a) -> (Parser a -> Conduit BS.ByteString m a) -> [a] -> Conduit BS.ByteString m a safeResultHandler p' f' rs = do mapM_ yield rs -- wait for more.. await >>= f' . p' . fromMaybe BS.empty in -- send the results down the stream.. either (errorHandler f) (safeResultHandler p (terminatingStreamParser f)) (sequence ers) terminatingStreamParser f (Done rs) = {-# SCC terminatingStreamParser_Done_p #-} either (lift . throwError . f . IncrementalError . T.pack) (mapM_ yield) (sequence rs)