-- Module : Foundation.Conduit.Internal -- License : BSD-style -- Maintainer : Foundation -- Stability : experimental -- Portability : portable -- -- Taken from the conduit package almost verbatim, and -- Copyright (c) 2012 Michael Snoyman -- {-# LANGUAGE CPP #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE Rank2Types #-} {-# LANGUAGE OverloadedStrings #-} module Foundation.Conduit.Internal ( Pipe(..) , Conduit(..) , ZipSink(..) , ResourceT(..) , MonadResource(..) , runResourceT , await , awaitForever , yield , yieldOr , leftover , runConduit , runConduitRes , runConduitPure , fuse , bracketConduit ) where import Basement.Imports hiding (throw) import Foundation.Monad import Foundation.Numerical import Basement.Monad import Control.Monad ((>=>), liftM, void, mapM_, join) import Control.Exception (SomeException, mask_) import Data.IORef (atomicModifyIORef) -- | A pipe producing and consuming values -- -- A basic intuition is that every @Pipe@ produces a stream of /output/ values -- and eventually indicates that this stream is terminated by sending a -- /result/. On the receiving end of a @Pipe@, these become the /input/ and /upstream/ -- parameters. data Pipe leftOver input output upstream monad result = -- | Provide new output to be sent downstream. This constructor has three -- fields: the next @Pipe@ to be used, a finalization function, and the -- output value. Yield (Pipe leftOver input output upstream monad result) (monad ()) output -- | Request more input from upstream. The first field takes a new input -- value and provides a new @Pipe@. The second takes an upstream result -- value, which indicates that upstream is producing no more results. | Await (input -> Pipe leftOver input output upstream monad result) (upstream -> Pipe leftOver input output upstream monad result) -- | Processing with this @Pipe@ is complete, providing the final result. | Done result -- | Require running of a monadic action to get the next @Pipe@. | PipeM (monad (Pipe leftOver input output upstream monad result)) -- | Return leftover input, which should be provided to future operations. | Leftover (Pipe leftOver input output upstream monad result) leftOver instance Applicative m => Functor (Pipe l i o u m) where fmap = (<$>) {-# INLINE fmap #-} instance Applicative m => Applicative (Pipe l i o u m) where pure = Done {-# INLINE pure #-} Yield p c o <*> fa = Yield (p <*> fa) c o Await p c <*> fa = Await (\i -> p i <*> fa) (\o -> c o <*> fa) Done r <*> fa = r <$> fa PipeM mp <*> fa = PipeM ((<*> fa) <$> mp) Leftover p i <*> fa = Leftover (p <*> fa) i {-# INLINE (<*>) #-} instance (Functor m, Monad m) => Monad (Pipe l i o u m) where return = Done {-# INLINE return #-} Yield p c o >>= fp = Yield (p >>= fp) c o Await p c >>= fp = Await (p >=> fp) (c >=> fp) Done x >>= fp = fp x PipeM mp >>= fp = PipeM ((>>= fp) <$> mp) Leftover p i >>= fp = Leftover (p >>= fp) i -- | A component of a conduit pipeline, which takes a stream of -- @input@, produces a stream of @output@, performs actions in the -- underlying @monad@, and produces a value of @result@ when no more -- output data is available. newtype Conduit input output monad result = Conduit { unConduit :: forall a . (result -> Pipe input input output () monad a) -> Pipe input input output () monad a } instance Functor (Conduit i o m) where fmap f (Conduit c) = Conduit $ \resPipe -> c (resPipe . f) instance Applicative (Conduit i o m) where pure x = Conduit ($ x) {-# INLINE pure #-} fab <*> fa = fab >>= \ab -> fa >>= \a -> pure (ab a) {-# INLINE (<*>) #-} instance Monad (Conduit i o m) where return = pure Conduit f >>= g = Conduit $ \h -> f $ \a -> unConduit (g a) h instance MonadTrans (Conduit i o) where lift m = Conduit $ \rest -> PipeM $ liftM rest m instance MonadIO m => MonadIO (Conduit i o m) where liftIO = lift . liftIO instance MonadFailure m => MonadFailure (Conduit i o m) where type Failure (Conduit i o m) = Failure m mFail = lift . mFail instance MonadThrow m => MonadThrow (Conduit i o m) where throw = lift . throw instance MonadCatch m => MonadCatch (Conduit i o m) where catch (Conduit c0) onExc = Conduit $ \rest -> let go (PipeM m) = PipeM $ catch (liftM go m) (return . flip unConduit rest . onExc) go (Done r) = rest r go (Await p c) = Await (go . p) (go . c) go (Yield p m o) = Yield (go p) m o go (Leftover p i) = Leftover (go p) i in go (c0 Done) -- | Await for a value from upstream. await :: Conduit i o m (Maybe i) await = Conduit $ \f -> Await (f . Just) (const (f Nothing)) {-# NOINLINE[1] await #-} await' :: Conduit i o m r -> (i -> Conduit i o m r) -> Conduit i o m r await' f g = Conduit $ \rest -> Await (\i -> unConduit (g i) rest) (const $ unConduit f rest) {-# INLINE await' #-} {-# RULES "conduit: await >>= maybe" [2] forall x y. await >>= maybe x y = await' x y #-} awaitForever :: (input -> Conduit input output monad b) -> Conduit input output monad () awaitForever f = Conduit $ \rest -> let go = Await (\i -> unConduit (f i) (const go)) rest in go -- | Send a value downstream. yield :: Monad m => o -> Conduit i o m () yield o = Conduit $ \f -> Yield (f ()) (return ()) o -- | Same as 'yield', but additionally takes a finalizer to be run if -- the downstream component terminates. yieldOr :: o -> m () -- ^ finalizer -> Conduit i o m () yieldOr o m = Conduit $ \f -> Yield (f ()) m o -- | Provide leftover input to be consumed by the next component in -- the current monadic binding. leftover :: i -> Conduit i o m () leftover i = Conduit $ \f -> Leftover (f ()) i -- | Run a conduit pipeline to completion. runConduit :: Monad m => Conduit () () m r -> m r runConduit (Conduit f) = runPipe (f Done) -- | Run a pure conduit pipeline to completion. runConduitPure :: Conduit () () Identity r -> r runConduitPure = runIdentity . runConduit -- | Run a conduit pipeline in a 'ResourceT' context for acquiring resources. runConduitRes :: (MonadBracket m, MonadIO m) => Conduit () () (ResourceT m) r -> m r runConduitRes = runResourceT . runConduit bracketConduit :: MonadResource m => IO a -> (a -> IO b) -> (a -> Conduit i o m r) -> Conduit i o m r bracketConduit acquire cleanup inner = do (resource, release) <- allocate acquire cleanup result <- inner resource release return result -- | Internal: run a @Pipe@ runPipe :: Monad m => Pipe () () () () m r -> m r runPipe = go where go (Yield p _ ()) = go p go (Await _ p) = go (p ()) go (Done r) = return r go (PipeM mp) = mp >>= go go (Leftover p ()) = go p -- | Send the output of the first Conduit component to the second -- Conduit component. fuse :: Monad m => Conduit a b m () -> Conduit b c m r -> Conduit a c m r fuse (Conduit left0) (Conduit right0) = Conduit $ \rest -> let goRight final left right = case right of Yield p c o -> Yield (recurse p) (c >> final) o Await rp rc -> goLeft rp rc final left Done r2 -> PipeM (final >> return (rest r2)) PipeM mp -> PipeM (liftM recurse mp) Leftover right' i -> goRight final (Yield left final i) right' where recurse = goRight final left goLeft rp rc final left = case left of Yield left' final' o -> goRight final' left' (rp o) Await left' lc -> Await (recurse . left') (recurse . lc) Done r1 -> goRight (return ()) (Done r1) (rc r1) PipeM mp -> PipeM (liftM recurse mp) Leftover left' i -> Leftover (recurse left') i where recurse = goLeft rp rc final in goRight (return ()) (left0 Done) (right0 Done) {- FIXME for later, if we add resourcet -- | Safely acquire a resource and register a cleanup action for it, -- in the context of a 'Conduit'. bracketConduit :: MonadResource m => IO a -- ^ acquire -> (a -> IO ()) -- ^ cleanup -> (a -> Conduit i o m r) -> Conduit i o m r bracketConduit alloc cleanup inner = Conduit $ \rest -> PipeM $ do (key, val) <- allocate alloc cleanup return $ unConduit (addCleanup (const $ release key) (inside seed)) rest addCleanup :: Monad m => (Bool -> m ()) -> Conduit i o m r -> Conduit i o m r addCleanup cleanup (Conduit c0) = Conduit $ \rest -> let go (Done r) = PipeM (cleanup True >> return (rest r)) go (Yield src close x) = Yield (go src) (cleanup False >> close) x go (PipeM msrc) = PipeM (liftM (go) msrc) go (Await p c) = Await (go . p) (go . c) go (Leftover p i) = Leftover (go p) i in go (c0 Done) -} newtype ZipSink i m r = ZipSink { getZipSink :: Conduit i () m r } instance Monad m => Functor (ZipSink i m) where fmap f (ZipSink x) = ZipSink (liftM f x) instance Monad m => Applicative (ZipSink i m) where pure = ZipSink . return ZipSink (Conduit f0) <*> ZipSink (Conduit x0) = ZipSink $ Conduit $ \rest -> let go (Leftover _ i) _ = absurd i go _ (Leftover _ i) = absurd i go (Yield f _ ()) x = go f x go f (Yield x _ ()) = go f x go (PipeM mf) x = PipeM (liftM (`go` x) mf) go f (PipeM mx) = PipeM (liftM (go f) mx) go (Done f) (Done x) = rest (f x) go (Await pf cf) (Await px cx) = Await (\i -> go (pf i) (px i)) (\() -> go (cf ()) (cx ())) go (Await pf cf) x@Done{} = Await (\i -> go (pf i) x) (\() -> go (cf ()) x) go f@Done{} (Await px cx) = Await (\i -> go f (px i)) (\() -> go f (cx ())) in go (injectLeftovers (f0 Done)) (injectLeftovers (x0 Done)) data Void absurd :: Void -> a absurd _ = error "Foundation.Conduit.Internal.absurd" injectLeftovers :: Monad m => Pipe i i o u m r -> Pipe l i o u m r injectLeftovers = go [] where go ls (Yield p c o) = Yield (go ls p) c o go (l:ls) (Await p _) = go ls $ p l go [] (Await p c) = Await (go [] . p) (go [] . c) go _ (Done r) = Done r go ls (PipeM mp) = PipeM (liftM (go ls) mp) go ls (Leftover p l) = go (l:ls) p --------------------- -- ResourceT --------------------- newtype ResourceT m a = ResourceT { unResourceT :: PrimVar IO ReleaseMap -> m a } instance Functor m => Functor (ResourceT m) where fmap f (ResourceT m) = ResourceT $ \r -> fmap f (m r) instance Applicative m => Applicative (ResourceT m) where pure = ResourceT . const . pure ResourceT mf <*> ResourceT ma = ResourceT $ \r -> mf r <*> ma r instance Monad m => Monad (ResourceT m) where #if !MIN_VERSION_base(4,8,0) return = ResourceT . const . return #endif ResourceT ma >>= f = ResourceT $ \r -> do a <- ma r let ResourceT f' = f a f' r instance MonadTrans ResourceT where lift = ResourceT . const instance MonadIO m => MonadIO (ResourceT m) where liftIO = lift . liftIO instance MonadThrow m => MonadThrow (ResourceT m) where throw = lift . throw instance MonadCatch m => MonadCatch (ResourceT m) where catch (ResourceT f) g = ResourceT $ \env -> f env `catch` \e -> unResourceT (g e) env instance MonadBracket m => MonadBracket (ResourceT m) where generalBracket acquire onSuccess onExc inner = ResourceT $ \env -> generalBracket (unResourceT acquire env) (\x y -> unResourceT (onSuccess x y) env) (\x y -> unResourceT (onExc x y) env) (\x -> unResourceT (inner x) env) data ReleaseMap = ReleaseMap !NextKey !RefCount ![(Word, (ReleaseType -> IO ()))] -- FIXME use a proper Map? | ReleaseMapClosed data ReleaseType = ReleaseEarly | ReleaseNormal | ReleaseException type RefCount = Word type NextKey = Word runResourceT :: (MonadBracket m, MonadIO m) => ResourceT m a -> m a runResourceT (ResourceT inner) = generalBracket (liftIO $ primVarNew $ ReleaseMap maxBound (minBound + 1) []) (\state _res -> liftIO $ cleanup state ReleaseNormal) (\state _exc -> liftIO $ cleanup state ReleaseException) inner where cleanup istate rtype = do mm <- atomicModifyIORef istate $ \rm -> case rm of ReleaseMap nk rf m -> let rf' = rf - 1 in if rf' == minBound then (ReleaseMapClosed, Just m) else (ReleaseMap nk rf' m, Nothing) ReleaseMapClosed -> error "runResourceT: cleanup on ReleaseMapClosed" case mm of Just m -> mapM_ (\(_, x) -> ignoreExceptions (x rtype)) m Nothing -> return () where ignoreExceptions io = void io `catch` (\(_ :: SomeException) -> return ()) allocate :: (MonadResource m, MonadIO n) => IO a -> (a -> IO b) -> m (a, n ()) allocate acquire release = liftResourceT $ ResourceT $ \istate -> liftIO $ mask_ $ do a <- acquire key <- atomicModifyIORef istate $ \rm -> case rm of ReleaseMap key rf m -> ( ReleaseMap (key - 1) rf ((key, const $ void $ release a) : m) , key ) ReleaseMapClosed -> error "allocate: ReleaseMapClosed" let release' = join $ atomicModifyIORef istate $ \rm -> case rm of ReleaseMap nextKey rf m -> let loop front [] = (ReleaseMap nextKey rf (front []), return ()) loop front ((key', action):rest) | key == key' = ( ReleaseMap nextKey rf (front rest) , action ReleaseEarly ) | otherwise = loop (front . ((key', action):)) rest in loop id m ReleaseMapClosed -> error "allocate: ReleaseMapClosed (2)" return (a, liftIO release') class MonadIO m => MonadResource m where liftResourceT :: ResourceT IO a -> m a instance MonadIO m => MonadResource (ResourceT m) where liftResourceT (ResourceT f) = ResourceT (liftIO . f) instance MonadResource m => MonadResource (Conduit i o m) where liftResourceT = lift . liftResourceT