{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE UndecidableInstances #-} {-# LANGUAGE DeriveDataTypeable #-} -- | The main module, exporting types, utility functions, and fuse and connect -- operators. module Data.Conduit ( -- * Types -- | The three core types to this package are 'Source' (the data -- producer), 'Sink' (the data consumer), and 'Conduit' (the data -- transformer). For all three types, a result will provide the next -- value to be used. For example, the @Open@ constructor includes a new -- @Source@ in it. This leads to the main invariant for all conduit code: -- these three types may /never/ be reused. While some specific values -- may work fine with reuse, the result is generally unpredictable and -- should no be relied upon. -- -- The user-facing API provided by the connect and fuse operators -- automatically addresses the low level details of pulling, pushing, and -- closing, and there should rarely be need to perform these actions in -- user code. -- ** Source module Data.Conduit.Types.Source -- *** Buffering , BufferedSource , bufferSource , unbufferSource , bsourceClose -- *** Unifying , IsSource -- ** Sink , module Data.Conduit.Types.Sink -- ** Conduit , module Data.Conduit.Types.Conduit , -- * Connect/fuse operators ($$) , ($=) , (=$) , (=$=) -- * Utility functions -- ** Source , module Data.Conduit.Util.Source -- ** Sink , module Data.Conduit.Util.Sink -- ** Conduit , module Data.Conduit.Util.Conduit -- * Flushing , Flush (..) -- * Convenience re-exports , ResourceT , Resource (..) , ResourceIO , ResourceUnsafeIO , runResourceT , ResourceThrow (..) ) where import Control.Applicative ((<$>)) import Control.Monad (liftM) import Control.Monad.Trans.Resource import Data.Conduit.Types.Source import Data.Conduit.Util.Source import Data.Conduit.Types.Sink import Data.Conduit.Util.Sink import Data.Conduit.Types.Conduit import Data.Conduit.Util.Conduit -- $typeOverview infixr 0 $$ -- | The connect operator, which pulls data from a source and pushes to a sink. -- There are three ways this process can terminate: -- -- 1. In the case of a @SinkNoData@ constructor, the source is not opened at -- all, and the output value is returned immediately. -- -- 2. The sink returns @Done@. If the input was a @BufferedSource@, any -- leftover input is put in the buffer. For a normal @Source@, the leftover -- value is discarded, and the source is closed. -- -- 3. The source return @Closed@, in which case the sink is closed. -- -- Note that this function will automatically close any @Source@s, but will not -- close any @BufferedSource@s, allowing them to be reused. -- -- Since 0.2.0 ($$) :: (IsSource src, Resource m) => src m a -> Sink a m b -> ResourceT m b ($$) = connect {-# INLINE ($$) #-} -- | A typeclass allowing us to unify operators for 'Source' and -- 'BufferedSource'. -- -- Since 0.2.0 class IsSource src where connect :: Resource m => src m a -> Sink a m b -> ResourceT m b fuseLeft :: Resource m => src m a -> Conduit a m b -> Source m b instance IsSource Source where connect = normalConnect {-# INLINE connect #-} fuseLeft = normalFuseLeft {-# INLINE fuseLeft #-} instance IsSource BufferedSource where connect = bufferedConnect {-# INLINE connect #-} fuseLeft = bufferedFuseLeft {-# INLINE fuseLeft #-} normalConnect :: Resource m => Source m a -> Sink a m b -> ResourceT m b normalConnect _ (SinkNoData output) = return output normalConnect src0 (SinkLift msink) = msink >>= normalConnect src0 normalConnect src0 (SinkData push0 close0) = connect' src0 push0 close0 where connect' src push close = do res <- sourcePull src case res of Closed -> do res' <- close return res' Open src' a -> do mres <- push a case mres of Done _leftover res' -> do sourceClose src' return res' Processing push' close' -> connect' src' push' close' data FuseLeftState src conduit output = FLClosed [output] | FLOpen src conduit [output] infixl 1 $= -- | Left fuse, combining a source and a conduit together into a new source. -- -- Note that any @Source@ passed in will be automatically closed, while a -- @BufferedSource@ will be left open. -- -- Since 0.2.0 ($=) :: (IsSource src, Resource m) => src m a -> Conduit a m b -> Source m b ($=) = fuseLeft {-# INLINE ($=) #-} normalFuseLeft :: Resource m => Source m a -> Conduit a m b -> Source m b normalFuseLeft src0 conduit0 = Source { sourcePull = pull $ FLOpen src0 conduit0 [] , sourceClose = return () } where mkSrc state = Source (pull state) (close state) pull state' = case state' of FLClosed [] -> return Closed FLClosed (x:xs) -> return $ Open (mkSrc (FLClosed xs)) x FLOpen src conduit (x:xs) -> return $ Open (mkSrc (FLOpen src conduit xs)) x FLOpen src conduit [] -> do mres <- sourcePull src case mres of Closed -> do res <- conduitClose conduit case res of [] -> return Closed x:xs -> return $ Open (mkSrc (FLClosed xs)) x Open src'' input -> do res' <- conduitPush conduit input case res' of Producing conduit' [] -> pull $ FLOpen src'' conduit' [] Producing conduit' (x:xs) -> return $ Open (mkSrc (FLOpen src'' conduit' xs)) x Finished _leftover output -> do sourceClose src'' case output of [] -> return Closed x:xs -> return $ Open (mkSrc (FLClosed xs)) x close state = do -- See comment on bufferedFuseLeft for why we need to have the -- following check case state of FLClosed _ -> return () FLOpen src' (Conduit _ closeC) _ -> do _ignored <- closeC sourceClose src' infixr 0 =$ -- | Right fuse, combining a conduit and a sink together into a new sink. -- -- Since 0.2.0 (=$) :: Resource m => Conduit a m b -> Sink b m c -> Sink a m c _ =$ SinkNoData res = SinkNoData res conduit =$ SinkLift msink = SinkLift (liftM (conduit =$) msink) conduitOrig =$ SinkData pushI0 closeI0 = SinkData { sinkPush = push pushI0 closeI0 conduitOrig , sinkClose = close pushI0 closeI0 conduitOrig } where push pushI closeI conduit0 cinput = do res <- conduitPush conduit0 cinput case res of Producing conduit' sinput -> do let loop p c [] = return (Processing (push p c conduit') (close p c conduit')) loop p _ (i:is) = do mres <- p i case mres of Processing p' c' -> loop p' c' is Done _sleftover res' -> do _ <- conduitClose conduit' return $ Done Nothing res' loop pushI closeI sinput Finished cleftover sinput -> do let loop _ c [] = c loop p _ (i:is) = do mres <- p i case mres of Processing p' c' -> loop p' c' is Done _sleftover res' -> return res' res' <- loop pushI closeI sinput return $ Done cleftover res' close pushI closeI conduit = do sinput <- conduitClose conduit let loop _ c [] = c loop p _ (i:is) = do mres <- p i case mres of Processing p' c' -> loop p' c' is Done _sleftover res' -> return res' loop pushI closeI sinput infixr 0 =$= -- | Middle fuse, combining two conduits together into a new conduit. -- -- Since 0.2.0 (=$=) :: Resource m => Conduit a m b -> Conduit b m c -> Conduit a m c outerOrig =$= innerOrig = Conduit (pushF outerOrig innerOrig) (closeF outerOrig innerOrig) where pushF outer0 inner0 inputO = do res <- conduitPush outer0 inputO case res of Producing outer inputI -> do let loop inner [] front = return $ Producing (Conduit (pushF outer inner) (closeF outer inner)) (front []) loop inner (i:is) front = do resI <- conduitPush inner i case resI of Producing conduit c -> loop conduit is (front . (c ++)) Finished _leftover c -> do _ <- conduitClose outer return $ Finished Nothing $ front c loop inner0 inputI id Finished leftoverO inputI -> do c <- conduitPushClose inner0 inputI return $ Finished leftoverO c closeF outer inner = do b <- conduitClose outer c <- conduitPushClose inner b return c -- | Push some data to a conduit, then close it if necessary. conduitPushClose :: Monad m => Conduit a m b -> [a] -> ResourceT m [b] conduitPushClose c [] = conduitClose c conduitPushClose c (input:rest) = do res <- conduitPush c input case res of Finished _ b -> return b Producing conduit b -> do b' <- conduitPushClose conduit rest return $ b ++ b' -- | When actually interacting with @Source@s, we sometimes want to be able to -- buffer the output, in case any intermediate steps return leftover data. A -- @BufferedSource@ allows for such buffering. -- -- A @BufferedSource@, unlike a @Source@, is resumable, meaning it can be -- passed to multiple @Sink@s without restarting. Therefore, a @BufferedSource@ -- relaxes the main invariant of this package: the same value may be used -- multiple times. -- -- The intention of a @BufferedSource@ is to be used internally by an -- application or library, not to be part of its user-facing API. For example, -- the Warp webserver uses a @BufferedSource@ internally for parsing the -- request headers, but then passes a normal @Source@ to the web application -- for reading the request body. -- -- One caveat: while the types will allow you to use the buffered source in -- multiple threads, there is no guarantee that all @BufferedSource@s will -- handle this correctly. -- -- Since 0.2.0 data BufferedSource m a = BufferedSource (Ref (Base m) (BSState m a)) data BSState m a = ClosedEmpty | OpenEmpty (Source m a) | ClosedFull a | OpenFull (Source m a) a -- | Places the given @Source@ and a buffer into a mutable variable. Note that -- you should manually call 'bsourceClose' when the 'BufferedSource' is no -- longer in use. -- -- Since 0.2.0 bufferSource :: Resource m => Source m a -> ResourceT m (BufferedSource m a) bufferSource src = BufferedSource <$> newRef (OpenEmpty src) -- | Turn a 'BufferedSource' into a 'Source'. Note that in general this will -- mean your original 'BufferedSource' will be closed. Additionally, all -- leftover data from usage of the returned @Source@ will be discarded. In -- other words: this is a no-going-back move. -- -- Note: @bufferSource@ . @unbufferSource@ is /not/ the identity function. -- -- Since 0.2.0 unbufferSource :: Resource m => BufferedSource m a -> Source m a unbufferSource (BufferedSource bs) = Source { sourcePull = msrc >>= sourcePull , sourceClose = msrc >>= sourceClose } where msrc = do buf <- readRef bs case buf of OpenEmpty src -> return src OpenFull src a -> return Source { sourcePull = return $ Open src a , sourceClose = sourceClose src } ClosedEmpty -> return Source -- Note: we could put some invariant checking in here if we wanted { sourcePull = return Closed , sourceClose = return () } ClosedFull a -> return Source { sourcePull = return $ Open (Source (return Closed) (return ())) a , sourceClose = return () } bufferedConnect :: Resource m => BufferedSource m a -> Sink a m b -> ResourceT m b bufferedConnect _ (SinkNoData output) = return output bufferedConnect bsrc (SinkLift msink) = msink >>= bufferedConnect bsrc bufferedConnect (BufferedSource bs) (SinkData push0 close0) = do bsState <- readRef bs case bsState of ClosedEmpty -> close0 OpenEmpty src -> connect' src push0 close0 ClosedFull a -> do res <- push0 a case res of Done mleftover res' -> do writeRef bs $ maybe ClosedEmpty ClosedFull mleftover return res' Processing _ close' -> do writeRef bs ClosedEmpty close' OpenFull src a -> push0 a >>= onRes src where connect' src push close = do res <- sourcePull src case res of Closed -> do writeRef bs ClosedEmpty res' <- close return res' Open src' a -> push a >>= onRes src' onRes src (Done mleftover res) = do writeRef bs $ maybe (OpenEmpty src) (OpenFull src) mleftover return res onRes src (Processing push close) = connect' src push close bufferedFuseLeft :: Resource m => BufferedSource m a -> Conduit a m b -> Source m b bufferedFuseLeft bsrc conduit0 = Source { sourcePull = pullF $ FLOpen () conduit0 [] -- still open, no buffer , sourceClose = return () } where mkSrc state = Source (pullF state) (closeF state) pullF state' = case state' of FLClosed [] -> return Closed FLClosed (x:xs) -> return $ Open (mkSrc (FLClosed xs)) x FLOpen () conduit (x:xs) -> return $ Open (mkSrc (FLOpen () conduit xs)) x FLOpen () conduit [] -> do mres <- bsourcePull bsrc case mres of Nothing -> do res <- conduitClose conduit case res of [] -> return Closed x:xs -> return $ Open (mkSrc (FLClosed xs)) x Just input -> do res' <- conduitPush conduit input case res' of Producing conduit' [] -> pullF (FLOpen () conduit' []) Producing conduit' (x:xs) -> return $ Open (mkSrc (FLOpen () conduit' xs)) x Finished leftover output -> do bsourceUnpull bsrc leftover case output of [] -> return Closed x:xs -> return $ Open (mkSrc (FLClosed xs)) x closeF state = do -- Normally we don't have to worry about double closing, as the -- invariant of a source is that close is never called twice. However, -- here, if the Conduit returned Finished with some data, the overall -- Source will return an Open while the Conduit will be Closed. -- Therefore, we have to do a check. case state of FLClosed _ -> return () FLOpen () (Conduit _ close) _ -> do _ignored <- close return () bsourcePull :: Resource m => BufferedSource m a -> ResourceT m (Maybe a) bsourcePull (BufferedSource bs) = do buf <- readRef bs case buf of OpenEmpty src -> do res <- sourcePull src case res of Open src' a -> do writeRef bs $ OpenEmpty src' return $ Just a Closed -> writeRef bs ClosedEmpty >> return Nothing ClosedEmpty -> return Nothing OpenFull src a -> do writeRef bs (OpenEmpty src) return $ Just a ClosedFull a -> do writeRef bs ClosedEmpty return $ Just a bsourceUnpull :: Resource m => BufferedSource m a -> Maybe a -> ResourceT m () bsourceUnpull _ Nothing = return () bsourceUnpull (BufferedSource ref) (Just a) = do buf <- readRef ref case buf of OpenEmpty src -> writeRef ref (OpenFull src a) ClosedEmpty -> writeRef ref (ClosedFull a) _ -> error $ "Invariant violated: bsourceUnpull called on full data" -- | Close the underlying 'Source' for the given 'BufferedSource'. Note -- that this function can safely be called multiple times, as it will first -- check if the 'Source' was previously closed. -- -- Since 0.2.0 bsourceClose :: Resource m => BufferedSource m a -> ResourceT m () bsourceClose (BufferedSource ref) = do buf <- readRef ref case buf of OpenEmpty src -> sourceClose src OpenFull src _ -> sourceClose src ClosedEmpty -> return () ClosedFull _ -> return () -- | Provide for a stream of data that can be flushed. -- -- A number of @Conduit@s (e.g., zlib compression) need the ability to flush -- the stream at some point. This provides a single wrapper datatype to be used -- in all such circumstances. -- -- Since 0.2.0 data Flush a = Chunk a | Flush deriving (Show, Eq, Ord) instance Functor Flush where fmap _ Flush = Flush fmap f (Chunk a) = Chunk (f a)