{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeSynonymInstances #-}
{-# LANGUAGE UndecidableInstances #-}
module Data.CSV.Conduit
(
decodeCSV
, readCSVFile
, writeCSVFile
, transformCSV
, transformCSV'
, mapCSVFile
, writeHeaders
, CSV (..)
, CSVSettings (..)
, defCSVSettings
, MapRow
, Row
, runResourceT
) where
import Control.Exception
import Control.Monad.Catch.Pure (CatchT)
import Control.Monad.Catch.Pure (runCatchT)
import Control.Monad.Except
import Control.Monad.Primitive
import Control.Monad.ST
import Control.Monad.Trans.Resource (MonadResource, MonadThrow,
runResourceT)
import Data.Attoparsec.Types (Parser)
import qualified Data.ByteString as B
import Data.ByteString.Char8 (ByteString)
import qualified Data.ByteString.Char8 as B8
import Data.ByteString.Internal (c2w)
import Data.Conduit
import Data.Conduit.Attoparsec
import Data.Conduit.Binary (sinkFile, sinkIOHandle,
sourceFile)
import qualified Data.Conduit.List as C
import qualified Data.Map as M
import Data.String
import Data.Text (Text)
import qualified Data.Text as T
import qualified Data.Text.Encoding as T
import qualified Data.Vector as V
import qualified Data.Vector.Generic as GV
import qualified Data.Vector.Generic.Mutable as GMV
import Data.Void (Void)
import System.IO
import Data.CSV.Conduit.Conversion (FromNamedRecord (..),
Named (..),
ToNamedRecord (..),
runParser)
import qualified Data.CSV.Conduit.Parser.ByteString as BSP
import qualified Data.CSV.Conduit.Parser.Text as TP
import Data.CSV.Conduit.Types
class CSV s r where
rowToStr :: CSVSettings -> r -> s
intoCSV :: (MonadThrow m) => CSVSettings -> ConduitM s r m ()
fromCSV :: Monad m => CSVSettings -> ConduitM r s m ()
instance CSV ByteString (Row ByteString) where
rowToStr s !r =
let
sep = B.pack [c2w (csvSep s)]
wrapField !f = case csvQuoteChar s of
Just !x-> (x `B8.cons` escape x f) `B8.snoc` x
_ -> f
escape c str = B8.intercalate (B8.pack [c,c]) $ B8.split c str
in B.intercalate sep . map wrapField $ r
intoCSV set = intoCSVRow (BSP.row set)
fromCSV set = fromCSVRow set
instance CSV Text (Row Text) where
rowToStr s !r =
let
sep = T.pack [csvSep s]
wrapField !f = case csvQuoteChar s of
Just !x-> x `T.cons` escape x f `T.snoc` x
_ -> f
escape c str = T.intercalate (T.pack [c,c]) $ T.split (== c) str
in T.intercalate sep . map wrapField $ r
intoCSV set = intoCSVRow (TP.row set)
fromCSV set = fromCSVRow set
instance CSV ByteString (Row Text) where
rowToStr s r = T.encodeUtf8 $ rowToStr s r
intoCSV set = intoCSV set .| C.map (map T.decodeUtf8)
fromCSV set = fromCSV set .| C.map T.encodeUtf8
instance CSV ByteString (Row String) where
rowToStr s r = rowToStr s $ map B8.pack r
intoCSV set = intoCSV set .| C.map (map B8.unpack)
fromCSV set = C.map (map B8.pack) .| fromCSV set
instance (CSV s (Row s)) => CSV s (V.Vector s) where
rowToStr s r = rowToStr s . V.toList $ r
intoCSV set = intoCSV set .| C.map (V.fromList)
fromCSV set = C.map (V.toList) .| fromCSV set
fromCSVRow :: (Monad m, IsString s, CSV s r)
=> CSVSettings -> ConduitM r s m ()
fromCSVRow set = awaitForever $ \row -> mapM_ yield [rowToStr set row, "\n"]
intoCSVRow :: (MonadThrow m, AttoparsecInput i) => Parser i (Maybe o) -> ConduitM i o m ()
intoCSVRow p = parse .| puller
where
parse = {-# SCC "conduitParser_p" #-} conduitParser p
puller = {-# SCC "puller" #-}
awaitForever $ \ (_, mrow) -> maybe (return ()) yield mrow
instance (CSV s (Row s'), Ord s', IsString s) => CSV s (MapRow s') where
rowToStr s r = rowToStr s . M.elems $ r
intoCSV set = intoCSVMap set
fromCSV set = fromCSVMap set
intoCSVMap :: (Ord a, MonadThrow m, CSV s [a])
=> CSVSettings -> ConduitM s (MapRow a) m ()
intoCSVMap set = intoCSV set .| (headers >>= converter)
where
headers = do
mrow <- await
case mrow of
Nothing -> return []
Just [] -> headers
Just hs -> return hs
converter hs = awaitForever $ yield . toMapCSV hs
toMapCSV !hs !fs = M.fromList $ zip hs fs
instance (FromNamedRecord a, ToNamedRecord a, CSV s (MapRow ByteString)) =>
CSV s (Named a) where
rowToStr s a = rowToStr s . toNamedRecord . getNamed $ a
intoCSV set = intoCSV set .| C.mapMaybe go
where
go x = either (const Nothing) (Just . Named) $
runParser (parseNamedRecord x)
fromCSV set = C.map go .| fromCSV set
where
go = toNamedRecord . getNamed
fromCSVMap :: (Monad m, IsString s, CSV s [a])
=> CSVSettings -> ConduitM (M.Map k a) s m ()
fromCSVMap set = awaitForever push
where
push r = mapM_ yield [rowToStr set (M.elems r), "\n"]
writeHeaders
:: (Monad m, CSV s (Row r), IsString s)
=> CSVSettings
-> ConduitM (MapRow r) s m ()
writeHeaders set = do
mrow <- await
case mrow of
Nothing -> return ()
Just row -> mapM_ yield [ rowToStr set (M.keys row)
, "\n"
, rowToStr set (M.elems row)
, "\n" ]
readCSVFile
:: (MonadIO m, CSV ByteString a)
=> CSVSettings
-> FilePath
-> m (V.Vector a)
readCSVFile set fp = liftIO . runResourceT $ runConduit $ sourceFile fp .| intoCSV set .| transPipe lift (sinkVector growthFactor)
where
growthFactor = 10
decodeCSV
:: forall v a s. (GV.Vector v a, CSV s a)
=> CSVSettings
-> s
-> Either SomeException (v a)
decodeCSV set bs = runST $ runExceptT pipeline
where
src :: ConduitM () s (ExceptT SomeException (ST s1)) ()
src = C.sourceList [bs]
csvConvert :: ConduitM s a (ExceptT SomeException (ST s1)) ()
csvConvert = transPipe (ExceptT . runCatchT) csvConvert'
csvConvert' :: ConduitM s a (CatchT (ST s1)) ()
csvConvert' = intoCSV set
growthFactor = 10
sink :: ConduitM a Void (ExceptT SomeException (ST s1)) (v a)
sink = sinkVector growthFactor
pipeline :: ExceptT SomeException (ST s1) (v a)
pipeline = runConduit (src .| csvConvert .| sink)
writeCSVFile
:: (CSV ByteString a)
=> CSVSettings
-> FilePath
-> IOMode
-> [a]
-> IO ()
writeCSVFile set fo fmode rows = runResourceT $ runConduit $ do
C.sourceList rows .| fromCSV set .|
sinkIOHandle (openFile fo fmode)
mapCSVFile
:: ( MonadResource m
, CSV ByteString a
, CSV ByteString b
# if MIN_VERSION_resourcet(1,2,0)
, MonadThrow m
#endif
)
=> CSVSettings
-> (a -> [b])
-> FilePath
-> FilePath
-> m ()
mapCSVFile set f fi fo =
transformCSV set (sourceFile fi) (C.concatMap f) (sinkFile fo)
transformCSV
:: (MonadThrow m, CSV s a, CSV s' b)
=> CSVSettings
-> ConduitM () s m ()
-> ConduitM a b m ()
-> ConduitM s' Void m ()
-> m ()
transformCSV set = transformCSV' set set
transformCSV'
:: (MonadThrow m, CSV s a, CSV s' b)
=> CSVSettings
-> CSVSettings
-> ConduitM () s m ()
-> ConduitM a b m ()
-> ConduitM s' Void m ()
-> m ()
transformCSV' setIn setOut source c sink = runConduit $
source .|
intoCSV setIn .|
c .|
fromCSV setOut .|
sink
sinkVector :: (PrimMonad m, GV.Vector v a) => Int -> ConduitM a o m (v a)
sinkVector by = do
v <- lift $ GMV.new by
go 0 v
where
go i v = do
res <- await
case res of
Nothing -> do
v' <- lift $ GV.freeze $ GMV.slice 0 i v
return $! v'
Just x -> do
v' <- case GMV.length v == i of
True -> lift $ GMV.grow v by
False -> return v
lift $ GMV.write v' i x
go (i+1) v'