{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE ScopedTypeVariables #-}
module System.IO.Streams.Csv.Decode
( StreamDecodingError (..)
, decodeStream
, decodeStreamWith
, decodeStreamByName
, decodeStreamByNameWith
, onlyValidRecords
) where
import Control.Exception
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Csv hiding (Parser, decodeWith, decodeByNameWith)
import Data.Csv.Incremental
import Data.IORef
import Data.Typeable
import System.IO.Streams (InputStream, makeInputStream)
import qualified System.IO.Streams as Streams
data StreamDecodingError = StreamDecodingError String
deriving (Typeable, Show)
instance Exception StreamDecodingError
decodeStream :: (FromRecord a)
=> HasHeader
-> InputStream ByteString
-> IO (InputStream (Either String a))
decodeStream = decodeStreamWith defaultDecodeOptions
decodeStreamWith :: (FromRecord a)
=> DecodeOptions
-> HasHeader
-> InputStream ByteString
-> IO (InputStream (Either String a))
decodeStreamWith ops hdr input = do
queue <- newIORef []
parser <- newIORef $ Just (decodeWith ops hdr)
makeInputStream (dispatch queue parser input)
decodeStreamByName :: (FromNamedRecord a)
=> InputStream ByteString
-> IO (InputStream (Either String a))
decodeStreamByName = decodeStreamByNameWith defaultDecodeOptions
decodeStreamByNameWith :: (FromNamedRecord a)
=> DecodeOptions
-> InputStream ByteString
-> IO (InputStream (Either String a))
decodeStreamByNameWith ops input = go (decodeByNameWith ops) where
go (FailH _ e) = bomb e
go (PartialH f) = Streams.read input >>= go . maybe (f BS.empty) f
go (DoneH _ p) = do
queue <- newIORef []
parser <- newIORef (Just p)
makeInputStream (dispatch queue parser input)
onlyValidRecords :: InputStream (Either String a)
-> IO (InputStream a)
onlyValidRecords input = makeInputStream $ do
upstream <- Streams.read input
case upstream of
Nothing -> return Nothing
Just (Left err) -> bomb ("invalid CSV row: " ++ err)
Just (Right x) -> return (Just x)
dispatch :: forall a. IORef [Either String a]
-> IORef (Maybe (Parser a))
-> InputStream ByteString
-> IO (Maybe (Either String a))
dispatch queueRef parserRef input = do
queue <- readIORef queueRef
case queue of
[] -> do
parser <- readIORef parserRef
case parser of
Nothing -> return Nothing
Just (Fail _ e) -> bomb ("input data malformed: " ++ e)
Just (Many [] f) -> more f
Just (Many xs f) -> writeIORef parserRef (Just $ Many [] f) >> feed xs
Just (Done xs ) -> writeIORef parserRef Nothing >> feed xs
(x:xs) -> do
writeIORef queueRef xs
return (Just x)
where
more :: (ByteString -> Parser a) -> IO (Maybe (Either String a))
more f = do bs <- Streams.read input
writeIORef parserRef (Just $ maybe (f BS.empty) f bs)
dispatch queueRef parserRef input
feed :: [Either String a] -> IO (Maybe (Either String a))
feed xs = do writeIORef queueRef xs
dispatch queueRef parserRef input
bomb :: String -> IO a
bomb = throwIO . StreamDecodingError