{-# LANGUAGE FlexibleContexts, MultiParamTypeClasses, OverloadedStrings,
             ScopedTypeVariables, LambdaCase #-}

{- |
   Module      : Streamly.Csv
   Description : Cassava support for the streamly library
   Copyright   : (c) Richard Warfield
   License     : BSD 3-clause
   Maintainer  : richard@litx.io

   Stream CSV data in\/out using
   [Cassava](http://hackage.haskell.org/package/cassava).  Adapted from
   [streaming-cassava](http://hackage.haskell.org/package/streaming-cassava).

   For efficiency, this module operates on streams of strict ByteString chunks
   (i.e. @IsStream t => t m ByteString@) rather than directly on streams of Word8.
   The 'chunkStream' function is useful for generating an input stream from a
   'Handle'.

   Example usage:

   > import Streamly
   > import qualified Streamly.Prelude as S
   > import Streamly.Csv (decode, encode, chunkStream)
   > import System.IO
   > import qualified Data.Csv as Csv
   > import qualified Data.ByteString as BS
   > import Data.Vector (Vector)
   >
   > do
   >   h <- openFile "testfile.csv" ReadMode
   >   let chunks = chunkStream h (64*1024)
   >       recs = decode Csv.HasHeader chunks :: SerialT IO (Vector BS.ByteString)
   >   withFile "dest.csv" WriteMode $ \ho ->
   >     S.mapM_ (BS.hPut ho) $ encode Nothing recs
 -}
module Streamly.Csv
  ( -- * Decoding
    decode
  , decodeWith
  , decodeWithErrors
  , CsvParseException (..)
  , chunkStream
    -- ** Named decoding
  , decodeByName
  , decodeByNameWith
  , decodeByNameWithErrors
    -- * Encoding
  , encode
  , encodeDefault
  , encodeWith
    -- ** Named encoding
  , encodeByName
  , encodeByNameDefault
  , encodeByNameWith
    -- * Re-exports
  , FromRecord (..)
  , FromNamedRecord (..)
  , ToRecord (..)
  , ToNamedRecord (..)
  , DefaultOrdered (..)
  , HasHeader (..)
  , Header
  , header
  , Name
  , DecodeOptions(..)
  , defaultDecodeOptions
  , EncodeOptions(..)
  , defaultEncodeOptions
  ) where

import qualified Data.ByteString                    as BS
import qualified Data.ByteString.Lazy               as BSL
import           Streamly
import qualified Streamly.Prelude                  as S

import           Data.Csv             (DecodeOptions(..), DefaultOrdered(..),
                                       EncodeOptions(..), FromNamedRecord(..),
                                       FromRecord(..), Header, Name,
                                       ToNamedRecord(..), ToRecord(..),
                                       defaultDecodeOptions,
                                       defaultEncodeOptions, encIncludeHeader,
                                       header)
import           Data.Csv.Incremental (HasHeader(..), HeaderParser(..),
                                       Parser(..))
import qualified Data.Csv.Incremental as CI

import System.IO (Handle)
import Control.Exception         (Exception(..))
import Control.Monad.Catch (MonadThrow(..))
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Bifunctor            (first)
import Data.String               (IsString(..))
import Data.Typeable             (Typeable)

--------------------------------------------------------------------------------

-- | Use 'defaultDecodeOptions' for decoding the provided CSV.
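--
--   A minimal sketch (the file name, chunk size and row type are only
--   illustrative):
--
-- > readRows :: IO [(String, Int)]
-- > readRows = do
-- >   h <- openFile "people.csv" ReadMode
-- >   S.toList (decode HasHeader (chunkStream h 4096)
-- >               :: SerialT IO (String, Int))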
decode :: (IsStream t, MonadAsync m, FromRecord a)
       => HasHeader
       -> t m BS.ByteString
       -> t m a
decode = decodeWith defaultDecodeOptions

-- | Return a stream of values from the provided CSV, throwing a
--   'CsvParseException' at the first conversion or parse error.
--
--   If you wish to instead skip over errors, consider using
--   'decodeWithErrors' with 'S.mapMaybe'.
--
--   Any remaining input is discarded.
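--
--   For example, a decoder for semicolon-separated input (a sketch;
--   @semicolonRows@ is only an illustrative name):
--
-- > import Data.Char (ord)
-- >
-- > semicolonRows :: SerialT IO BS.ByteString -> SerialT IO (Vector BS.ByteString)
-- > semicolonRows = decodeWith opts NoHeader
-- >   where opts = defaultDecodeOptions { decDelimiter = fromIntegral (ord ';') }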
decodeWith :: (IsStream t, MonadAsync m, FromRecord a)
           => DecodeOptions -> HasHeader
           -> t m BS.ByteString
           -> t m a
decodeWith opts hdr chunks = getValues (decodeWithErrors opts hdr chunks)

-- | Return a stream where each element is either a successfully
--   converted record or the conversion error for that record.
--
--   A failure to parse the CSV structure itself is thrown as a
--   'CsvParseException' in the underlying monad.
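--
--   For example, a decoder that silently drops the records that fail to
--   convert (a sketch; @keepGood@ is only an illustrative name):
--
-- > keepGood :: FromRecord a => SerialT IO BS.ByteString -> SerialT IO a
-- > keepGood = S.mapMaybe (either (const Nothing) Just)
-- >          . decodeWithErrors defaultDecodeOptions HasHeader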
decodeWithErrors :: (IsStream t, Monad m, FromRecord a, MonadThrow m)
                 => DecodeOptions -> HasHeader
                 -> t m BS.ByteString
                 -> t m (Either CsvParseException a)
decodeWithErrors opts = runParser . CI.decodeWith opts

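-- Feed each input chunk to cassava's incremental 'Parser'.  The scan state
-- pairs the records produced by the previous chunk with the continuation to
-- feed the next chunk to; the produced records are flattened back into the
-- output stream, and an outright parse failure is rethrown as a
-- 'CsvParseException'.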
runParser :: forall t a m. (IsStream t, Monad m, MonadThrow m)
          => Parser a -> t m BS.ByteString -> t m (Either CsvParseException a)
runParser p chunked = S.concatMap fst $ S.scanlM' continue (S.nil, const p) $
                        S.cons BS.empty chunked
  where
    continue :: (t m (Either CsvParseException a), BS.ByteString -> Parser a)
             -> BS.ByteString
             -> m (t m (Either CsvParseException a), BS.ByteString -> Parser a)
    continue (_, p) chunk =
      case p chunk of
        Fail bs err -> throwM (CsvParseException err)
        Many es get -> return (withEach es, get)
        Done es     -> return (withEach es, p)

    withEach = S.fromList . map (first CsvParseException)

-- | Read a 'Handle' as a stream of strict 'BS.ByteString' chunks of at most
--   the given size.  The stream ends when the handle's input is exhausted.
chunkStream :: (IsStream t, MonadAsync m) => Handle -> Int -> t m BS.ByteString
chunkStream h chunkSize = loop
  where
    loop = S.takeWhile (not . BS.null) $
      liftIO (BS.hGetSome h chunkSize) `S.consM` loop

--------------------------------------------------------------------------------

-- | Use 'defaultDecodeOptions' for decoding the provided CSV.
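--
--   A minimal sketch, assuming a type @Person@ with a 'FromNamedRecord'
--   instance and a CSV file whose first row names its fields:
--
-- > readPeople :: Handle -> IO [Person]
-- > readPeople h =
-- >   S.toList (decodeByName (chunkStream h 4096) :: SerialT IO Person)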
decodeByName :: (MonadAsync m, FromNamedRecord a)
                => SerialT m BS.ByteString -> SerialT m a
decodeByName = decodeByNameWith defaultDecodeOptions

-- | Return a stream of values from the provided CSV, throwing a
--   'CsvParseException' at the first conversion or parse error.
--
--   A header is required to determine the order of columns, and is
--   discarded once parsed.
--
--   If you wish to instead skip over errors, consider using
--   'decodeByNameWithErrors' with 'S.mapMaybe'.
--
--   Any remaining input is discarded.
decodeByNameWith :: (MonadAsync m, FromNamedRecord a)
                    => DecodeOptions
                    -> SerialT m BS.ByteString -> SerialT m a
decodeByNameWith opts bs = getValues (decodeByNameWithErrors opts bs)

-- | Return a stream where each element is either a successfully
--   converted record or the conversion error for that record.  The
--   order of columns doesn't have to match the order of fields in
--   your actual type.
--
--   This requires\/assumes a header in the CSV stream, which is
--   discarded after parsing.  A failure to parse the header is
--   thrown as a 'CsvParseException'.
decodeByNameWithErrors :: forall m a. (Monad m, MonadThrow m, FromNamedRecord a)
                       => DecodeOptions
                       -> SerialT m BS.ByteString
                       -> SerialT m (Either CsvParseException a)
decodeByNameWithErrors opts chunked = do
  (p, rest) <- S.yieldM $ extractParser (const $ CI.decodeByNameWith opts) $ S.cons BS.empty chunked
  runParser p rest
  where
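    -- Run cassava's incremental header parser over the leading chunk(s) of
    -- the stream, returning the record parser it yields together with the
    -- unconsumed remainder of the stream.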
    extractParser :: (BS.ByteString -> HeaderParser (Parser a))
                  -> SerialT m BS.ByteString
                  -> m (Parser a, SerialT m BS.ByteString)
    extractParser p chunks = S.uncons chunks >>= \case
      Just (hed, rest) ->
        case p hed of
          FailH bs err -> throwM (CsvParseException err)
          PartialH get -> extractParser get rest
          DoneH _ p    -> return (p, rest)
      Nothing -> throwM $ CsvParseException "Unexpected end of input stream"

--------------------------------------------------------------------------------

-- | Encode a stream of values with the default options.
--
--   Optionally prefix the stream with headers (the 'header' function
--   may be useful).
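--
--   For example, with OverloadedStrings enabled (the file name, header
--   names and rows are only illustrative):
--
-- > withFile "out.csv" WriteMode $ \h ->
-- >   S.mapM_ (BS.hPut h) $
-- >     encode (Just (header ["num", "name"])) $
-- >       S.fromList [(1 :: Int, "one" :: String), (2, "two")]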
encode :: (IsStream t, ToRecord a, Monad m) => Maybe Header
          -> t m a -> t m BS.ByteString
encode = encodeWith defaultEncodeOptions

-- | Encode a stream of values with the default options and a derived
--   header prefixed.
encodeDefault :: forall a t m. (IsStream t, ToRecord a, DefaultOrdered a, Monad m)
                 => t m a -> t m BS.ByteString
encodeDefault = encode (Just (headerOrder (undefined :: a)))

-- | Encode a stream of values with the provided options.
--
--   Optionally prefix the stream with headers (the 'header' function
--   may be useful).
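--
--   For example, a variant that emits LF instead of CRLF line endings and
--   no header (@encodeLf@ is only an illustrative name):
--
-- > encodeLf :: (IsStream t, ToRecord a, Monad m) => t m a -> t m BS.ByteString
-- > encodeLf = encodeWith (defaultEncodeOptions { encUseCrLf = False }) Nothing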
encodeWith :: (IsStream t, ToRecord a, Monad m)
           => EncodeOptions
           -> Maybe Header
           -> t m a
           -> t m BS.ByteString
encodeWith opts mhdr = S.concatMap S.fromList
                       . addHeaders
                       . S.map enc
  where
    addHeaders = maybe id (S.cons . enc) mhdr

    enc :: (ToRecord v) => v -> [BS.ByteString]
    enc = BSL.toChunks . CI.encodeWith opts . CI.encodeRecord

--------------------------------------------------------------------------------

-- | Use the default ordering to encode all fields\/columns.
encodeByNameDefault :: forall a t m. (IsStream t, DefaultOrdered a, ToNamedRecord a, Monad m)
                       => t m a -> t m BS.ByteString
encodeByNameDefault = encodeByName (headerOrder (undefined :: a))

-- | Select the columns that you wish to encode from your data
--   structure using default options (which currently includes
--   printing the header).
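--
--   A minimal sketch, assuming OverloadedStrings, @Data.Csv@ imported
--   qualified as @Csv@, and a hand-written 'ToNamedRecord' instance (the
--   @Person@ type is only illustrative):
--
-- > data Person = Person { name :: String, age :: Int }
-- >
-- > instance ToNamedRecord Person where
-- >   toNamedRecord (Person n a) =
-- >     Csv.namedRecord ["name" Csv..= n, "age" Csv..= a]
-- >
-- > personCsv :: (IsStream t, Monad m) => t m Person -> t m BS.ByteString
-- > personCsv = encodeByName (header ["name", "age"])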
encodeByName :: (IsStream t, ToNamedRecord a, Monad m) => Header
                -> t m a -> t m BS.ByteString
encodeByName = encodeByNameWith defaultEncodeOptions

-- | Select the columns that you wish to encode from your data
--   structure.
--
--   Header printing respects 'encIncludeHeader'.
encodeByNameWith :: (IsStream t, ToNamedRecord a, Monad m) => EncodeOptions -> Header
                    -> t m a -> t m BS.ByteString
encodeByNameWith opts hdr = S.concatMap S.fromList
                            . addHeaders
                            . S.map enc
  where
    opts' = opts { encIncludeHeader = False }

    addHeaders
      | encIncludeHeader opts = S.cons . BSL.toChunks
                                . CI.encodeWith opts' . CI.encodeRecord $ hdr
      | otherwise             = id

    enc = BSL.toChunks . CI.encodeByNameWith opts' hdr . CI.encodeNamedRecord

--------------------------------------------------------------------------------

getValues :: (IsStream t, MonadAsync m, Exception e)
          => t m (Either e a) -> t m a
getValues = S.mapM (either throwM return)

newtype CsvParseException = CsvParseException String
  deriving (Eq, Show, Typeable)

instance IsString CsvParseException where
  fromString = CsvParseException

instance Exception CsvParseException where
  displayException (CsvParseException e) = "Error parsing csv: " ++ e