{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE DeriveDataTypeable #-}
-- | The main module, exporting types, utility functions, and fuse and connect
-- operators.
module Data.Conduit
    ( -- * Types
      -- ** Source
      module Data.Conduit.Types.Source
      -- *** Buffering
    , BufferedSource
    , bufferSource
    , unbufferSource
    , bsourceClose
      -- *** Unifying
    , IsSource
      -- ** Sink
    , module Data.Conduit.Types.Sink
      -- ** Conduit
    , module Data.Conduit.Types.Conduit
    , -- * Connect/fuse operators
      ($$)
    , ($=)
    , (=$)
    , (=$=)
      -- * Utility functions
      -- ** Source
    , module Data.Conduit.Util.Source
      -- ** Sink
    , module Data.Conduit.Util.Sink
      -- ** Conduit
    , module Data.Conduit.Util.Conduit
      -- * Convenience re-exports
    , ResourceT
    , Resource (..)
    , ResourceIO
    , ResourceUnsafeIO
    , runResourceT
    , ResourceThrow (..)
    ) where

import Control.Monad.Trans.Resource
import Data.Conduit.Types.Source
import Data.Conduit.Util.Source
import Data.Conduit.Types.Sink
import Data.Conduit.Util.Sink
import Data.Conduit.Types.Conduit
import Data.Conduit.Util.Conduit

infixr 0 $$

-- | The connect operator, which pulls data from a source and pushes it to a
-- sink. There are three ways this process can terminate:
--
-- 1. In the case of a @SinkNoData@ constructor, the source is not touched at
-- all, and the output value is returned immediately.
--
-- 2. The sink returns @Done@. For a 'Source', any leftover input is discarded
-- and the source is closed. For a 'BufferedSource', the leftover input is
-- stored back in its buffer and the source is left open.
--
-- 3. The source returns @Closed@, in which case the sink is closed.
--
-- Note that this function will automatically close any 'Source's, but will not
-- close any 'BufferedSource's, allowing them to be reused.
--
-- Since 0.0.0
($$) :: (IsSource src, Resource m) => src m a -> Sink a m b -> ResourceT m b
($$) = connect
{-# INLINE ($$) #-}

-- | A typeclass allowing us to unify operators for 'Source' and
-- 'BufferedSource'.
class IsSource src where
    -- | Drive this source into a sink (the implementation behind '$$').
    connect :: Resource m => src m a -> Sink a m b -> ResourceT m b
    -- | Fuse a conduit onto this source (the implementation behind '$=').
    fuseLeft :: Resource m => src m a -> Conduit a m b -> Source m b

instance IsSource Source where
    connect = normalConnect
    {-# INLINE connect #-}
    fuseLeft = normalFuseLeft
    {-# INLINE fuseLeft #-}

instance IsSource BufferedSource where
    connect = bufferedConnect
    {-# INLINE connect #-}
    fuseLeft = bufferedFuseLeft
    {-# INLINE fuseLeft #-}

-- | Connect a plain 'Source' to a 'Sink'.
--
-- The sink is prepared first; if it needs no input the source is never
-- prepared. Otherwise elements are pulled from the source and pushed into the
-- sink until either the source reports 'Closed' (the sink is then closed) or
-- the sink reports 'Done' (the source is then closed and the sink's leftover
-- is discarded).
normalConnect :: Resource m => Source m a -> Sink a m b -> ResourceT m b
normalConnect (Source prepareSrc) (Sink prepareSink) =
    prepareSink >>= start
  where
    -- A sink that needs no input is answered without opening the source.
    start (SinkNoData output) = return output
    start (SinkData pushSink closeSink) =
        prepareSrc >>= drain pushSink closeSink

    -- Feed the prepared source into the sink until one side stops.
    drain pushSink closeSink src = go
      where
        go = sourcePull src >>= step

        step Closed = closeSink
        step (Open x) = pushSink x >>= finish

        finish Processing = go
        finish (Done _leftover result) = sourceClose src >> return result

-- | Internal state of a left-fused source.
--
-- The list holds output the conduit already produced but that has not yet
-- been handed downstream.
data FuseLeftState a
    = FLClosed [a] -- ^ no further input will be pulled
    | FLOpen [a]   -- ^ input may still be pulled and pushed into the conduit

infixl 1 $=

-- | Left fuse, combining a source and a conduit together into a new source.
--
-- Closing the fused source early also closes the conduit. For a plain
-- 'Source' the upstream source is closed as well; a 'BufferedSource' is left
-- open.
--
-- Since 0.0.0
($=) :: (IsSource src, Resource m) => src m a -> Conduit a m b -> Source m b
($=) = fuseLeft
{-# INLINE ($=) #-}

-- | Left-fuse a plain 'Source' with a 'Conduit'.
--
-- Each pull returns buffered conduit output first. When the buffer is empty,
-- one element is pulled from upstream and pushed through the conduit.
--
-- When upstream reports 'Closed', the conduit is closed and its final output
-- is buffered. When the conduit reports 'Finished', upstream is closed and
-- the conduit's leftover is discarded.
--
-- Closing the fused source only closes the conduit and upstream if they are
-- still running. In the 'FLClosed' state both have already been finalised,
-- so closing them again would violate the close-at-most-once invariant.
normalFuseLeft :: Resource m => Source m a -> Conduit a m b -> Source m b
normalFuseLeft (Source msrc) (Conduit mc) = Source $ do
    istate <- newRef $ FLOpen [] -- still open, no buffer
    src <- msrc
    c <- mc
    return $ PreparedSource (pull istate src c) (close istate src c)
  where
    pull istate src c = do
        state' <- readRef istate
        case state' of
            FLClosed [] -> return Closed
            FLClosed (x:xs) -> do
                writeRef istate $ FLClosed xs
                return $ Open x
            FLOpen (x:xs) -> do
                writeRef istate $ FLOpen xs
                return $ Open x
            FLOpen [] -> do
                mres <- sourcePull src
                case mres of
                    Closed -> do
                        -- Upstream is exhausted: flush the conduit.
                        res <- conduitClose c
                        case res of
                            [] -> do
                                writeRef istate $ FLClosed []
                                return Closed
                            x:xs -> do
                                writeRef istate $ FLClosed xs
                                return $ Open x
                    Open input -> do
                        res' <- conduitPush c input
                        case res' of
                            Producing [] -> pull istate src c
                            Producing (x:xs) -> do
                                writeRef istate $ FLOpen xs
                                return $ Open x
                            Finished _leftover output -> do
                                -- The conduit needs no more input, so
                                -- upstream is closed now.
                                sourceClose src
                                case output of
                                    [] -> do
                                        writeRef istate $ FLClosed []
                                        return Closed
                                    x:xs -> do
                                        writeRef istate $ FLClosed xs
                                        return $ Open x
    close istate src c = do
        state' <- readRef istate
        writeRef istate $ FLClosed []
        case state' of
            -- Conduit and upstream are both still running: close them.
            FLOpen _ -> do
                _ignored <- conduitClose c
                sourceClose src
            -- Already finalised while producing the buffered output; closing
            -- again would be a double close.
            FLClosed _ -> return ()

infixr 0 =$

-- | Right fuse, combining a conduit and a sink together into a new sink.
--
-- If the sink finishes while the conduit is still running, the conduit is
-- closed and any output it still had is discarded. If the sink needs no input
-- at all, the conduit is never prepared.
--
-- Since 0.0.0
(=$) :: Resource m => Conduit a m b -> Sink b m c -> Sink a m c
Conduit prepareConduit =$ Sink prepareSink = Sink $ prepareSink >>= wrap
  where
    -- A sink needing no input is passed through unchanged.
    wrap (SinkNoData result) = return $ SinkNoData result
    wrap (SinkData pushDown closeDown) =
        prepareConduit >>= return . fused pushDown closeDown

    -- Build the combined sink around a prepared conduit.
    fused pushDown closeDown conduit = SinkData
        { sinkPush = \input -> conduitPush conduit input >>= onPush
        , sinkClose = conduitClose conduit >>= drainThenClose
        }
      where
        onPush (Producing outs) = feedOpen outs
        onPush (Finished leftover outs) =
            drainThenClose outs >>= return . Done leftover

        -- Conduit still running: feed its output downstream. If the sink
        -- finishes early, close the conduit and drop what it returns.
        feedOpen [] = return Processing
        feedOpen (o:os) = pushDown o >>= \r -> case r of
            Processing -> feedOpen os
            Done _ result -> do
                _ <- conduitClose conduit
                return $ Done Nothing result

        -- Conduit finished: feed its final output downstream, closing the
        -- sink if it still wants more.
        drainThenClose [] = closeDown
        drainThenClose (o:os) = pushDown o >>= \r -> case r of
            Processing -> drainThenClose os
            Done _ result -> return result

infixr 0 =$=

-- | Middle fuse, combining two conduits together into a new conduit.
--
-- If the inner conduit finishes, the outer one is closed and the fused
-- conduit reports 'Finished'.
--
-- Since 0.0.0
(=$=) :: Resource m => Conduit a m b -> Conduit b m c -> Conduit a m c
Conduit prepareOuter =$= Conduit prepareInner = Conduit $ do
    outer <- prepareOuter
    inner <- prepareInner
    -- 'feed' pushes one batch of outer output through the inner conduit,
    -- accumulating inner output in a difference list.
    let feed acc [] = return $ Producing $ acc []
        feed acc (b:bs) = do
            innerRes <- conduitPush inner b
            case innerRes of
                Producing cs -> feed (acc . (cs ++)) bs
                Finished _ cs -> do
                    _ <- conduitClose outer
                    return $ Finished Nothing $ acc cs
        pushFused a = do
            outerRes <- conduitPush outer a
            case outerRes of
                Producing bs -> feed id bs
                Finished leftover bs ->
                    conduitPushClose inner bs >>= return . Finished leftover
    return PreparedConduit
        { conduitPush = pushFused
        , conduitClose = conduitClose outer >>= conduitPushClose inner
        }

-- | Push some data to a conduit, then close it if necessary.
--
-- Inputs are pushed one at a time. If the conduit reports 'Finished', its
-- final output is returned and the remaining inputs (and its leftover) are
-- dropped. Otherwise the conduit is closed after the last input and all
-- output is concatenated in order.
conduitPushClose :: Monad m => PreparedConduit a m b -> [a] -> ResourceT m [b]
conduitPushClose c [] = conduitClose c
conduitPushClose c (input:rest) = conduitPush c input >>= step
  where
    step (Finished _ out) = return out
    step (Producing out) =
        conduitPushClose c rest >>= \more -> return (out ++ more)

-- | When actually interacting with 'Source's, we usually want to be able to
-- buffer the output in case any intermediate steps return leftover data. A
-- 'BufferedSource' allows for such buffering.
--
-- A 'BufferedSource', unlike a 'Source', is resumable, meaning it can be
-- passed to multiple 'Sink's without restarting.
--
-- Finally, a 'BufferedSource' relaxes one of the invariants of a 'Source':
-- pulling after the source is closed is allowed.
--
-- A @BufferedSource@ is also known as a /resumable source/, in that it can be
-- called multiple times, and each time will provide new data. One caveat:
-- while the types will allow you to use the buffered source in multiple
-- threads, there is no guarantee that all @BufferedSource@s will handle this
-- correctly.
--
-- Since 0.0.0
data BufferedSource m a = BufferedSource
    { bsSource :: PreparedSource m a
    , bsBuffer :: Ref (Base m) (BSState a)
    }

-- | Buffer state of a 'BufferedSource'.
--
-- It records whether the underlying source has been observed as closed, and
-- whether a single pushed-back element is waiting to be pulled.
data BSState a = ClosedEmpty | OpenEmpty | ClosedFull a | OpenFull a

-- | Prepare a 'Source' and initialize a buffer. Note that you should manually
-- call 'bsourceClose' when the 'BufferedSource' is no longer in use.
--
-- Since 0.0.0
bufferSource :: Resource m => Source m a -> ResourceT m (BufferedSource m a)
bufferSource (Source prepare) =
    prepare >>= \prepared ->
        newRef OpenEmpty >>= return . BufferedSource prepared

-- | Turn a 'BufferedSource' into a 'Source'. Note that in general this will
-- mean your original 'BufferedSource' will be closed. Additionally, all
-- leftover data from usage of the returned @Source@ will be discarded. In
-- other words: this is a no-going-back move.
--
-- Note: @bufferSource@ . @unbufferSource@ is /not/ the identity function.
--
-- Preparing the returned 'Source' hands the underlying source over to it and
-- marks the original 'BufferedSource' as closed. Later uses of the original
-- neither replay the buffered element nor close the underlying source a
-- second time.
--
-- Since 0.0.1
unbufferSource :: Resource m => BufferedSource m a -> Source m a
unbufferSource (BufferedSource src bufRef) = Source $ do
    buf <- readRef bufRef
    -- Ownership moves to the returned source, which is now responsible for
    -- closing the underlying source.
    writeRef bufRef ClosedEmpty
    case buf of
        OpenEmpty -> return src
        OpenFull a -> do
            isUsedRef <- newRef False
            return PreparedSource
                { sourcePull = do
                    isUsed <- readRef isUsedRef
                    if isUsed
                        then sourcePull src
                        else do
                            writeRef isUsedRef True
                            return $ Open a
                , sourceClose = sourceClose src
                }
        ClosedEmpty -> return PreparedSource
            -- Note: we could put some invariant checking in here if we wanted
            { sourcePull = return Closed
            , sourceClose = return ()
            }
        ClosedFull a -> do
            isUsedRef <- newRef False
            return PreparedSource
                { sourcePull = do
                    isUsed <- readRef isUsedRef
                    if isUsed
                        then return Closed
                        else do
                            writeRef isUsedRef True
                            return $ Open a
                  -- The underlying source already reported Closed, so it
                  -- must not be closed again.
                , sourceClose = return ()
                }

-- | Connect a 'BufferedSource' to a sink.
--
-- Any buffered element is pushed first. A leftover returned by the sink is
-- stored back in the buffer. This function never closes the underlying
-- source; it only records when the source reports 'Closed'.
bufferedConnect :: Resource m => BufferedSource m a -> Sink a m b -> ResourceT m b
bufferedConnect bs (Sink msink) = do
    sinkI <- msink
    case sinkI of
        SinkNoData output -> return output
        SinkData push close -> do
            bsState <- readRef $ bsBuffer bs
            case bsState of
                ClosedEmpty -> close
                OpenEmpty -> connect' push close
                ClosedFull a -> do
                    res <- push a
                    case res of
                        Done mleftover res' -> do
                            writeRef (bsBuffer bs) $ maybe ClosedEmpty ClosedFull mleftover
                            return res'
                        Processing -> do
                            writeRef (bsBuffer bs) ClosedEmpty
                            close
                OpenFull a -> push a >>= onRes (connect' push close)
  where
    -- Pull directly from the underlying source until it closes or the sink
    -- is done; every exit path records the resulting buffer state.
    connect' push close = loop
      where
        loop = do
            res <- sourcePull $ bsSource bs
            case res of
                Closed -> do
                    writeRef (bsBuffer bs) ClosedEmpty
                    close
                Open a -> push a >>= onRes loop
    onRes _ (Done mleftover res) = do
        writeRef (bsBuffer bs) (maybe OpenEmpty OpenFull mleftover)
        return res
    onRes next Processing = next

-- | Left-fuse a 'BufferedSource' with a 'Conduit'.
--
-- Works like the plain-source fuse, except that a leftover returned by a
-- finishing conduit is pushed back into the buffer. The buffered source is
-- never closed here.
--
-- Closing the fused source closes the conduit only if it is still running.
-- In the 'FLClosed' state the conduit was already closed or has finished.
bufferedFuseLeft :: Resource m => BufferedSource m a -> Conduit a m b -> Source m b
bufferedFuseLeft bsrc (Conduit mc) = Source $ do
    istate <- newRef $ FLOpen [] -- still open, no buffer
    c <- mc
    return $ PreparedSource (pull istate c) (close istate c)
  where
    pull istate c = do
        state' <- readRef istate
        case state' of
            FLClosed [] -> return Closed
            FLClosed (x:xs) -> do
                writeRef istate $ FLClosed xs
                return $ Open x
            FLOpen (x:xs) -> do
                writeRef istate $ FLOpen xs
                return $ Open x
            FLOpen [] -> do
                mres <- bsourcePull bsrc
                case mres of
                    Closed -> do
                        res <- conduitClose c
                        case res of
                            [] -> do
                                writeRef istate $ FLClosed []
                                return Closed
                            x:xs -> do
                                writeRef istate $ FLClosed xs
                                return $ Open x
                    Open input -> do
                        res' <- conduitPush c input
                        case res' of
                            Producing [] -> pull istate c
                            Producing (x:xs) -> do
                                writeRef istate $ FLOpen xs
                                return $ Open x
                            Finished leftover output -> do
                                -- Keep unconsumed input for the next user of
                                -- the buffered source.
                                bsourceUnpull bsrc leftover
                                case output of
                                    [] -> do
                                        writeRef istate $ FLClosed []
                                        return Closed
                                    x:xs -> do
                                        writeRef istate $ FLClosed xs
                                        return $ Open x
    close istate c = do
        state' <- readRef istate
        writeRef istate $ FLClosed []
        case state' of
            -- Conduit still running: close it, discarding its final output.
            FLOpen _ -> do
                _ignored <- conduitClose c
                return ()
            -- Conduit already closed or finished: nothing to do.
            FLClosed _ -> return ()

-- | Pull one element, serving the buffered element first.
--
-- Once the underlying source reports 'Closed', the buffer records it, so
-- later pulls return 'Closed' without touching the source again.
bsourcePull :: Resource m => BufferedSource m a -> ResourceT m (SourceResult a)
bsourcePull (BufferedSource src bufRef) = do
    buf <- readRef bufRef
    case buf of
        OpenEmpty -> do
            res <- sourcePull src
            case res of
                Open _ -> return res
                Closed -> writeRef bufRef ClosedEmpty >> return Closed
        ClosedEmpty -> return Closed
        OpenFull a -> do
            writeRef bufRef OpenEmpty
            return $ Open a
        ClosedFull a -> do
            writeRef bufRef ClosedEmpty
            return $ Open a

-- | Push a leftover element back into the buffer; @Nothing@ is a no-op.
--
-- The buffer holds at most one element. Calling this while it is already
-- full raises an error.
bsourceUnpull :: Resource m => BufferedSource m a -> Maybe a -> ResourceT m ()
bsourceUnpull _ Nothing = return ()
bsourceUnpull (BufferedSource _ bufRef) (Just a) = do
    buf <- readRef bufRef
    case buf of
        OpenEmpty -> writeRef bufRef $ OpenFull a
        ClosedEmpty -> writeRef bufRef $ ClosedFull a
        _ -> error $ "Invariant violated: bsourceUnpull called on full data"

-- | Close the underlying 'PreparedSource' for the given 'BufferedSource'. Note
-- that this function can safely be called multiple times, as it will first
-- check if the 'PreparedSource' was previously closed.
--
-- The buffer is marked as closed before the underlying source is closed.
-- Repeated calls therefore never close it twice, and later reads through
-- this 'BufferedSource' see it as closed. An element already held in the
-- buffer is kept.
--
-- Since 0.0.0
bsourceClose :: Resource m => BufferedSource m a -> ResourceT m ()
bsourceClose (BufferedSource src bufRef) = do
    buf <- readRef bufRef
    case buf of
        OpenEmpty -> closeAs ClosedEmpty
        OpenFull a -> closeAs (ClosedFull a)
        ClosedEmpty -> return ()
        ClosedFull _ -> return ()
  where
    -- Record the closed state first, so the source is closed at most once
    -- even if closing it fails.
    closeAs closedState = do
        writeRef bufRef closedState
        sourceClose src