{-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} {-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE InstanceSigs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MagicHash #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE UnboxedTuples #-} {-# LANGUAGE UndecidableInstances #-} -- XXX -- | -- Module : Streamly.Streams.StreamK -- Copyright : (c) 2017 Harendra Kumar -- -- License : BSD3 -- Maintainer : harendra.kumar@gmail.com -- Stability : experimental -- Portability : GHC -- -- -- Continuation passing style (CPS) stream implementation. The symbol 'K' below -- denotes a function as well as a Kontinuation. -- -- @ -- import qualified Streamly.Streams.StreamK as K -- @ -- module Streamly.Streams.StreamK ( -- * A class for streams IsStream (..) , adapt -- * The stream type , Stream (..) -- * Construction , mkStream , nil , cons , (.:) -- * Asynchronous construction , nilK , yieldK , consK -- * Deconstruction , uncons -- * Generation -- ** Unfolds , unfoldr , unfoldrM -- ** Specialized Generation , repeat -- ** Conversions , yield , yieldM , fromFoldable , fromList , fromStreamK -- * Elimination -- ** General Folds , foldStream , foldr , foldrM , foldl' , foldlM' , foldx , foldxM -- ** Specialized Folds , runStream , null , head , tail , elem , notElem , all , any , last , minimum , maximum -- ** Map and Fold , mapM_ -- ** Conversions , toList , toStreamK -- * Transformation -- ** By folding (scans) , scanl' , scanx -- ** Filtering , filter , take , takeWhile , drop , dropWhile -- ** Mapping , map , mapM , sequence -- ** Map and Filter , mapMaybe -- * Semigroup Style Composition , serial -- * Utilities , consMSerial , bindWith , withLocal -- * Deprecated , Streaming -- deprecated , once -- deprecated ) where import Control.Monad (void) import Control.Monad.Reader.Class (MonadReader(..)) import Control.Monad.Trans.Class (MonadTrans(lift)) import Data.Semigroup (Semigroup(..)) 
import Prelude
       hiding (foldl, foldr, last, map, mapM, mapM_, repeat, sequence,
               take, filter, all, any, takeWhile, drop, dropWhile, minimum,
               maximum, elem, notElem, null, head, tail)
import qualified Prelude

import Streamly.SVar

------------------------------------------------------------------------------
-- The basic stream type
------------------------------------------------------------------------------

-- | @Stream m a@ is a monadic stream of values of type 'a' built from actions
-- in the monad 'm'. It is encoded with stop, singleton and yield
-- continuations, which corresponds to this direct style type:
--
-- @
-- data Stream m a = Stop | Singleton a | Yield a (Stream m a)
-- @
--
-- To facilitate parallel composition a local state is maintained in an 'SVar'
-- that is shared across, and used for synchronization of, the streams being
-- composed.
--
-- The singleton case could be expressed with stop and yield alone; it is kept
-- as a separate case so that composition operations on single element streams
-- can be optimized. Singleton streams are built in the implementation of
-- 'pure' for Applicative and Monad, and in 'lift' for MonadTrans.
--
newtype Stream m a =
    Stream {
        unStream :: forall r.
               State Stream m a          -- state
            -> m r                       -- stop
            -> (a -> m r)                -- singleton
            -> (a -> Stream m a -> m r)  -- yield
            -> m r
    }

------------------------------------------------------------------------------
-- Types that can behave as a Stream
------------------------------------------------------------------------------

infixr 5 `consM`
infixr 5 |:

-- | Class of types that can represent a stream of elements of some type 'a' in
-- some monad 'm'.
--
-- @since 0.2.0
class IsStream t where
    -- Conversions to and from the underlying 'Stream' representation.
    toStream :: t m a -> Stream m a
    fromStream :: Stream m a -> t m a

    -- | Constructs a stream by adding a monadic action at the head of an
    -- existing stream. For example:
    --
    -- @
    -- > toList $ getLine \`consM` getLine \`consM` nil
    -- hello
    -- world
    -- ["hello","world"]
    -- @
    --
    -- /Concurrent (do not use 'parallely' to construct infinite streams)/
    --
    -- @since 0.2.0
    consM :: MonadAsync m => m a -> t m a -> t m a

    -- | Operator equivalent of 'consM'. We can read it as "@parallel colon@"
    -- to remember that @|@ comes before ':'.
    --
    -- @
    -- > toList $ getLine |: getLine |: nil
    -- hello
    -- world
    -- ["hello","world"]
    -- @
    --
    -- @
    -- let delay = threadDelay 1000000 >> print 1
    -- runStream $ serially  $ delay |: delay |: delay |: nil
    -- runStream $ parallely $ delay |: delay |: delay |: nil
    -- @
    --
    -- /Concurrent (do not use 'parallely' to construct infinite streams)/
    --
    -- @since 0.2.0
    (|:) :: MonadAsync m => m a -> t m a -> t m a
    -- (|:) could be defined simply as 'consM', but it is defined explicitly
    -- for each type so that a SPECIALIZE pragma can be attached to the
    -- definition.

-- | Same as 'IsStream'.
--
-- @since 0.1.0
{-# DEPRECATED Streaming "Please use IsStream instead." #-}
type Streaming = IsStream

-------------------------------------------------------------------------------
-- Type adapting combinators
-------------------------------------------------------------------------------

-- | Adapt any specific stream type to any other specific stream type, by
-- going through the common 'Stream' representation.
--
-- @since 0.1.0
adapt :: (IsStream t1, IsStream t2) => t1 m a -> t2 m a
adapt = fromStream . toStream

------------------------------------------------------------------------------
-- Building a stream
------------------------------------------------------------------------------

-- | Build a stream from a function that receives the stream state, a stop
-- continuation, a singleton continuation and a yield continuation. The state
-- is passed through 'rstState' first, and the remainder handed to the yield
-- continuation is converted with 'toStream'.
mkStream
    :: IsStream t
    => (forall r. State Stream m a
        -> m r
        -> (a -> m r)
        -> (a -> t m a -> m r)
        -> m r)
    -> t m a
mkStream k =
    fromStream $ Stream $ \state kstop ksingle kyield ->
        k (rstState state) kstop ksingle
          (\x rest -> kyield x (toStream rest))

------------------------------------------------------------------------------
-- Construction
------------------------------------------------------------------------------

-- | An empty stream.
--
-- @
-- > toList nil
-- []
-- @
--
-- @since 0.1.0
nil :: IsStream t => t m a
nil = fromStream (Stream (\_ kstop _ _ -> kstop))

infixr 5 `cons`

-- faster than consM because there is no bind.
-- | Construct a stream by adding a pure value at the head of an existing
-- stream. For serial streams this is the same as @(return a) \`consM` r@ but
-- more efficient. For concurrent streams this is not concurrent whereas
-- 'consM' is concurrent. For example:
--
-- @
-- > toList $ 1 \`cons` 2 \`cons` 3 \`cons` nil
-- [1,2,3]
-- @
--
-- @since 0.1.0
cons :: IsStream t => a -> t m a -> t m a
cons a r = fromStream (Stream (\_ _ _ kyield -> kyield a (toStream r)))

infixr 5 .:

-- | Operator equivalent of 'cons'.
--
-- @
-- > toList $ 1 .: 2 .: 3 .: nil
-- [1,2,3]
-- @
--
-- @since 0.1.1
(.:) :: IsStream t => a -> t m a -> t m a
(.:) = cons

-- | Serial 'consM' on the base 'Stream' type: run the action, then pass its
-- result together with the given remainder to the yield continuation.
{-# INLINE consMSerial #-}
consMSerial :: (Monad m) => m a -> Stream m a -> Stream m a
consMSerial m r = Stream $ \_ _ _ kyield -> do
    x <- m
    kyield x r

------------------------------------------------------------------------------
-- Asynchronous construction
------------------------------------------------------------------------------

-- | Make an empty stream from a callback function.
nilK :: IsStream t => (forall r. m r -> m r) -> t m a
nilK k = fromStream (Stream (\_ kstop _ _ -> k kstop))

-- | Make a singleton stream from a one shot callback function.
yieldK :: IsStream t => (forall r. (a -> m r) -> m r) -> t m a
yieldK k = fromStream (Stream (\_ _ ksingle _ -> k ksingle))

-- | Construct a stream from a callback function. The value delivered to the
-- callback becomes the head; the given stream becomes the remainder.
consK :: IsStream t => (forall r. (a -> m r) -> m r) -> t m a -> t m a
consK k r =
    fromStream $ Stream $ \_ _ _ kyield ->
        k (\x -> kyield x (toStream r))

-- XXX consK with concurrent callbacks
-- XXX Build a stream from a repeating callback function.

-------------------------------------------------------------------------------
-- IsStream Stream
-------------------------------------------------------------------------------

instance IsStream Stream where
    toStream = id
    fromStream = id

    {-# INLINE consM #-}
    {-# SPECIALIZE consM :: IO a -> Stream IO a -> Stream IO a #-}
    consM :: Monad m => m a -> Stream m a -> Stream m a
    consM = consMSerial

    {-# INLINE (|:) #-}
    {-# SPECIALIZE (|:) :: IO a -> Stream IO a -> Stream IO a #-}
    (|:) :: Monad m => m a -> Stream m a -> Stream m a
    (|:) = consMSerial

-------------------------------------------------------------------------------
-- Deconstruction
-------------------------------------------------------------------------------

-- | Split a stream into its head and the remaining stream. Returns 'Nothing'
-- for an empty stream; for a singleton stream the remainder is 'nil'.
{-# INLINE uncons #-}
uncons :: (IsStream t, Monad m) => t m a -> m (Maybe (a, t m a))
uncons m =
    let whenStop = return Nothing
        whenSingle x = return $ Just (x, nil)
        whenYield x rest = return $ Just (x, fromStream rest)
     in unStream (toStream m) defState whenStop whenSingle whenYield

-------------------------------------------------------------------------------
-- Generation
-------------------------------------------------------------------------------

-- | Build a stream by repeatedly applying a pure step function to a seed. The
-- stream ends when the step function returns 'Nothing'.
{-# INLINE unfoldr #-}
unfoldr :: IsStream t => (b -> Maybe (a, b)) -> b -> t m a
unfoldr step = fromStream . go
    where
    go seed = Stream $ \_ kstop _ kyield ->
        case step seed of
            Just (x, next) -> kyield x (go next)
            Nothing -> kstop

-- | Like 'unfoldr' but with a monadic step function. Each produced element is
-- attached with '|:' so that the stream type's own 'consM' behavior applies.
{-# INLINE unfoldrM #-}
unfoldrM :: (IsStream t, MonadAsync m) => (b -> m (Maybe (a, b))) -> b -> t m a
unfoldrM step = go
    where
    go seed = fromStream $ Stream $ \state kstop ksingle kyield ->
        step seed >>= \case
            Nothing -> kstop
            Just (x, next) ->
                unStream (toStream (return x |: go next))
                         state kstop ksingle kyield

-------------------------------------------------------------------------------
-- Special generation
-------------------------------------------------------------------------------

-- Faster than yieldM because there is no bind. Usually we can construct a
-- stream from a pure value using "pure" in an applicative, however in case of
-- Zip streams pure creates an infinite stream.

-- | Create a singleton stream from a pure value. In monadic streams, 'pure' or
-- 'return' can be used in place of 'yield', however, in Zip applicative
-- streams 'pure' is equivalent to 'repeat'.
--
-- @since 0.4.0
yield :: IsStream t => a -> t m a
yield a = fromStream (Stream (\_ _ ksingle _ -> ksingle a))

-- | Create a singleton stream from a monadic action. Same as @m \`consM` nil@
-- but more efficient.
--
-- @
-- > toList $ yieldM getLine
-- hello
-- ["hello"]
-- @
--
-- @since 0.4.0
{-# INLINE yieldM #-}
yieldM :: (Monad m, IsStream t) => m a -> t m a
yieldM m = fromStream (Stream (\_ _ ksingle _ -> ksingle =<< m))

-- | Same as yieldM
--
-- @since 0.2.0
{-# DEPRECATED once "Please use yieldM instead." #-}
{-# INLINE once #-}
once :: (Monad m, IsStream t) => m a -> t m a
once = yieldM

-- | Generate an infinite stream by repeating a pure value.
--
-- @since 0.4.0
repeat :: IsStream t => a -> t m a
repeat a = let cyclic = cons a cyclic in cyclic

-------------------------------------------------------------------------------
-- Conversions
-------------------------------------------------------------------------------

-- | Construct a stream from a 'Foldable' containing pure values.
--
-- @since 0.2.0
{-# INLINE fromFoldable #-}
fromFoldable :: (IsStream t, Foldable f) => f a -> t m a
fromFoldable = Prelude.foldr cons nil

-- | Construct a stream from a list of pure values. This is 'fromFoldable'
-- restricted to lists.
{-# INLINE fromList #-}
fromList :: IsStream t => [a] -> t m a
fromList = fromFoldable

-- | Identity on 'Stream'.
{-# INLINE fromStreamK #-}
fromStreamK :: Stream m a -> Stream m a
fromStreamK = id

-------------------------------------------------------------------------------
-- Elimination by Folding
-------------------------------------------------------------------------------

-- | Fold a stream by providing an SVar, a stop continuation, a singleton
-- continuation and a yield continuation. The remainder passed to the yield
-- continuation is converted back to the caller's stream type with
-- 'fromStream'.
foldStream
    :: IsStream t
    => State Stream m a
    -> m r
    -> (a -> m r)
    -> (a -> t m a -> m r)
    -> t m a
    -> m r
foldStream st blank single step m =
    let yieldk a x = step a (fromStream x)
     in (unStream (toStream m)) st blank single yieldk

-- | Lazy right associative fold.
foldr :: (IsStream t, Monad m) => (a -> b -> b) -> b -> t m a -> m b
foldr step acc m = go (toStream m)
    where
    go m1 =
        let stop = return acc
            single a = return (step a acc)
            -- Fold the remainder first, then combine it with the head.
            yieldk a r = go r >>= \b -> return (step a b)
         in (unStream m1) defState stop single yieldk

-- | Lazy right fold with a monadic step function.
{-# INLINE foldrM #-}
foldrM :: (IsStream t, Monad m) => (a -> b -> m b) -> b -> t m a -> m b
foldrM step acc m = go (toStream m)
    where
    go m1 =
        let stop = return acc
            single a = step a acc
            yieldk a r = go r >>= step a
         in (unStream m1) defState stop single yieldk

-- | Strict left fold with an extraction function. Like the standard strict
-- left fold, but applies a user supplied extraction function (the third
-- argument) to the folded value at the end. This is designed to work with the
-- @foldl@ library. The suffix @x@ is a mnemonic for extraction.
{-# INLINE foldx #-}
foldx :: (IsStream t, Monad m)
    => (x -> a -> x) -> x -> (x -> b) -> t m a -> m b
foldx step begin done m = get $ go (toStream m) begin
    where
    -- The stream built by "go" only ever invokes its singleton continuation
    -- (carrying the final accumulator), so the state, stop and yield
    -- arguments supplied here are never used.
    {-# NOINLINE get #-}
    get m1 =
        let single = return . done
         in (unStream m1) undefined undefined single undefined

    -- Note, this can be implemented by making a recursive call to "go",
    -- however that is more expensive because of unnecessary recursion
    -- that cannot be tail call optimized. Unfolding recursion explicitly via
    -- continuations is much more efficient.
    go m1 !acc = Stream $ \_ _ sng yld ->
        let stop = sng acc
            single a = sng $ step acc a
            yieldk a r =
                let stream = go r (step acc a)
                 in (unStream stream) defState undefined sng yld
         in (unStream m1) defState stop single yieldk

-- | Strict left associative fold.
{-# INLINE foldl' #-}
foldl' :: (IsStream t, Monad m) => (b -> a -> b) -> b -> t m a -> m b
foldl' step begin m = foldx step begin id m

-- XXX replace the recursive "go" with explicit continuations.
-- | Like 'foldx', but with a monadic step function.
--
-- The step action for an element is run as soon as that element is produced,
-- and its result is forced to weak head normal form before the fold moves on
-- to the rest of the stream.
foldxM :: (IsStream t, Monad m)
    => (x -> a -> m x) -> m x -> (x -> m b) -> t m a -> m b
foldxM step begin done m = go begin (toStream m)
    where
    -- Note: the bang on "acc" only forces the monadic action, not the value
    -- it produces; strictness of the accumulated value is handled in
    -- "yieldk" below.
    go !acc m1 =
        let stop = acc >>= done
            single a = acc >>= \b -> step b a >>= done
            -- Run the step right away rather than passing the un-run action
            -- on as the next accumulator: deferring it would delay the step's
            -- effects until after the next element has been produced. Forcing
            -- the result keeps the fold strict and avoids a chain of thunks.
            yieldk a r =
                acc >>= \b -> step b a >>= \x -> x `seq` go (return x) r
         in (unStream m1) defState stop single yieldk

-- | Like 'foldl'' but with a monadic step function.
foldlM' :: (IsStream t, Monad m) => (b -> a -> m b) -> b -> t m a -> m b
foldlM' step begin m = foldxM step (return begin) return m

------------------------------------------------------------------------------
-- Specialized folds
------------------------------------------------------------------------------

-- | Run a stream to completion, discarding every element it produces.
{-# INLINE runStream #-}
runStream :: (Monad m, IsStream t) => t m a -> m ()
runStream m = go (toStream m)
    where
    go s =
        let whenStop = return ()
            whenSingle _ = return ()
            whenYield _ rest = go (toStream rest)
         in unStream s defState whenStop whenSingle whenYield

-- | 'True' if the stream stops without producing any element.
{-# INLINE null #-}
null :: (IsStream t, Monad m) => t m a -> m Bool
null m =
    let whenStop = return True
        whenSingle _ = return False
        whenYield _ _ = return False
     in unStream (toStream m) defState whenStop whenSingle whenYield

-- | The first element of the stream, or 'Nothing' if the stream is empty.
{-# INLINE head #-}
head :: (IsStream t, Monad m) => t m a -> m (Maybe a)
head m =
    let whenStop = return Nothing
        whenSingle x = return $ Just x
        whenYield x _ = return $ Just x
     in unStream (toStream m) defState whenStop whenSingle whenYield

-- | The stream without its first element, or 'Nothing' if the stream is
-- empty. The tail of a singleton stream is 'nil'.
{-# INLINE tail #-}
tail :: (IsStream t, Monad m) => t m a -> m (Maybe (t m a))
tail m =
    let whenStop = return Nothing
        whenSingle _ = return (Just nil)
        whenYield _ rest = return (Just (fromStream rest))
     in unStream (toStream m) defState whenStop whenSingle whenYield

-- | 'True' if any element equals the given value. Stops consuming the stream
-- at the first match.
{-# INLINE elem #-}
elem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
elem e m = go (toStream m)
    where
    go s =
        let whenStop = return False
            whenSingle x = return (x == e)
            whenYield x rest
                | x == e = return True
                | otherwise = go rest
         in unStream s defState whenStop whenSingle whenYield

-- | 'True' if no element equals the given value. Stops consuming the stream
-- at the first match.
{-# INLINE notElem #-}
notElem :: (IsStream t, Monad m, Eq a) => a -> t m a -> m Bool
notElem e m = go (toStream m)
    where
    go s =
        let whenStop = return True
            whenSingle x = return (x /= e)
            whenYield x rest
                | x == e = return False
                | otherwise = go rest
         in unStream s defState whenStop whenSingle whenYield

-- | 'True' if every element satisfies the predicate ('True' for an empty
-- stream). Stops consuming the stream at the first failing element.
all :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
all p m = go (toStream m)
    where
    go s =
        let whenSingle x = if p x then return True else return False
            whenYield x rest = if p x then go rest else return False
         in unStream s defState (return True) whenSingle whenYield

-- | 'True' if some element satisfies the predicate ('False' for an empty
-- stream). Stops consuming the stream at the first satisfying element.
any :: (IsStream t, Monad m) => (a -> Bool) -> t m a -> m Bool
any p m = go (toStream m)
    where
    go s =
        let whenSingle x = if p x then return True else return False
            whenYield x rest = if p x then return True else go rest
         in unStream s defState (return False) whenSingle whenYield

-- | Extract the last element of the stream, if any.
{-# INLINE last #-}
last :: (IsStream t, Monad m) => t m a -> m (Maybe a)
last = foldx (\_ latest -> Just latest) Nothing id

-- | The smallest element of the stream, or 'Nothing' if the stream is empty.
-- When elements compare equal the earliest one is kept.
{-# INLINE minimum #-}
minimum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a)
minimum m = go Nothing (toStream m)
    where
    -- No candidate yet: the first element seen becomes the candidate.
    go Nothing s =
        let whenStop = return Nothing
            whenSingle x = return (Just x)
            whenYield x rest = go (Just x) rest
         in unStream s defState whenStop whenSingle whenYield

    -- Compare each new element against the current candidate.
    go (Just best) s =
        let whenStop = return (Just best)
            whenSingle x
                | best <= x = return (Just best)
                | otherwise = return (Just x)
            whenYield x rest
                | best <= x = go (Just best) rest
                | otherwise = go (Just x) rest
         in unStream s defState whenStop whenSingle whenYield

-- | The largest element of the stream, or 'Nothing' if the stream is empty.
-- When elements compare equal the latest one is kept.
{-# INLINE maximum #-}
maximum :: (IsStream t, Monad m, Ord a) => t m a -> m (Maybe a)
maximum m = go Nothing (toStream m)
    where
    -- No candidate yet: the first element seen becomes the candidate.
    go Nothing s =
        let whenStop = return Nothing
            whenSingle x = return (Just x)
            whenYield x rest = go (Just x) rest
         in unStream s defState whenStop whenSingle whenYield

    -- Compare each new element against the current candidate.
    go (Just best) s =
        let whenStop = return (Just best)
            whenSingle x
                | best <= x = return (Just x)
                | otherwise = return (Just best)
            whenYield x rest
                | best <= x = go (Just x) rest
                | otherwise = go (Just best) rest
         in unStream s defState whenStop whenSingle whenYield

------------------------------------------------------------------------------
-- Map and Fold
------------------------------------------------------------------------------

-- | Apply a monadic action to each element of the stream and discard the
-- output of the action.
mapM_ :: (IsStream t, Monad m) => (a -> m b) -> t m a -> m ()
mapM_ f m = go (toStream m)
    where
    go s =
        let whenStop = return ()
            whenSingle x = void (f x)
            whenYield x rest = f x >> go rest
         in unStream s defState whenStop whenSingle whenYield

------------------------------------------------------------------------------
-- Converting folds
------------------------------------------------------------------------------

-- | Collect all elements of the stream into a list, using the lazy right
-- fold 'foldr'.
{-# INLINABLE toList #-}
toList :: (IsStream t, Monad m) => t m a -> m [a]
toList = foldr (:) []

-- | Identity on 'Stream'.
{-# INLINE toStreamK #-}
toStreamK :: Stream m a -> Stream m a
toStreamK = id

-------------------------------------------------------------------------------
-- Transformation by folding (Scans)
-------------------------------------------------------------------------------

-- | Strict left scan with an extraction function. The output starts with the
-- extraction of the initial accumulator, followed by the extraction of the
-- accumulator after each input element.
{-# INLINE scanx #-}
scanx :: IsStream t => (x -> a -> x) -> x -> (x -> b) -> t m a -> t m b
scanx step begin done m =
    cons (done begin) (fromStream (go (toStream m) begin))
    where
    go s !acc = Stream $ \state kstop ksingle kyield ->
        let whenSingle x = ksingle (done (step acc x))
            whenYield x rest =
                let acc1 = step acc x
                 in kyield (done acc1) (go rest acc1)
         in unStream s (rstState state) kstop whenSingle whenYield

-- | Strict left scan; 'scanx' with 'id' as the extraction function.
{-# INLINE scanl' #-}
scanl' :: IsStream t => (b -> a -> b) -> b -> t m a -> t m b
scanl' step begin m = scanx step begin id m

-------------------------------------------------------------------------------
-- Filtering
-------------------------------------------------------------------------------

-- | Keep only the elements that satisfy the predicate.
{-# INLINE filter #-}
filter :: IsStream t => (a -> Bool) -> t m a -> t m a
filter p m = fromStream $ go (toStream m)
    where
    go s = Stream $ \state kstop ksingle kyield ->
        let whenSingle x =
                if p x then ksingle x else kstop
            -- A rejected element is skipped by running the remainder with
            -- these same handlers.
            whenYield x rest =
                if p x
                then kyield x (go rest)
                else unStream rest (rstState state) kstop whenSingle whenYield
         in unStream s (rstState state) kstop whenSingle whenYield

-- | Keep at most the first @n@ elements. A count of zero or less gives an
-- empty stream without running the input.
{-# INLINE take #-}
take :: IsStream t => Int -> t m a -> t m a
take n m = fromStream $ go n (toStream m)
    where
    go count s = Stream $ \state kstop ksingle kyield ->
        if count <= 0
        then kstop
        else unStream s (rstState state) kstop ksingle
                 (\x rest -> kyield x (go (count - 1) rest))

-- | Keep elements up to, but not including, the first one that fails the
-- predicate.
{-# INLINE takeWhile #-}
takeWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
takeWhile p m = fromStream $ go (toStream m)
    where
    go s = Stream $ \state kstop ksingle kyield ->
        let whenSingle x =
                if p x then ksingle x else kstop
            whenYield x rest =
                if p x then kyield x (go rest) else kstop
         in unStream s (rstState state) kstop whenSingle whenYield

-- | Discard the first @n@ elements. A count of zero or less leaves the stream
-- unchanged.
drop :: IsStream t => Int -> t m a -> t m a
drop n m = fromStream $ Stream $ \state kstop ksingle kyield ->
    unStream (go n (toStream m)) (rstState state) kstop ksingle kyield
    where
    go count s = Stream $ \state kstop ksingle kyield ->
        let -- While still dropping, a singleton is discarded entirely.
            whenSingle _ = kstop
            whenYield _ rest =
                unStream (go (count - 1) rest) state kstop ksingle kyield
        -- Somehow "<=" check performs better than a ">"
         in if count <= 0
            then unStream s state kstop ksingle kyield
            else unStream s state kstop whenSingle whenYield

-- | Discard elements while they satisfy the predicate; the first element that
-- fails it and everything after it are kept.
{-# INLINE dropWhile #-}
dropWhile :: IsStream t => (a -> Bool) -> t m a -> t m a
dropWhile p m = fromStream $ go (toStream m)
    where
    go s = Stream $ \state kstop ksingle kyield ->
        let whenSingle x =
                if p x then kstop else ksingle x
            whenYield x rest =
                if p x
                then unStream rest (rstState state) kstop whenSingle whenYield
                else kyield x rest
         in unStream s (rstState state) kstop whenSingle whenYield

-------------------------------------------------------------------------------
-- Mapping
-------------------------------------------------------------------------------

-- | Apply a pure function to every element. The remainder of the stream is
-- mapped through the 'Functor' instance of 'Stream'.
{-# INLINE map #-}
map :: (IsStream t, Monad m) => (a -> b) -> t m a -> t m b
map f m = fromStream $ Stream $ \state kstop ksingle kyield ->
    let whenSingle x = ksingle (f x)
        whenYield x rest = kyield (f x) (fmap f rest)
     in unStream (toStream m) (rstState state) kstop whenSingle whenYield

-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
-- other stream types to overload it.
{-# INLINE mapM #-}
mapM :: (IsStream t, MonadAsync m) => (a -> m b) -> t m a -> t m b
mapM f m = go (toStream m)
    where
    go s = fromStream $ Stream $ \state kstop ksingle kyield ->
        let whenSingle x = ksingle =<< f x
            -- The mapped head is attached with (|:) and the resulting stream
            -- is run with the continuations we were given.
            whenYield x rest =
                unStream (toStream (f x |: go rest))
                         state kstop ksingle kyield
         in unStream s (rstState state) kstop whenSingle whenYield

-- Be careful when modifying this, this uses a consM (|:) deliberately to allow
-- other stream types to overload it.
{-# INLINE sequence #-}
sequence :: (IsStream t, MonadAsync m) => t m (m a) -> t m a
sequence m = go (toStream m)
    where
    go s = fromStream $ Stream $ \state kstop ksingle kyield ->
        let whenSingle action = ksingle =<< action
            whenYield action rest =
                unStream (toStream (action |: go rest))
                         state kstop ksingle kyield
         in unStream s (rstState state) kstop whenSingle whenYield

-------------------------------------------------------------------------------
-- Map and Filter
-------------------------------------------------------------------------------

-- | Apply a function to every element, keeping the 'Just' results and
-- dropping the elements that map to 'Nothing'.
{-# INLINE mapMaybe #-}
mapMaybe :: IsStream t => (a -> Maybe b) -> t m a -> t m b
mapMaybe f m = go (toStream m)
    where
    go s = fromStream $ Stream $ \state kstop ksingle kyield ->
        let whenSingle x = maybe kstop ksingle (f x)
            -- A dropped element is skipped by running the remainder with
            -- these same handlers.
            whenYield x rest =
                case f x of
                    Nothing ->
                        unStream rest (rstState state)
                                 kstop whenSingle whenYield
                    Just y -> kyield y (toStream (go rest))
         in unStream s (rstState state) kstop whenSingle whenYield

------------------------------------------------------------------------------
-- Semigroup
------------------------------------------------------------------------------

-- | Concatenates two streams sequentially i.e. the first stream is
-- exhausted completely before yielding any element from the second stream.
{-# INLINE serial #-}
serial :: Stream m a -> Stream m a -> Stream m a
serial m1 m2 = go m1
    where
    go s = Stream $ \state kstop ksingle kyield ->
        let -- When the first stream stops, continue with the second one.
            whenStop = unStream m2 (rstState state) kstop ksingle kyield
            -- The last element of the first stream is followed directly by
            -- the second stream.
            whenSingle x = kyield x m2
            whenYield x rest = kyield x (go rest)
         in unStream s (rstState state) whenStop whenSingle whenYield

instance Semigroup (Stream m a) where
    (<>) = serial

------------------------------------------------------------------------------
-- Monoid
------------------------------------------------------------------------------

instance Monoid (Stream m a) where
    mempty = nil
    mappend = (<>)

-------------------------------------------------------------------------------
-- Functor
-------------------------------------------------------------------------------

instance Monad m => Functor (Stream m) where
    fmap = map

-------------------------------------------------------------------------------
-- Bind utility
-------------------------------------------------------------------------------

-- | Bind for streams, parameterized by the operation used to combine the
-- stream generated from one element with the streams generated from the
-- remaining elements.
{-# INLINE bindWith #-}
bindWith
    :: (forall c. Stream m c -> Stream m c -> Stream m c)
    -> Stream m a
    -> (a -> Stream m b)
    -> Stream m b
bindWith par m f = go m
    where
    go s = Stream $ \state kstop ksingle kyield ->
        let -- Run a generated stream with the continuations we were given.
            run inner = unStream inner state kstop ksingle kyield
            whenSingle x = run (f x)
            whenYield x rest = run (f x `par` go rest)
         in unStream s (rstState state) kstop whenSingle whenYield

------------------------------------------------------------------------------
-- Alternative & MonadPlus
------------------------------------------------------------------------------

-- The stop continuation handed to the first stream runs the second stream;
-- the singleton and yield continuations are passed through unchanged.
_alt :: Stream m a -> Stream m a -> Stream m a
_alt m1 m2 = Stream $ \state kstop ksingle kyield ->
    let fallback = unStream m2 (rstState state) kstop ksingle kyield
     in unStream m1 (rstState state) fallback ksingle kyield

------------------------------------------------------------------------------
-- MonadReader
------------------------------------------------------------------------------

-- | Wrap the stop, singleton and yield continuations of a stream in
-- @local f@; the remainder of the stream is wrapped the same way,
-- recursively.
withLocal :: MonadReader r m => (r -> r) -> Stream m a -> Stream m a
withLocal f m = Stream $ \state kstop ksingle kyield ->
    let whenSingle x = local f (ksingle x)
        whenYield x rest = local f (kyield x (withLocal f rest))
     in unStream m (rstState state) (local f kstop) whenSingle whenYield

------------------------------------------------------------------------------
-- MonadError
------------------------------------------------------------------------------

{-
-- XXX handle and test cross thread state transfer
withCatchError
    :: MonadError e m
    => Stream m a -> (e -> Stream m a) -> Stream m a
withCatchError m h =
    Stream $ \_ stp sng yld ->
        let run x = unStream x Nothing stp sng yieldk
            handle r = r `catchError` \e -> run $ h e
            yieldk a r = yld a (withCatchError r h)
        in handle $ run m
-}

-------------------------------------------------------------------------------
-- Transformers
-------------------------------------------------------------------------------

instance MonadTrans Stream where
    lift = yieldM