{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE DeriveDataTypeable #-}
-- | The main module, exporting types, utility functions, and fuse and connect
-- operators.
module Data.Conduit
    ( -- * Types
      -- | The three core types to this package are 'Source' (the data
      -- producer), 'Sink' (the data consumer), and 'Conduit' (the data
      -- transformer). For all three types, a result will provide the next
      -- value to be used. For example, the @Open@ constructor includes a new
      -- @Source@ in it. This leads to the main invariant for all conduit code:
      -- these three types may /never/ be reused. While some specific values
      -- may work fine with reuse, the result is generally unpredictable and
      -- should not be relied upon.
      --
      -- The user-facing API provided by the connect and fuse operators
      -- automatically addresses the low level details of pulling, pushing, and
      -- closing, and there should rarely be need to perform these actions in
      -- user code.

      -- ** Source
      module Data.Conduit.Types.Source
      -- *** Buffering
    , BufferedSource
    , bufferSource
    , unbufferSource
    , bsourceClose
      -- *** Unifying
    , IsSource
      -- ** Sink
    , module Data.Conduit.Types.Sink
      -- ** Conduit
    , module Data.Conduit.Types.Conduit
    , -- * Connect/fuse operators
      ($$)
    , ($=)
    , (=$)
    , (=$=)
      -- * Utility functions

      -- ** Source
    , module Data.Conduit.Util.Source
      -- ** Sink
    , module Data.Conduit.Util.Sink
      -- ** Conduit
    , module Data.Conduit.Util.Conduit
      -- * Flushing
    , Flush (..)
      -- * Convenience re-exports
    , ResourceT
    , MonadResource
    , MonadThrow (..)
    , MonadUnsafeIO (..)
    , runResourceT
    ) where

import Control.Monad (liftM)
import Control.Monad.Trans.Resource
import Control.Monad.IO.Class (MonadIO (liftIO))
import qualified Data.IORef as I
import Data.Conduit.Types.Source
import Data.Conduit.Util.Source
import Data.Conduit.Types.Sink
import Data.Conduit.Util.Sink
import Data.Conduit.Types.Conduit
import Data.Conduit.Util.Conduit

-- $typeOverview

infixr 0 $$

-- | The connect operator, which pulls data from a source and pushes to a sink.
-- There are two ways this process can terminate:
--
-- 1. If the @Sink@ is a @Done@ constructor, the @Source@ is closed.
--
-- 2. If the @Source@ is a @Closed@ constructor, the @Sink@ is closed.
--
-- This function will automatically close any @Source@s, but will not close any
-- @BufferedSource@s, allowing them to be reused. Also, leftover data will be
-- discarded when connecting a @Source@, but will be buffered when using a
-- @BufferedSource@.
--
-- Since 0.3.0
($$) :: IsSource src m => src m a -> Sink a m b -> m b
($$) = connect
{-# INLINE ($$) #-}

-- | A typeclass allowing us to unify operators for 'Source' and
-- 'BufferedSource'.
--
-- Since 0.3.0
class IsSource src m where
    connect :: src m a -> Sink a m b -> m b
    fuseLeft :: src m a -> Conduit a m b -> Source m b

instance Monad m => IsSource Source m where
    connect = normalConnect
    {-# INLINE connect #-}
    fuseLeft = normalFuseLeft
    {-# INLINE fuseLeft #-}

instance MonadIO m => IsSource BufferedSource m where
    connect = bufferedConnect
    {-# INLINE connect #-}
    fuseLeft = bufferedFuseLeft
    {-# INLINE fuseLeft #-}

-- Connect a plain @Source@ to a @Sink@, running both until one of them
-- terminates. Clause order matters: the @Sink@ state is inspected first, so a
-- finished @Sink@ never causes the @Source@ to be pulled again.
normalConnect :: Monad m => Source m a -> Sink a m b -> m b
-- @Sink@ cannot handle any more input, close the @Source@ (regardless of its
-- state) and discard leftovers.
normalConnect src (Done _leftover output) = sourceClose src >> return output
-- Run the @Sink@'s monadic action and try again.
normalConnect src (SinkM msink) = msink >>= normalConnect src
-- Run the @Source@'s monadic action and try again.
normalConnect (SourceM msrc _) sink@Processing{} = msrc >>= flip normalConnect sink
-- No more input available from @Source@, close the @Sink@.
normalConnect Closed (Processing _ close) = close
-- More input available, and the @Sink@ wants it: plug it in and keep going.
normalConnect (Open src _ a) (Processing push _) = normalConnect src $ push a

-- State threaded through 'bufferedFuseLeft': either everything is closed, the
-- conduit is waiting for input (holding its push and close), or the conduit
-- still has a continuation to run together with its close action.
data FuseLeftState srcState input m output =
    FLClosed
  | FLOpen srcState (ConduitPush input m output) (ConduitClose m output)
  | FLHaveOutput srcState (Conduit input m output) (m ())

infixl 1 $=

-- | Left fuse, combining a source and a conduit together into a new source.
--
-- Any @Source@ passed in will be automatically closed, while a
-- @BufferedSource@ will be left open. Leftover input will be discarded for a
-- @Source@, and buffered for a @BufferedSource@.
--
-- Since 0.3.0
($=) :: IsSource src m => src m a -> Conduit a m b -> Source m b
($=) = fuseLeft
{-# INLINE ($=) #-}

-- Fuse a plain @Source@ with a @Conduit@. Clause order is significant: the
-- equations are tried top to bottom and several of them overlap.
normalFuseLeft :: Monad m => Source m a -> Conduit a m b -> Source m b
-- No more input, close the @Conduit@.
normalFuseLeft Closed (NeedInput _ close) = close
normalFuseLeft Closed (Finished _) = Closed
-- @Conduit@ is done, discard leftovers and close the @Source@.
normalFuseLeft src (Finished _) = SourceM
    (sourceClose src >> return Closed)
    (sourceClose src)
-- @Conduit@ has some output, return it and keep going.
normalFuseLeft src (HaveOutput p c x) = Open
    (normalFuseLeft src p)
    (sourceClose src >> c)
    x
-- @Source@ provided input, and @Conduit@ wants it. Pipe it through and keep going.
normalFuseLeft (Open src _ a) (NeedInput push _) = normalFuseLeft src $ push a
-- Need to perform a monadic action to get the next @Conduit@.
normalFuseLeft src (ConduitM mcon conclose) = SourceM
    (liftM (normalFuseLeft src) mcon)
    (conclose >> sourceClose src)
-- Need to perform a monadic action to get the next @Source@.
normalFuseLeft (SourceM msrc closeS) conduit@(NeedInput _ closeC) =
    SourceM (liftM (flip normalFuseLeft conduit) msrc) $ do
        closeS
        sourceClose closeC

infixr 0 =$

-- | Right fuse, combining a conduit and a sink together into a new sink.
--
-- Any leftover data returned from the @Sink@ will be discarded.
--
-- Since 0.3.0
(=$) :: Monad m => Conduit a m b -> Sink b m c -> Sink a m c
-- @Sink@ is complete, discard leftovers, close the @Conduit@ and return the
-- output.
conduit =$ Done _leftover output =
    SinkM $ conduitClose conduit >> return (Done Nothing output)
-- Need to perform a monadic action to get the next @Sink@.
conduit =$ SinkM msink = SinkM (liftM (conduit =$) msink)
-- Need more input for the @Conduit@, so ask for it.
NeedInput pushO closeO =$ sink = Processing
    (\input -> pushO input =$ sink)
    (closeO $$ sink)
-- @Conduit@ can provide no more input to @Sink@. Close the @Sink@ and return
-- the leftovers from the @Conduit@.
Finished mleftover =$ Processing _ close =
    SinkM $ liftM (Done mleftover) close
-- Perform a monadic action to get the next @Conduit@.
ConduitM mcon _ =$ sink@Processing{} = SinkM $ liftM (=$ sink) mcon
-- @Conduit@ is providing input for the @Sink@, so use it and keep going.
HaveOutput con _ input =$ Processing pushI _ = con =$ pushI input

infixr 0 =$=

-- | Middle fuse, combining two conduits together into a new conduit.
--
-- Any leftovers provided by the inner @Conduit@ will be discarded.
--
-- Since 0.3.0
(=$=) :: Monad m => Conduit a m b -> Conduit b m c -> Conduit a m c
-- No more input from outer conduit, and inner wants more input. Close the
-- inner conduit and convert its source into a conduit.
Finished mleftover =$= NeedInput _ closeI =
    go closeI
  where
    go Closed = Finished mleftover
    go (Open src close x) = HaveOutput (go src) close x
    go (SourceM msrc close) = ConduitM (liftM go msrc) close
-- No more input from outer conduit, and inner wants no more input anyway.
-- Discard the leftovers from the inner.
Finished mleftover =$= Finished _ = Finished mleftover
-- Provide more output from the inner conduit.
conO =$= HaveOutput con close x = HaveOutput (conO =$= con) close x
-- Perform a monadic action to get the next outer conduit.
ConduitM mcon close =$= conI = ConduitM
    (liftM (=$= conI) mcon)
    (close >> conduitClose conI)
-- Ask for more data for the outer conduit. Note that this clause comes before
-- the inner conduit ConduitM clause, so that we only run that action as
-- necessary.
NeedInput pushO closeO =$= conI = NeedInput
    (\input -> pushO input =$= conI)
    (closeO $= conI)
-- Perform a monadic action to get the next inner conduit.
conO =$= ConduitM mconI close = ConduitM
    (liftM (conO =$=) mconI)
    (close >> conduitClose conO)
-- Pipe output from outer conduit to inner conduit and keep going.
HaveOutput conO _ inputI =$= NeedInput pushI _ = conO =$= pushI inputI
-- Discard output from outer conduit, and discard leftovers from inner conduit.
-- Close the outer conduit.
HaveOutput _ close _ =$= Finished _ = ConduitM
    (close >> return (Finished Nothing))
    close

-- | When actually interacting with @Source@s, we sometimes want to be able to
-- buffer the output, in case any intermediate steps return leftover data. A
-- @BufferedSource@ allows for such buffering.
--
-- A @BufferedSource@, unlike a @Source@, is resumable, meaning it can be
-- passed to multiple @Sink@s without restarting. Therefore, a @BufferedSource@
-- relaxes the main invariant of this package: the same value may be used
-- multiple times.
--
-- The intention of a @BufferedSource@ is to be used internally by an
-- application or library, not to be part of its user-facing API. For example,
-- the Warp webserver uses a @BufferedSource@ internally for parsing the
-- request headers, but then passes a normal @Source@ to the web application
-- for reading the request body.
--
-- One caveat: while the types will allow you to use the buffered source in
-- multiple threads, there is no guarantee that all @BufferedSource@s will
-- handle this correctly.
--
-- Since 0.3.0
data BufferedSource m a = BufferedSource (I.IORef (BSState m a))

-- Contents of the mutable cell behind a 'BufferedSource'. \"Open\" states still
-- hold the remaining underlying @Source@; \"Full\" states additionally hold one
-- buffered leftover value that must be delivered before pulling again.
data BSState m a =
    ClosedEmpty
  | OpenEmpty (Source m a)
  | ClosedFull a
  | OpenFull (Source m a) a

-- | Places the given @Source@ and a buffer into a mutable variable. Note that
-- you should manually call 'bsourceClose' when the 'BufferedSource' is no
-- longer in use.
--
-- Since 0.3.0
bufferSource :: MonadIO m => Source m a -> m (BufferedSource m a)
bufferSource src = liftM BufferedSource $ liftIO $ I.newIORef $ OpenEmpty src

-- | Turn a 'BufferedSource' into a 'Source'. Note that in general this will
-- mean your original 'BufferedSource' will be closed. Additionally, all
-- leftover data from usage of the returned @Source@ will be discarded. In
-- other words: this is a no-going-back move.
--
-- Note: @bufferSource@ . @unbufferSource@ is /not/ the identity function.
--
-- Since 0.3.0
unbufferSource :: MonadIO m
               => BufferedSource m a
               -> Source m a
unbufferSource (BufferedSource bs) =
    SourceM msrc (msrc >>= sourceClose)
  where
    -- Rebuild a plain @Source@ from the current buffer state; a buffered
    -- leftover becomes the first value of the resulting @Source@. The mutable
    -- cell is only read here, never updated.
    msrc = do
        buf <- liftIO $ I.readIORef bs
        case buf of
            OpenEmpty src -> return src
            OpenFull src a -> return $ Open src (sourceClose src) a
            ClosedEmpty -> return Closed
            ClosedFull a -> return $ Open Closed (return ()) a

-- Connect a 'BufferedSource' to a @Sink@. Unlike 'normalConnect', the
-- underlying @Source@ is not closed when the @Sink@ finishes; instead the
-- remaining @Source@ and any leftover are written back to the mutable cell.
bufferedConnect :: MonadIO m => BufferedSource m a -> Sink a m b -> m b
bufferedConnect _ (Done Nothing output) = return output
bufferedConnect _ (Done Just{} _) = error "Invariant violated: sink returned leftover without input"
bufferedConnect bsrc (SinkM msink) = msink >>= bufferedConnect bsrc
bufferedConnect (BufferedSource bs) (Processing push0 close0) = do
    bsState <- liftIO $ I.readIORef bs
    case bsState of
        ClosedEmpty -> close0
        OpenEmpty src -> connect' src push0 close0
        ClosedFull a -> onRes Nothing $ push0 a
        OpenFull src a -> onRes (Just src) $ push0 a
  where
    -- Feed the sink from the underlying source, one step at a time.
    connect' Closed _ close = do
        liftIO $ I.writeIORef bs ClosedEmpty
        close
    connect' (Open src _ x) push _ = onRes (Just src) $ push x
    connect' (SourceM msrc _) push close = msrc >>= \src -> connect' src push close

    -- React to the sink's next state. The first argument is the remaining
    -- underlying source, or Nothing if it has already been exhausted.
    onRes msrc (Done mleftover res) = do
        let state =
                case (msrc, mleftover) of
                    (Nothing, Nothing) -> ClosedEmpty
                    (Just src, Nothing) -> OpenEmpty src
                    (Nothing, Just leftover) -> ClosedFull leftover
                    (Just src, Just leftover) -> OpenFull src leftover
        liftIO $ I.writeIORef bs state
        return res
    onRes Nothing (Processing _ close) = do
        liftIO $ I.writeIORef bs ClosedEmpty
        close
    onRes (Just src) (Processing push close) = connect' src push close
    onRes msrc (SinkM msink) = msink >>= onRes msrc

-- Fuse a 'BufferedSource' with a @Conduit@. Input is obtained through
-- 'bsourcePull' and conduit leftovers are returned through 'bsourceUnpull',
-- so the 'BufferedSource' itself is never closed here.
bufferedFuseLeft
    :: MonadIO m
    => BufferedSource m a
    -> Conduit a m b
    -> Source m b
bufferedFuseLeft bsrc (ConduitM mcon close) = SourceM
    (liftM (bufferedFuseLeft bsrc) mcon)
    close
bufferedFuseLeft _ (Finished _) = Closed
bufferedFuseLeft bsrc (HaveOutput next close x) = Open
    (bufferedFuseLeft bsrc next)
    close
    x
bufferedFuseLeft bsrc (NeedInput push0 close0) = SourceM
    (pullF $ FLOpen () push0 close0)
    (sourceClose close0)
  where
    mkSrc state = SourceM
        (pullF state)
        (closeF state)

    pullF state' =
        case state' of
            FLClosed -> return Closed
            FLHaveOutput () pull _ -> goRes pull
            FLOpen () push close -> do
                mres <- bsourcePull bsrc
                case mres of
                    Nothing -> return close
                    Just input -> goRes $ push input

    goRes (Finished leftover) = do
        bsourceUnpull bsrc leftover
        return Closed
    goRes (HaveOutput pull close' x) =
        let state = FLHaveOutput () pull close'
         in return $ Open (mkSrc state) (closeF state) x
    goRes (NeedInput pushI closeI) = pullF (FLOpen () pushI closeI)
    goRes (ConduitM mcon _) = mcon >>= goRes

    closeF state = do
        -- Normally we don't have to worry about double closing, as the
        -- invariant of a source is that close is never called twice. However,
        -- here, if the Conduit returned Finished with some data, the overall
        -- Source will return an Open while the Conduit will be Closed.
        -- Therefore, we have to do a check.
        case state of
            FLClosed -> return ()
            FLOpen () _ close -> do
                () <- sourceClose close
                return ()
            FLHaveOutput () _ close -> close

-- Pull a single value out of a 'BufferedSource', updating the mutable cell.
-- A buffered leftover is returned first; otherwise the underlying @Source@ is
-- stepped. Returns Nothing once the source is exhausted and the buffer empty.
bsourcePull :: MonadIO m => BufferedSource m a -> m (Maybe a)
bsourcePull (BufferedSource bs) =
    liftIO (I.readIORef bs) >>= goBuf
  where
    goBuf (OpenEmpty Closed) = liftIO $ I.writeIORef bs ClosedEmpty >> return Nothing
    goBuf (OpenEmpty (Open src _ a)) = do
        liftIO $ I.writeIORef bs $ OpenEmpty src
        return $ Just a
    goBuf (OpenEmpty (SourceM msrc _)) = msrc >>= goBuf . OpenEmpty
    goBuf ClosedEmpty = return Nothing
    goBuf (OpenFull src a) = do
        liftIO $ I.writeIORef bs (OpenEmpty src)
        return $ Just a
    goBuf (ClosedFull a) = do
        liftIO $ I.writeIORef bs ClosedEmpty
        return $ Just a

-- Return a leftover value to the buffer of a 'BufferedSource'. A Nothing
-- leftover is a no-op. The buffer holds at most one value, so calling this
-- while the buffer is already full raises an error.
bsourceUnpull :: MonadIO m => BufferedSource m a -> Maybe a -> m ()
bsourceUnpull _ Nothing = return ()
bsourceUnpull (BufferedSource ref) (Just a) = do
    buf <- liftIO $ I.readIORef ref
    case buf of
        OpenEmpty src -> liftIO $ I.writeIORef ref (OpenFull src a)
        ClosedEmpty -> liftIO $ I.writeIORef ref (ClosedFull a)
        _ -> error $ "Invariant violated: bsourceUnpull called on full data"

-- | Close the underlying 'Source' for the given 'BufferedSource'. Note
-- that this function can safely be called multiple times, as it will first
-- check if the 'Source' was previously closed.
--
-- The closed state is recorded in the 'BufferedSource' itself, so a repeated
-- call is a no-op and later pulls do not touch the closed 'Source'. A
-- leftover value that was buffered before closing is kept.
--
-- Since 0.3.0
bsourceClose :: MonadIO m => BufferedSource m a -> m ()
bsourceClose (BufferedSource ref) = do
    buf <- liftIO $ I.readIORef ref
    case buf of
        -- Mark the buffer as closed /before/ running the close action, so the
        -- underlying source is never closed a second time.
        OpenEmpty src -> do
            liftIO $ I.writeIORef ref ClosedEmpty
            sourceClose src
        OpenFull src a -> do
            liftIO $ I.writeIORef ref (ClosedFull a)
            sourceClose src
        ClosedEmpty -> return ()
        ClosedFull _ -> return ()

-- | Provide for a stream of data that can be flushed.
--
-- A number of @Conduit@s (e.g., zlib compression) need the ability to flush
-- the stream at some point. This provides a single wrapper datatype to be used
-- in all such circumstances.
--
-- Since 0.3.0
data Flush a = Chunk a | Flush
    deriving (Show, Eq, Ord)

instance Functor Flush where
    fmap _ Flush = Flush
    fmap f (Chunk a) = Chunk (f a)