{-# LANGUAGE FlexibleContexts #-}
-------------------------------------------------------------------
-- |
-- Module       : Data.Csv.Conduit
-- Copyright    : (C) 2014
-- License      : BSD-style (see the file etc/LICENSE.md)
-- Maintainer   : Dom De Re
--
-- Conduit interface for cassava
--
-------------------------------------------------------------------
module Data.Csv.Conduit (
  -- * Types
    CsvParseError(..)
  , CsvStreamHaltParseError(..)
  , CsvStreamRecordParseError(..)
  -- * Conduits
  , fromCsv
  , fromCsvLiftError
  , fromNamedCsv
  , fromNamedCsvLiftError
  , fromCsvStreamError
  , fromNamedCsvStreamError
  , toCsv
  ) where

import LocalPrelude

import Control.Monad.Trans (MonadTrans(..))
import Control.Monad.Error.Class (MonadError(..))
import Data.Bifunctor (first)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Conduit (Conduit, await, yield)
import Data.Conduit.List (map, mapM)
import Data.Csv (FromNamedRecord, FromRecord, ToRecord, DecodeOptions, EncodeOptions, HasHeader, encodeWith)
import Data.Csv.Incremental (HeaderParser(..), Parser(..), decodeByNameWith, decodeWith)
import Data.Foldable (mapM_)
import qualified Data.Text as T

-- |
-- Errors raised (via `MonadError`) by the conduits that do not put errors in the stream.
--
data CsvParseError
  = CsvParseError BS.ByteString T.Text -- ^ the parser itself failed: the input handed back by the parser as not consumed, and the stringy error msg describing the fail.
  | IncrementalError T.Text            -- ^ the parser reported an error for an individual record: the stringy error msg for that record.
  deriving (Eq, Show)

-- | The error type for failures that stop the stream when errors are otherwise carried inside the stream.
-- Values of this type never appear inside the conduit; they are returned from running the conduit.
--
data CsvStreamHaltParseError
  = HaltingCsvParseError BS.ByteString T.Text -- ^ the remaining bytestring that was read in but not parsed yet, and the stringy error msg describing the fail.
  deriving (Eq, Show)

-- | The error type for failures that are carried inside the stream.
-- These are usually problems restricted to an individual record, so streaming can resume from the next record;
-- the consumer just has to decide on something sensible to do with the per record errors.
--
data CsvStreamRecordParseError
  = CsvStreamRecordParseError T.Text -- ^ The stringy error describing why this record could not be parsed.
  deriving (Eq, Show)

-- |
-- Turns a stream of raw CSV bytes into a stream of parsed records.
--
-- Errors never show up as stream elements: they are raised as a `CsvParseError` in the
-- underlying monad and so are only seen once the pipeline has been run.
-- To handle errors as they come and resume, see `fromCsvStreamError`.
--
fromCsv :: (FromRecord a, MonadError CsvParseError m) => DecodeOptions -> HasHeader -> Conduit BS.ByteString m a
fromCsv opts hasHeader = fromCsvLiftError id opts hasHeader

-- |
-- A variant of `fromCsv` for pipelines whose error type is not `CsvParseError`:
-- the supplied function projects each `CsvParseError` into the custom error type
-- before it is thrown.
--
fromCsvLiftError :: (FromRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> HasHeader -> Conduit BS.ByteString m a
fromCsvLiftError toErr opts hasHeader = {-# SCC fromCsvLiftError_p #-} terminatingStreamParser toErr (decodeWith opts hasHeader)


-- |
-- Turns a stream of raw CSV bytes into a stream of `FromNamedRecord` values.
-- The header itself is not passed downstream.
--
-- Errors never show up as stream elements: they are raised as a `CsvParseError` in the
-- underlying monad and so are only seen after executing the pipeline. To handle the errors
-- as they occur, try `fromNamedCsvStreamError` instead.
--
fromNamedCsv :: (FromNamedRecord a, MonadError CsvParseError m) => DecodeOptions -> Conduit BS.ByteString m a
fromNamedCsv opts = fromNamedCsvLiftError id opts

-- |
-- A variant of `fromNamedCsv` for pipelines whose error type is not `CsvParseError`:
-- the supplied function projects each `CsvParseError` into the custom error type
-- before it is thrown.
--
fromNamedCsvLiftError :: (FromNamedRecord a, MonadError e m) => (CsvParseError -> e) -> DecodeOptions -> Conduit BS.ByteString m a
fromNamedCsvLiftError toErr opts = {-# SCC fromNamedCsv_p #-} terminatingStreamHeaderParser toErr (decodeByNameWith opts)

-- |
-- Like `fromCsv`, except that per-record failures travel down the pipeline as
-- @Left@ values (`CsvStreamRecordParseError`) next to the successfully parsed records.
--
-- Failures that halt parsing altogether are still thrown in the underlying monad,
-- after being converted with the supplied function.
--
fromCsvStreamError :: (FromRecord a, MonadError e m) => DecodeOptions -> HasHeader -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
fromCsvStreamError opts hasHeader toErr = {-# SCC fromCsvStreamError_p #-} streamParser toErr (decodeWith opts hasHeader)

-- |
-- Like `fromNamedCsv` but allows for errors to be handled in the pipeline itself:
-- per-record failures are yielded as @Left@ values (`CsvStreamRecordParseError`)
-- next to the successfully parsed records.
--
-- Failures that halt parsing altogether are still thrown in the underlying monad,
-- after being converted with the supplied function.
--
fromNamedCsvStreamError :: (FromNamedRecord a, MonadError e m) => DecodeOptions -> (CsvStreamHaltParseError -> e) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
-- The cost centre gets its own label so that profiles can tell this conduit apart from `fromCsvStreamError`.
fromNamedCsvStreamError opts f = {-# SCC fromNamedCsvStreamError_p #-} streamHeaderParser f $ decodeByNameWith opts

-- |
-- Encodes every incoming record, one at a time, into a strict `BS.ByteString` of CSV data
-- using the given `EncodeOptions`. No header is created.
--
toCsv :: (Monad m, ToRecord a) => EncodeOptions -> Conduit a m BS.ByteString
toCsv opts = {-# SCC toCsv_p #-} map encodeOne
  where
    -- each record is encoded on its own, as a one-element list
    encodeOne record = BSL.toStrict (encodeWith opts [record])

-- helpers

-- |
-- Runs the header stage of a by-name parse, then hands over to `streamParser` for the records.
--
-- * a header failure is thrown in the underlying monad (converted with the supplied function),
-- * while the header is incomplete, the next upstream chunk is fed to the parser;
--   if upstream has no more chunks the conduit simply finishes,
-- * once the header is complete it is discarded and record streaming starts.
--
streamHeaderParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> HeaderParser (Parser a) -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
streamHeaderParser toErr headerState = case headerState of
  FailH unconsumed errMsg -> {-# SCC streamHeaderParser_FailH_p #-}
    lift (throwError (toErr (HaltingCsvParseError unconsumed (T.pack errMsg))))
  PartialH feed -> {-# SCC streamHeaderParser_PartialH_p #-} do
    chunk <- await
    case chunk of
      Nothing    -> return ()
      Just bytes -> streamHeaderParser toErr (feed bytes)
  DoneH _ recordParser -> {-# SCC streamHeaderParser_DoneH_p #-}
    streamParser toErr recordParser

-- |
-- Drives an incremental record `Parser`, yielding every result downstream:
-- successfully parsed records as @Right@, per-record failures as @Left@
-- (`CsvStreamRecordParseError`).
--
-- A failure of the parser itself halts the stream: it is converted with the supplied
-- function and thrown in the underlying monad.
--
streamParser :: (MonadError e m) => (CsvStreamHaltParseError -> e) -> Parser a -> Conduit BS.ByteString m (Either CsvStreamRecordParseError a)
streamParser f (Fail rest errMsg)   = {-# SCC streamParser_Fail_p #-} lift . throwError . f $ HaltingCsvParseError rest (T.pack errMsg)
streamParser f (Many rs p)          = {-# SCC streamParser_Many_p #-} do
  -- send the results down the stream..
  mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs
  -- wait for more..
  more <- await
  -- When upstream is exhausted the parser is fed an empty chunk rather than being abandoned:
  -- that is how the incremental parser is told that no more input is coming, so it can hand
  -- back whatever it is still holding (e.g. a last record without a trailing newline).
  -- This mirrors what `terminatingStreamParser` does.
  streamParser f . p $ fromMaybe BS.empty more
streamParser _ (Done rs)            = {-# SCC streamParser_Done_p #-} mapM_ (yield . first (CsvStreamRecordParseError . T.pack)) rs

-- |
-- Runs the header stage of a by-name parse, then hands over to `terminatingStreamParser`
-- for the records.
--
-- * a header failure is thrown in the underlying monad as a `CsvParseError`
--   (converted with the supplied function),
-- * while the header is incomplete, the next upstream chunk is fed to the parser;
--   if upstream has no more chunks the conduit simply finishes,
-- * once the header is complete it is discarded and record streaming starts.
--
terminatingStreamHeaderParser
  :: (Monad m, MonadError e m)
  => (CsvParseError -> e)
  -> HeaderParser (Parser a)
  -> Conduit BS.ByteString m a
terminatingStreamHeaderParser toErr headerState = case headerState of
  FailH unconsumed errMsg -> {-# SCC terminatingStreamHeaderParser_FailH_p #-}
    lift (throwError (toErr (CsvParseError unconsumed (T.pack errMsg))))
  PartialH feed -> {-# SCC terminatingStreamHeaderParser_PartialH_p #-} do
    chunk <- await
    case chunk of
      Nothing    -> return ()
      Just bytes -> terminatingStreamHeaderParser toErr (feed bytes)
  DoneH _ recordParser -> {-# SCC terminatingStreamHeaderParser_DoneH_p #-}
    terminatingStreamParser toErr recordParser

-- |
-- Drives an incremental record `Parser`, yielding only successfully parsed records.
-- The first error of any kind terminates the stream by being thrown in the underlying
-- monad (after conversion with the supplied function):
--
-- * a failure of the parser itself becomes a `CsvParseError`,
-- * an error reported for an individual record becomes an `IncrementalError`.
--
-- Note that when a batch of results contains an error, none of the records of that
-- batch are yielded, not even the ones that precede the error.
--
terminatingStreamParser
  :: (Monad m, MonadError e m)
  => (CsvParseError -> e)
  -> Parser a
  -> Conduit BS.ByteString m a
terminatingStreamParser f (Fail rest errMsg)    = {-# SCC terminatingStreamParser_Fail_p #-} lift . throwError . f . CsvParseError rest . T.pack $ errMsg
terminatingStreamParser f (Many ers p)          = {-# SCC terminatingStreamParser_Many_p #-}
  case sequence ers of
    -- Throw straight away. Deferring the throw until the next chunk arrives from upstream
    -- would lose the error whenever upstream has already been exhausted.
    Left errMsg -> lift . throwError . f . IncrementalError . T.pack $ errMsg
    Right rs    -> do
      -- send the results down the stream..
      mapM_ yield rs
      -- wait for more; an exhausted upstream is signalled to the parser with an empty chunk.
      await >>= terminatingStreamParser f . p . fromMaybe BS.empty
terminatingStreamParser f (Done rs) = {-# SCC terminatingStreamParser_Done_p #-} either (lift . throwError . f . IncrementalError . T.pack) (mapM_ yield) (sequence rs)