module Streaming.Cassava
(
decode
, decodeWith
, decodeWithErrors
, CsvParseException (..)
, decodeByName
, decodeByNameWith
, decodeByNameWithErrors
, encode
, encodeDefault
, encodeWith
, encodeByName
, encodeByNameDefault
, encodeByNameWith
, FromRecord (..)
, FromNamedRecord (..)
, ToRecord (..)
, ToNamedRecord (..)
, DefaultOrdered (..)
, HasHeader (..)
, Header
, header
, Name
, DecodeOptions(..)
, defaultDecodeOptions
, EncodeOptions(..)
, defaultEncodeOptions
) where
import qualified Data.ByteString as DB
import qualified Data.ByteString.Lazy as DBL
import Data.ByteString.Streaming (ByteString)
import qualified Data.ByteString.Streaming as B
import qualified Data.ByteString.Streaming.Internal as B
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S
import Data.Csv (DecodeOptions(..), DefaultOrdered(..),
EncodeOptions(..), FromNamedRecord(..),
FromRecord(..), Header, Name,
ToNamedRecord(..), ToRecord(..),
defaultDecodeOptions,
defaultEncodeOptions, encIncludeHeader,
header)
import Data.Csv.Incremental (HasHeader(..), HeaderParser(..),
Parser(..))
import qualified Data.Csv.Incremental as CI
import Control.Exception (Exception(..))
import Control.Monad.Error.Class (MonadError, throwError)
import Control.Monad.Trans.Class (lift)
import Data.Bifunctor (first)
import Data.Maybe (fromMaybe)
import Data.String (IsString(..))
import Data.Typeable (Typeable)
-- | Decode a streaming 'ByteString' of CSV data into a stream of records,
--   using 'defaultDecodeOptions'.
--
--   This is 'decodeWith' with the options fixed; see that function for
--   how errors are reported.
decode :: (MonadError CsvParseException m, FromRecord a)
       => HasHeader
       -> ByteString m r
       -> Stream (Of a) m r
decode hasHdr input = decodeWith defaultDecodeOptions hasHdr input
-- | Decode a streaming 'ByteString' of CSV data into a stream of records
--   using the supplied 'DecodeOptions'.
--
--   Implemented on top of 'decodeWithErrors': any per-record error is
--   raised via 'throwError' (through 'getValues'), and if the underlying
--   decoder finishes with a failure, that failure's exception is thrown
--   and the accompanying leftover input is discarded.
decodeWith :: (MonadError CsvParseException m, FromRecord a)
           => DecodeOptions
           -> HasHeader
           -> ByteString m r
           -> Stream (Of a) m r
decodeWith opts hdr bs = do
  outcome <- getValues (decodeWithErrors opts hdr bs)
  case outcome of
    Left (err, _leftover) -> throwError err
    Right r               -> return r
-- | Decode CSV data, yielding every record as an 'Either' so that the
--   caller decides what to do with conversion errors.
--
--   The stream's return value is @'Right' r@ when the input was fully
--   consumed, or @'Left' (err, leftover)@ when parsing stopped early.
--
--   The incremental parser is built with 'CI.decodeWith' and driven over
--   the input by 'runParser'.
decodeWithErrors :: (Monad m, FromRecord a)
                 => DecodeOptions
                 -> HasHeader
                 -> ByteString m r
                 -> Stream (Of (Either CsvParseException a)) m (Either (CsvParseException, ByteString m r) r)
decodeWithErrors opts hdr = runParser (CI.decodeWith opts hdr)
-- | Drive an incremental cassava 'Parser' over a streaming 'ByteString'.
--
--   Every result the parser produces is yielded downstream, with the
--   error side wrapped in 'CsvParseException'.  The stream returns:
--
--   * @'Right' r@ if the parser finished and the input was exhausted;
--
--   * @'Left' (err, leftover)@ if the parser failed, or finished while
--     input was still available.  @leftover@ is the not-yet-parsed input.
--
--   The input stream is only ever run once: chunks that have been pulled
--   are re-attached to the remainder with 'B.consChunk' rather than
--   handing back a stream whose effects have already been performed.
runParser :: (Monad m) => Parser a -> ByteString m r
          -> Stream (Of (Either CsvParseException a)) m (Either (CsvParseException, ByteString m r) r)
runParser = loop
  where
    -- Supply the parser continuation with the next chunk of input.  At
    -- end of input the continuation is given an empty chunk (cassava's
    -- end-of-input signal) and the stream's return value is carried
    -- forward as an already-finished stream, so nothing is re-run.
    feed f str = do
      nxt <- lift (nextNonEmpty str)
      case nxt of
        Left r          -> loop (f mempty) (return r)
        Right (c, rest) -> loop (f c) rest

    loop p str = case p of
      Fail bs err -> return (Left (CsvParseException err, B.consChunk bs str))
      Many es get -> withEach es >> feed get str
      Done es     -> do
        withEach es
        -- Primarily obtains the return value; also checks that the
        -- parser really did consume all of the input.
        nxt <- lift (nextNonEmpty str)
        return $ case nxt of
          Left r          -> Right r
          -- Put the chunk we just pulled back in front of the rest so
          -- the caller receives the complete unconsumed input.
          Right (c, rest) -> Left ( CsvParseException "Unconsumed input"
                                  , B.consChunk c rest )

    withEach = S.each . map (first CsvParseException)

    -- Pull the next non-empty chunk, or the return value if there is
    -- none.  Empty chunks are skipped because handing one to the parser
    -- would be interpreted as end of input.
    nextNonEmpty str = do
      nxt <- B.nextChunk str
      case nxt of
        Right (c, rest) | DB.null c -> nextNonEmpty rest
        _                           -> return nxt
-- | Decode CSV data whose first row is a header, converting each row via
--   'FromNamedRecord' and using 'defaultDecodeOptions'.
--
--   This is 'decodeByNameWith' with the options fixed.
decodeByName :: (MonadError CsvParseException m, FromNamedRecord a)
             => ByteString m r
             -> Stream (Of a) m r
decodeByName input = decodeByNameWith defaultDecodeOptions input
-- | Decode header-prefixed CSV data with the supplied 'DecodeOptions'.
--
--   Implemented on top of 'decodeByNameWithErrors': any per-record error
--   is raised via 'throwError' (through 'getValues'), and if the decoder
--   finishes with a failure, that failure's exception is thrown and the
--   accompanying leftover input is discarded.
decodeByNameWith :: (MonadError CsvParseException m, FromNamedRecord a)
                 => DecodeOptions
                 -> ByteString m r
                 -> Stream (Of a) m r
decodeByNameWith opts bs = do
  outcome <- getValues (decodeByNameWithErrors opts bs)
  case outcome of
    Left (err, _leftover) -> throwError err
    Right r               -> return r
-- | Decode header-prefixed CSV data, yielding every record as an 'Either'
--   so that the caller decides what to do with conversion errors.
--
--   The header is parsed first with 'CI.decodeByNameWith'; once it has
--   been read the resulting record parser is handed to 'runParser'.  The
--   parsed header itself is not returned.
--
--   The stream returns @'Left' (err, leftover)@ if the header (or later
--   the body) could not be parsed, where @leftover@ is the unparsed
--   input, and otherwise whatever 'runParser' returns.
--
--   The input stream is only ever run once: when it ends while the header
--   is still incomplete, its return value is carried forward as an
--   already-finished stream instead of re-running the exhausted stream.
decodeByNameWithErrors :: (Monad m, FromNamedRecord a) => DecodeOptions
                       -> ByteString m r
                       -> Stream (Of (Either CsvParseException a)) m (Either (CsvParseException, ByteString m r) r)
decodeByNameWithErrors opts = loopH (CI.decodeByNameWith opts)
  where
    loopH ph str = case ph of
      FailH bs err -> return (Left (CsvParseException err, B.consChunk bs str))
      PartialH get -> feedH get str
      DoneH _ p    -> runParser p str

    -- Supply the header-parser continuation with the next chunk; an
    -- empty chunk is passed only at genuine end of input.
    feedH f str = do
      nxt <- lift (nextNonEmpty str)
      case nxt of
        Left r          -> loopH (f mempty) (return r)
        Right (c, rest) -> loopH (f c) rest

    -- Pull the next non-empty chunk, or the return value if there is
    -- none.  Empty chunks are skipped because handing one to the parser
    -- would be interpreted as end of input.
    nextNonEmpty str = do
      nxt <- B.nextChunk str
      case nxt of
        Right (c, rest) | DB.null c -> nextNonEmpty rest
        _                           -> return nxt
-- | Encode a stream of records as CSV using 'defaultEncodeOptions'.
--
--   When a 'Header' is supplied it is encoded as a row ahead of the
--   records.  This is 'encodeWith' with the options fixed.
encode :: (ToRecord a, Monad m)
       => Maybe Header
       -> Stream (Of a) m r
       -> ByteString m r
encode mhdr records = encodeWith defaultEncodeOptions mhdr records
-- | Encode a stream of records as CSV, taking the header row from the
--   record type's 'DefaultOrdered' instance.
--
--   'headerOrder' is applied to an 'undefined' value of the record type
--   (selected with a scoped type variable); only its type is meant to
--   matter, no record is inspected.
encodeDefault :: forall a m r. (ToRecord a, DefaultOrdered a, Monad m)
              => Stream (Of a) m r
              -> ByteString m r
encodeDefault records = encode (Just hdr) records
  where
    hdr = headerOrder (undefined :: a)
-- | Encode a stream of records as CSV with the supplied 'EncodeOptions'.
--
--   Each record is encoded on its own and the resulting strict chunks are
--   streamed out in order.  When a 'Header' is supplied it is encoded the
--   same way and emitted before the first record.
encodeWith :: (ToRecord a, Monad m)
           => EncodeOptions
           -> Maybe Header
           -> Stream (Of a) m r
           -> ByteString m r
encodeWith opts mhdr records = B.fromChunks (S.concat withHeader)
  where
    -- The encoded rows, with the optional header row in front.
    withHeader = case mhdr of
      Nothing  -> rows
      Just hdr -> S.cons (encodeRow hdr) rows

    rows = S.map encodeRow records

    -- Encode a single row (a record or the header) to strict chunks.
    encodeRow :: (ToRecord v) => v -> [DB.ByteString]
    encodeRow row = DBL.toChunks (CI.encodeWith opts (CI.encodeRecord row))
-- | Encode a stream of named records as CSV, taking the column order
--   from the record type's 'DefaultOrdered' instance.
--
--   'headerOrder' is applied to an 'undefined' value of the record type
--   (selected with a scoped type variable); only its type is meant to
--   matter, no record is inspected.
encodeByNameDefault :: forall a m r. (DefaultOrdered a, ToNamedRecord a, Monad m)
                    => Stream (Of a) m r
                    -> ByteString m r
encodeByNameDefault records = encodeByName hdr records
  where
    hdr = headerOrder (undefined :: a)
-- | Encode a stream of named records as CSV using 'defaultEncodeOptions',
--   with columns ordered according to the given 'Header'.
--
--   This is 'encodeByNameWith' with the options fixed.
encodeByName :: (ToNamedRecord a, Monad m)
             => Header
             -> Stream (Of a) m r
             -> ByteString m r
encodeByName hdr records = encodeByNameWith defaultEncodeOptions hdr records
-- | Encode a stream of named records as CSV with the supplied
--   'EncodeOptions', with columns ordered according to the given 'Header'.
--
--   The header row is emitted once, up front, and only when
--   'encIncludeHeader' is set in the supplied options.  Records are
--   encoded one at a time using a copy of the options in which
--   'encIncludeHeader' is switched off, so the per-record encoder is
--   never asked to produce a header itself.
encodeByNameWith :: (ToNamedRecord a, Monad m)
                 => EncodeOptions
                 -> Header
                 -> Stream (Of a) m r
                 -> ByteString m r
encodeByNameWith opts hdr records = B.fromChunks (S.concat withHeader)
  where
    -- Options used for every individual encoding call.
    noHeaderOpts = opts { encIncludeHeader = False }

    withHeader =
      if encIncludeHeader opts
        then S.cons headerChunks rows
        else rows

    -- The header, encoded as an ordinary row.
    headerChunks = DBL.toChunks (CI.encodeWith noHeaderOpts (CI.encodeRecord hdr))

    rows = S.map encodeRow records

    encodeRow x =
      DBL.toChunks (CI.encodeByNameWith noHeaderOpts hdr (CI.encodeNamedRecord x))
-- | Strip the 'Either' from every element of a stream: 'Right' values
--   are passed through, and a 'Left' is raised in the underlying monad
--   with 'throwError'.
getValues :: (MonadError e m) => Stream (Of (Either e a)) m r -> Stream (Of a) m r
getValues = S.mapM unwrap
  where
    unwrap (Left err)  = throwError err
    unwrap (Right val) = return val
-- | Error raised when CSV data cannot be parsed or converted.  It wraps
--   the underlying error message.
--
--   It has an 'IsString' instance (wrapping the string as the message)
--   and an 'Exception' instance.
newtype CsvParseException = CsvParseException String
  deriving (Eq, Show, Typeable)
-- | A string is wrapped unchanged as the exception's message.
instance IsString CsvParseException where
  fromString msg = CsvParseException msg
-- | 'displayException' prefixes the wrapped message with a short
--   description of where the error came from.
instance Exception CsvParseException where
  displayException (CsvParseException msg) =
    concat ["Error parsing csv: ", msg]