{-# OPTIONS_HADDOCK not-home #-}
{-# OPTIONS_GHC -O2 #-} -- necessary to avoid some space leaks
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE UndecidableInstances #-}
{-# LANGUAGE RankNTypes #-}
module Data.Conduit.Internal
    ( -- * Types
      Pipe (..)
    , Source
    , Sink
    , Conduit
    , Finalize (..)
      -- * Functions
    , pipeClose
    , pipe
    , pipeResume
    , runPipe
    , sinkToPipe
    , await
    , yield
    , hasInput
    , transPipe
    , mapOutput
    , runFinalize
    , addCleanup
    ) where

import Control.Applicative (Applicative (..), (<$>))
import Control.Monad ((>=>), liftM, ap)
import Control.Monad.Trans.Class (MonadTrans (lift))
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Base (MonadBase (liftBase))
import Data.Void (Void, absurd)
import Data.Monoid (Monoid (mappend, mempty))
import Control.Monad.Trans.Resource

-- | A cleanup action to be performed.
--
-- Earlier versions stored a plain monadic action here. Most @Pipe@s have
-- nothing to clean up, though, and carrying around a large number of no-op
-- actions wastes memory. Distinguishing a pure result ('FinalizePure') from a
-- real monadic action ('FinalizeM') means memory is only spent on the actions
-- that actually have to be tracked.
--
-- Since 0.4.1
data Finalize m r
    = FinalizePure r
    | FinalizeM (m r)

instance Monad m => Functor (Finalize m) where
    fmap g fin =
        case fin of
            FinalizePure r -> FinalizePure (g r)
            FinalizeM act  -> FinalizeM (liftM g act)

instance Monad m => Applicative (Finalize m) where
    pure = FinalizePure
    (<*>) = ap

instance Monad m => Monad (Finalize m) where
    return = FinalizePure

    -- A pure left-hand side costs nothing; a monadic one is sequenced with
    -- whatever the continuation produces.
    fin >>= k =
        case fin of
            FinalizePure x -> k x
            FinalizeM act  -> FinalizeM (act >>= unwrap . k)
      where
        unwrap (FinalizePure y) = return y
        unwrap (FinalizeM next) = next

instance MonadTrans Finalize where
    lift = FinalizeM

instance MonadThrow m => MonadThrow (Finalize m) where
    monadThrow = lift . monadThrow

instance MonadIO m => MonadIO (Finalize m) where
    liftIO = lift . liftIO

instance MonadResource m => MonadResource (Finalize m) where
    allocate a = lift . allocate a
    register = lift . register
    release = lift . release
    resourceMask = lift . resourceMask

-- | The underlying datatype for all the types in this package. It has four
-- type parameters:
--
-- * /i/ is the type of values for this @Pipe@'s input stream.
--
-- * /o/ is the type of values for this @Pipe@'s output stream.
--
-- * /m/ is the underlying monad.
--
-- * /r/ is the result type.
--
-- Note that /o/ and /r/ are inherently different. /o/ is the type of the
-- stream of values this @Pipe@ will produce and send downstream. /r/ is the
-- final output of this @Pipe@.
--
-- @Pipe@s can be composed via the 'pipe' function. To do so, the output type
-- of the left pipe must match the input type of the right pipe, and the
-- result type of the left pipe must be unit @()@. This is due to the fact
-- that any result produced by the left pipe must be discarded in favor of the
-- result of the right pipe.
--
-- Since 0.4.0
data Pipe i o m r =
    -- | Provide new output to be sent downstream. This constructor has three
    -- fields: the next @Pipe@ to be used, an early-close action, and the
    -- output value.
    HaveOutput (Pipe i o m r) (Finalize m r) o
    -- | Request more input from upstream. The first field takes a new input
    -- value and provides a new @Pipe@. The second is for early termination:
    -- it gives a new @Pipe@ which takes no input from upstream. This allows a
    -- @Pipe@ to provide a final stream of output values after no more input
    -- is available from upstream.
  | NeedInput (i -> Pipe i o m r) (Pipe i o m r)
    -- | Processing with this @Pipe@ is complete. Provides an optional
    -- leftover input value and a result.
  | Done (Maybe i) r
    -- | Require running of a monadic action to get the next @Pipe@. The
    -- second field is an early cleanup action. Technically, this second field
    -- could be skipped, but doing so would require extra operations to be
    -- performed in some cases. For example, for a @Pipe@ pulling data from a
    -- file, it may be forced to pull an extra, unneeded chunk before closing
    -- the @Handle@.
  | PipeM (m (Pipe i o m r)) (Finalize m r)

-- | A @Pipe@ which provides a stream of output values, without consuming any
-- input. The input parameter is set to @Void@ to indicate that this @Pipe@
-- takes no input. A @Source@ is not used to produce a final result, and thus
-- the result parameter is set to @()@.
--
-- Since 0.4.0
type Source m a = Pipe Void a m ()

-- | A @Pipe@ which consumes a stream of input values and produces a final
-- result. It cannot produce any output values, and thus the output parameter
-- is set to @Void@. In other words, it is impossible to create a @HaveOutput@
-- constructor for a @Sink@.
--
-- Since 0.4.0
type Sink i m r = Pipe i Void m r

-- | A @Pipe@ which consumes a stream of input values and produces a stream of
-- output values. It does not produce a result value, and thus the result
-- parameter is set to @()@.
--
-- Since 0.4.0
type Conduit i m o = Pipe i o m ()

-- | Perform any close actions available for the given @Pipe@.
--
-- Since 0.4.0
pipeClose :: Monad m => Pipe i o m r -> Finalize m r
pipeClose p0 =
    case p0 of
        HaveOutput _ close _ -> close
        -- Closing a pipe that waits for input means closing its
        -- no-more-input continuation.
        NeedInput _ noInput  -> pipeClose noInput
        Done _ r             -> FinalizePure r
        PipeM _ close        -> close

-- Feed a single input value to a pipe. Pending output and monadic steps are
-- kept in place; a finished pipe keeps the value as its leftover.
pipePush :: Monad m => i -> Pipe i o m r -> Pipe i o m r
pipePush i p0 =
    case p0 of
        HaveOutput next close out -> HaveOutput (pipePush i next) close out
        NeedInput push _          -> push i
        Done _ r                  -> Done (Just i) r
        PipeM act close           -> PipeM (liftM (pipePush i) act) close

instance Monad m => Functor (Pipe i o m) where
    fmap f p0 =
        case p0 of
            HaveOutput next close out ->
                HaveOutput (f <$> next) (liftM f close) out
            NeedInput push noInput ->
                NeedInput (\i -> f <$> push i) (f <$> noInput)
            Done leftover r ->
                Done leftover (f r)
            PipeM act close ->
                PipeM (liftM (fmap f) act) (liftM f close)

instance Monad m => Applicative (Pipe i o m) where
    pure = Done Nothing

    pf <*> px =
        case pf of
            -- Any leftover of the finished left pipe is pushed into the
            -- right pipe.
            Done leftover f ->
                maybe id pipePush leftover (fmap f px)
            HaveOutput next close out ->
                HaveOutput (next <*> px) (close `ap` pipeClose px) out
            NeedInput push noInput ->
                NeedInput (\i -> push i <*> px) (noInput <*> px)
            PipeM act close ->
                PipeM (liftM (<*> px) act) (close `ap` pipeClose px)

instance Monad m => Monad (Pipe i o m) where
    return = Done Nothing

    p0 >>= fp =
        case p0 of
            -- Any leftover of the finished pipe is pushed into its
            -- continuation.
            Done leftover x ->
                maybe id pipePush leftover (fp x)
            HaveOutput next close out ->
                HaveOutput (next >>= fp) (close >>= pipeClose . fp) out
            NeedInput push noInput ->
                NeedInput (push >=> fp) (noInput >>= fp)
            PipeM act close ->
                PipeM (liftM (>>= fp) act) (close >>= pipeClose . fp)

instance MonadBase base m => MonadBase base (Pipe i o m) where
    liftBase = lift . liftBase

instance MonadTrans (Pipe i o) where
    -- The same action serves as both the next step and the early-close
    -- action.
    lift act = PipeM (liftM (Done Nothing) act) (FinalizeM act)

instance MonadIO m => MonadIO (Pipe i o m) where
    liftIO = lift . liftIO

instance Monad m => Monoid (Pipe i o m ()) where
    mempty = return ()
    mappend = (>>)

-- | Compose a left and right pipe together into a complete pipe. The left pipe
-- will be automatically closed when the right pipe finishes, and any leftovers
-- from the right pipe will be discarded.
--
-- This is in fact a wrapper around 'pipeResume'. This function closes the left
-- @Pipe@ returned by @pipeResume@ and returns only the result.
--
-- Since 0.4.0
pipe :: Monad m => Pipe a b m () -> Pipe b c m r -> Pipe a c m r
pipe l r = do
    (l', res) <- pipeResume l r
    lift (runFinalize (pipeClose l'))
    return res

-- | Same as 'pipe', but retain both the new left pipe and the leftovers from
-- the right pipe. The two components are combined together into a single pipe
-- and returned, together with the result of the right pipe.
--
-- Note: we're biased towards checking the right side first to avoid pulling
-- extra data which is not needed. Doing so could cause data loss.
--
-- Since 0.4.0
pipeResume :: Monad m => Pipe a b m () -> Pipe b c m r -> Pipe a c m (Pipe a b m (), r)
pipeResume left right =
    -- The right pipe is inspected first; the left pipe is only examined once
    -- the right pipe actually asks for input.
    case right of
        -- Right pipe is done: collect leftovers and the state of the left
        -- pipe.
        Done rightLeftover res -> Done upLeftover (resumed, res)
          where
            -- Leftover of the left pipe, the left pipe without that leftover,
            -- and the close action for the left pipe.
            (upLeftover, leftRest, leftFin) =
                case left of
                    Done l () -> (l, Done Nothing (), FinalizePure ())
                    _         -> (Nothing, left, pipeClose left)
            -- A leftover of the right pipe is put back in front of the left
            -- pipe as pending output.
            resumed = maybe leftRest (HaveOutput leftRest leftFin) rightLeftover

        -- Right pipe needs to run a monadic action.
        PipeM act fin ->
            PipeM (liftM (pipeResume left) act) (fmap ((,) left) fin)

        -- Right pipe has some output: pass it downstream and continue.
        HaveOutput next fin out ->
            HaveOutput (pipeResume left next) (fmap ((,) left) fin) out

        -- Right pipe needs input, so look at the left pipe.
        NeedInput rp rc ->
            case left of
                -- Left pipe has output, right pipe wants it.
                HaveOutput lp _ a -> pipeResume lp (rp a)

                -- Left pipe needs more input itself: ask upstream.
                NeedInput lpush lclose ->
                    NeedInput (\a -> pipeResume (lpush a) right) drained
                  where
                    -- Upstream has no more input, so connect the left pipe's
                    -- no-more-input continuation with the right pipe. The
                    -- resulting left pipe could in theory be returned, but
                    -- feeding input to a pipe after it has been told there is
                    -- none left is not recommended. It is closed instead and
                    -- mempty is returned in its place.
                    drained = do
                        (left', res) <- pipeResume lclose right
                        lift (runFinalize (pipeClose left'))
                        return (mempty, res)

                -- Left pipe is done while the right pipe needs input: tell
                -- the right pipe there is no more input, and eventually
                -- replace its leftovers with the left pipe's leftover.
                Done l () -> liftM ((,) mempty) (replaceLeftover l rc)

                -- Left pipe needs to run a monadic action.
                PipeM act fin ->
                    PipeM
                        (liftM (\l -> pipeResume l right) act)
                        (fmap ((,) mempty) (combineFinalize fin (pipeClose right)))

-- | A minor optimization on @>>@ which does not cause any allocations for the
-- common case of missing left actions.
--
-- Since 0.4.1
combineFinalize :: Monad m => Finalize m () -> Finalize m r -> Finalize m r
combineFinalize first second =
    case first of
        FinalizePure () -> second
        FinalizeM x ->
            case second of
                FinalizeM y    -> FinalizeM (x >> y)
                FinalizePure y -> FinalizeM (x >> return y)

-- Replace the leftover eventually reported by a pipe with the given one.
replaceLeftover :: Monad m => Maybe i -> Pipe i' o m r -> Pipe i o m r
replaceLeftover l p0 =
    case p0 of
        Done _ r ->
            Done l r
        HaveOutput next close out ->
            HaveOutput (replaceLeftover l next) close out
        -- This function is only called on pipes when there is no more input
        -- available. Therefore, the push field can be ignored.
        NeedInput _ noInput ->
            replaceLeftover l noInput
        PipeM act close ->
            PipeM (liftM (replaceLeftover l) act) close

-- | Run a complete pipeline until processing completes.
--
-- Since 0.4.0
runPipe :: Monad m => Pipe Void Void m r -> m r
runPipe p0 =
    case p0 of
        HaveOutput _ close _ -> runFinalize close
        NeedInput _ noInput  -> runPipe noInput
        Done _ r             -> return r
        PipeM act _          -> act >>= runPipe

-- | Perform any necessary finalization actions.
--
-- Since 0.4.1
runFinalize :: Monad m => Finalize m r -> m r
runFinalize fin =
    case fin of
        FinalizePure r -> return r
        FinalizeM act  -> act

-- | Send a single output value downstream.
--
-- Since 0.4.0
yield :: Monad m => o -> Pipe i o m ()
yield out = HaveOutput (Done Nothing ()) (FinalizePure ()) out

-- | Wait for a single input value from upstream, and remove it from the
-- stream. Returns @Nothing@ if no more data is available.
--
-- Since 0.4.0
await :: Pipe i o m (Maybe i)
await = NeedInput gotInput noInput
  where
    gotInput i = Done Nothing (Just i)
    noInput    = Done Nothing Nothing

-- | Check if input is available from upstream. Will not remove the data from
-- the stream.
--
-- Since 0.4.0
hasInput :: Pipe i o m Bool
hasInput = NeedInput gotInput noInput
  where
    -- The value is handed back as a leftover so that it stays in the stream.
    gotInput i = Done (Just i) True
    noInput    = Done Nothing False

-- | A @Sink@ has a @Void@ type parameter for the output, which makes it
-- difficult to compose with @Source@s and @Conduit@s. This function replaces
-- that parameter with a free variable. This function is essentially @id@; it
-- only modifies the types, not the actions performed.
--
-- Since 0.4.0
sinkToPipe :: Monad m => Sink i m r -> Pipe i o m r
sinkToPipe sink =
    case sink of
        -- A @Sink@'s output type is @Void@, so this case cannot carry a
        -- real value.
        HaveOutput _ _ o       -> absurd o
        NeedInput push noInput -> NeedInput (\i -> sinkToPipe (push i)) (sinkToPipe noInput)
        Done leftover r        -> Done leftover r
        PipeM act close        -> PipeM (liftM sinkToPipe act) close

-- | Transform the monad that a @Pipe@ lives in.
--
-- Since 0.4.0
transPipe :: Monad m => (forall a. m a -> n a) -> Pipe i o m r -> Pipe i o n r
transPipe f p0 =
    case p0 of
        HaveOutput next close out ->
            HaveOutput (transPipe f next) (transFinalize f close) out
        NeedInput push noInput ->
            NeedInput (\i -> transPipe f (push i)) (transPipe f noInput)
        Done leftover r ->
            Done leftover r
        PipeM act close ->
            PipeM (f (liftM (transPipe f) act)) (transFinalize f close)

-- Transform the monad that a 'Finalize' lives in; pure finalizers are kept
-- pure.
transFinalize :: (forall a. m a -> n a) -> Finalize m r -> Finalize n r
transFinalize f fin =
    case fin of
        FinalizePure r -> FinalizePure r
        FinalizeM act  -> FinalizeM (f act)

-- | Apply a function to all the output values of a `Pipe`.
--
-- This mimics the behavior of `fmap` for a `Source` and `Conduit` in pre-0.4
-- days.
--
-- Since 0.4.1
mapOutput :: Monad m => (o1 -> o2) -> Pipe i o1 m r -> Pipe i o2 m r
mapOutput f p0 =
    case p0 of
        HaveOutput next close out ->
            HaveOutput (mapOutput f next) close (f out)
        NeedInput push noInput ->
            NeedInput (\i -> mapOutput f (push i)) (mapOutput f noInput)
        Done leftover r ->
            Done leftover r
        PipeM act close ->
            PipeM (liftM (mapOutput f) act) close

-- | Add some code to be run when the given @Pipe@ cleans up.
--
-- Since 0.4.1
addCleanup :: Monad m
           => (Bool -> m ()) -- ^ @True@ if @Pipe@ ran to completion, @False@ for early termination.
           -> Pipe i o m r
           -> Pipe i o m r
addCleanup cleanup p0 =
    case p0 of
        -- The pipe ran to completion: the cleanup is run with @True@, both
        -- as the next step and as the close action.
        Done leftover r ->
            PipeM
                (cleanup True >> return (Done leftover r))
                (lift (cleanup True) >> return r)
        HaveOutput next close out ->
            HaveOutput (addCleanup cleanup next) (earlyExit close) out
        PipeM act close ->
            PipeM (liftM (addCleanup cleanup) act) (earlyExit close)
        NeedInput push noInput ->
            NeedInput (\i -> addCleanup cleanup (push i)) (addCleanup cleanup noInput)
  where
    -- Early termination: run the cleanup with @False@ before the pipe's own
    -- close action.
    earlyExit close = lift (cleanup False) >> close