{-# LANGUAGE CPP #-} {-# LANGUAGE ExistentialQuantification #-} #include "inline.hs" -- | -- Module : Streamly.Internal.Data.Pipe.Types -- Copyright : (c) 2019 Composewell Technologies -- License : BSD3 -- Maintainer : streamly@composewell.com -- Stability : experimental -- Portability : GHC module Streamly.Internal.Data.Pipe.Types ( Step (..) , Pipe (..) , PipeState (..) , zipWith , tee , map , compose ) where import Control.Arrow (Arrow(..)) import Control.Category (Category(..)) import Data.Maybe (isJust) #if __GLASGOW_HASKELL__ < 808 import Data.Semigroup (Semigroup(..)) #endif import Prelude hiding (zipWith, map, id, unzip, null) import Streamly.Internal.Data.Strict (Tuple'(..), Tuple3'(..)) import qualified Prelude ------------------------------------------------------------------------------ -- Pipes ------------------------------------------------------------------------------ -- A scan is a much simpler version of pipes. A scan always produces an output -- on an input whereas a pipe does not necessarily produce an output on an -- input, it might consume multiple inputs before producing an output. That way -- it can implement filtering. Similarly, it can produce more than one output -- on an single input. -- -- Therefore when two pipes are composed in parallel formation, one may run -- slower or faster than the other. If all of them are being fed from the same -- source, we may have to buffer the input to match the speeds. In case of -- scans we do not have that problem. -- -- We may also need a "Stop" constructor to indicate that we are not generating -- any more values and we can have a "Done" constructor to indicate that we are -- not consuming any more values. Similarly we can have a stop with error or -- exception and a done with error or leftover values. -- -- In generator mode, Continue means no output/continue. In fold mode Continue means -- need more input to produce result. we can perhaps call it Continue instead. -- data Step s a = Yield a s | Continue s -- | Represents a stateful transformation over an input stream of values of -- type @a@ to outputs of type @b@ in 'Monad' @m@. -- A pipe uses a consume function and a produce function. It can switch from -- consume/fold mode to a produce/source mode. The first step function is a -- fold function while the seocnd one is a stream generator function. -- -- We can upgrade a stream or a fold into a pipe. However, streams are more -- efficient in generation and folds are more efficient in consumption. -- -- For pure transformation we can have a 'Scan' type. A Scan would be more -- efficient in zipping whereas pipes are useful for merging and zipping where -- we know buffering can occur. A Scan type can be upgraded to a pipe. -- -- XXX In general the starting state could either be for generation or for -- consumption. Currently we are only starting with a consumption state. -- -- An explicit either type for better readability of the code data PipeState s1 s2 = Consume s1 | Produce s2 isProduce :: PipeState s1 s2 -> Bool isProduce s = case s of Produce _ -> True Consume _ -> False data Pipe m a b = forall s1 s2. Pipe (s1 -> a -> m (Step (PipeState s1 s2) b)) (s2 -> m (Step (PipeState s1 s2) b)) s1 instance Monad m => Functor (Pipe m a) where {-# INLINE_NORMAL fmap #-} fmap f (Pipe consume produce initial) = Pipe consume' produce' initial where {-# INLINE_LATE consume' #-} consume' st a = do r <- consume st a return $ case r of Yield x s -> Yield (f x) s Continue s -> Continue s {-# INLINE_LATE produce' #-} produce' st = do r <- produce st return $ case r of Yield x s -> Yield (f x) s Continue s -> Continue s -- XXX move this to a separate module data Deque a = Deque [a] [a] {-# INLINE null #-} null :: Deque a -> Bool null (Deque [] []) = True null _ = False {-# INLINE snoc #-} snoc :: a -> Deque a -> Deque a snoc a (Deque snocList consList) = Deque (a : snocList) consList {-# INLINE uncons #-} uncons :: Deque a -> Maybe (a, Deque a) uncons (Deque snocList consList) = case consList of h : t -> Just (h, Deque snocList t) _ -> case Prelude.reverse snocList of h : t -> Just (h, Deque [] t) _ -> Nothing -- | The composed pipe distributes the input to both the constituent pipes and -- zips the output of the two using a supplied zipping function. -- -- @since 0.7.0 {-# INLINE_NORMAL zipWith #-} zipWith :: Monad m => (a -> b -> c) -> Pipe m i a -> Pipe m i b -> Pipe m i c zipWith f (Pipe consumeL produceL stateL) (Pipe consumeR produceR stateR) = Pipe consume produce state where -- Left state means we need to consume input from the source. A Right -- state means we either have buffered input or we are in generation -- mode so we do not need input from source in either case. -- state = Tuple' (Consume stateL, Nothing, Nothing) (Consume stateR, Nothing, Nothing) -- XXX for heavy buffering we need to have the (ring) buffer in pinned -- memory using the Storable instance. {-# INLINE_LATE consume #-} consume (Tuple' (sL, resL, lq) (sR, resR, rq)) a = do s1 <- drive sL resL lq consumeL produceL a s2 <- drive sR resR rq consumeR produceR a yieldOutput s1 s2 where {-# INLINE drive #-} drive st res queue fConsume fProduce val = do case res of Nothing -> goConsume st queue val fConsume fProduce Just x -> return $ case queue of Nothing -> (st, Just x, Just $ (Deque [val] [])) Just q -> (st, Just x, Just $ snoc val q) {-# INLINE goConsume #-} goConsume stt queue val fConsume stp2 = do case stt of Consume st -> do case queue of Nothing -> do r <- fConsume st val return $ case r of Yield x s -> (s, Just x, Nothing) Continue s -> (s, Nothing, Nothing) Just queue' -> case uncons queue' of Just (v, q) -> do r <- fConsume st v let q' = snoc val q return $ case r of Yield x s -> (s, Just x, Just q') Continue s -> (s, Nothing, Just q') Nothing -> undefined -- never occurs Produce st -> do r <- stp2 st return $ case r of Yield x s -> (s, Just x, queue) Continue s -> (s, Nothing, queue) {-# INLINE_LATE produce #-} produce (Tuple' (sL, resL, lq) (sR, resR, rq)) = do s1 <- drive sL resL lq consumeL produceL s2 <- drive sR resR rq consumeR produceR yieldOutput s1 s2 where {-# INLINE drive #-} drive stt res q fConsume fProduce = do case res of Nothing -> goProduce stt q fConsume fProduce Just x -> return (stt, Just x, q) {-# INLINE goProduce #-} goProduce stt queue fConsume fProduce = do case stt of Consume st -> do case queue of -- See yieldOutput. We enter produce mode only when -- each pipe is either in Produce state or the -- queue is non-empty. So this case cannot occur. Nothing -> undefined Just queue' -> case uncons queue' of Just (v, q) -> do r <- fConsume st v -- We provide a guarantee that if the -- queue is "Just" it is always -- non-empty. yieldOutput and goConsume -- depend on it. let q' = if null q then Nothing else Just q return $ case r of Yield x s -> (s, Just x, q') Continue s -> (s, Nothing, q') Nothing -> return (stt, Nothing, Nothing) Produce st -> do r <- fProduce st return $ case r of Yield x s -> (s, Just x, queue) Continue s -> (s, Nothing, queue) {-# INLINE yieldOutput #-} yieldOutput s1@(sL', resL', lq') s2@(sR', resR', rq') = return $ -- switch to produce mode if we do not need input if (isProduce sL' || isJust lq') && (isProduce sR' || isJust rq') then case (resL', resR') of (Just xL, Just xR) -> Yield (f xL xR) (Produce (Tuple' (clear s1) (clear s2))) _ -> Continue (Produce (Tuple' s1 s2)) else case (resL', resR') of (Just xL, Just xR) -> Yield (f xL xR) (Consume (Tuple' (clear s1) (clear s2))) _ -> Continue (Consume (Tuple' s1 s2)) where clear (s, _, q) = (s, Nothing, q) instance Monad m => Applicative (Pipe m a) where {-# INLINE pure #-} pure b = Pipe (\_ _ -> pure $ Yield b (Consume ())) undefined () (<*>) = zipWith id -- | The composed pipe distributes the input to both the constituent pipes and -- merges the outputs of the two. -- -- @since 0.7.0 {-# INLINE_NORMAL tee #-} tee :: Monad m => Pipe m a b -> Pipe m a b -> Pipe m a b tee (Pipe consumeL produceL stateL) (Pipe consumeR produceR stateR) = Pipe consume produce state where state = Tuple' (Consume stateL) (Consume stateR) consume (Tuple' sL sR) a = do case sL of Consume st -> do r <- consumeL st a return $ case r of Yield x s -> Yield x (Produce (Tuple3' (Just a) s sR)) Continue s -> Continue (Produce (Tuple3' (Just a) s sR)) -- XXX we should never come here unless the initial state of the -- first pipe is set to "Right". Produce _st -> undefined -- do {- r <- produceL st return $ case r of Yield x s -> Yield x (Right (Tuple3' (Just a) s sR)) Continue s -> Continue (Right (Tuple3' (Just a) s sR)) -} produce (Tuple3' (Just a) sL sR) = do case sL of Consume _ -> do case sR of Consume st -> do r <- consumeR st a let nextL s = Consume (Tuple' sL s) let nextR s = Produce (Tuple3' Nothing sL s) return $ case r of Yield x s@(Consume _) -> Yield x (nextL s) Yield x s@(Produce _) -> Yield x (nextR s) Continue s@(Consume _) -> Continue (nextL s) Continue s@(Produce _) -> Continue (nextR s) -- We will never come here unless the initial state of -- second pipe is set to "Right". Produce _ -> undefined Produce st -> do r <- produceL st let next s = Produce (Tuple3' (Just a) s sR) return $ case r of Yield x s -> Yield x (next s) Continue s -> Continue (next s) produce (Tuple3' Nothing sL sR) = do case sR of Consume _ -> undefined -- should never occur Produce st -> do r <- produceR st return $ case r of Yield x s@(Consume _) -> Yield x (Consume (Tuple' sL s)) Yield x s@(Produce _) -> Yield x (Produce (Tuple3' Nothing sL s)) Continue s@(Consume _) -> Continue (Consume (Tuple' sL s)) Continue s@(Produce _) -> Continue (Produce (Tuple3' Nothing sL s)) instance Monad m => Semigroup (Pipe m a b) where {-# INLINE (<>) #-} (<>) = tee -- | Lift a pure function to a 'Pipe'. -- -- @since 0.7.0 {-# INLINE map #-} map :: Monad m => (a -> b) -> Pipe m a b map f = Pipe consume undefined () where consume _ a = return $ Yield (f a) (Consume ()) {- -- | A hollow or identity 'Pipe' passes through everything that comes in. -- -- @since 0.7.0 {-# INLINE id #-} id :: Monad m => Pipe m a a id = map Prelude.id -} -- | Compose two pipes such that the output of the second pipe is attached to -- the input of the first pipe. -- -- @since 0.7.0 {-# INLINE_NORMAL compose #-} compose :: Monad m => Pipe m b c -> Pipe m a b -> Pipe m a c compose (Pipe consumeL produceL stateL) (Pipe consumeR produceR stateR) = Pipe consume produce state where state = Tuple' (Consume stateL) (Consume stateR) consume (Tuple' sL sR) a = do case sL of Consume stt -> case sR of Consume st -> do rres <- consumeR st a case rres of Yield x sR' -> do let next s = if isProduce sR' then Produce s else Consume s lres <- consumeL stt x return $ case lres of Yield y s1@(Consume _) -> Yield y (next $ Tuple' s1 sR') Yield y s1@(Produce _) -> Yield y (Produce $ Tuple' s1 sR') Continue s1@(Consume _) -> Continue (next $ Tuple' s1 sR') Continue s1@(Produce _) -> Continue (Produce $ Tuple' s1 sR') Continue s1@(Consume _) -> return $ Continue (Consume $ Tuple' sL s1) Continue s1@(Produce _) -> return $ Continue (Produce $ Tuple' sL s1) Produce _ -> undefined -- XXX we should never come here unless the initial state of the -- first pipe is set to "Right". Produce _ -> undefined -- XXX we need to write the code in mor optimized fashion. Use Continue -- more and less yield points. produce (Tuple' sL sR) = do case sL of Produce st -> do r <- produceL st let next s = if isProduce sR then Produce s else Consume s return $ case r of Yield x s@(Consume _) -> Yield x (next $ Tuple' s sR) Yield x s@(Produce _) -> Yield x (Produce $ Tuple' s sR) Continue s@(Consume _) -> Continue (next $ Tuple' s sR) Continue s@(Produce _) -> Continue (Produce $ Tuple' s sR) Consume stt -> case sR of Produce st -> do rR <- produceR st case rR of Yield x sR' -> do let next s = if isProduce sR' then Produce s else Consume s rL <- consumeL stt x return $ case rL of Yield y s1@(Consume _) -> Yield y (next $ Tuple' s1 sR') Yield y s1@(Produce _) -> Yield y (Produce $ Tuple' s1 sR') Continue s1@(Consume _) -> Continue (next $ Tuple' s1 sR') Continue s1@(Produce _) -> Continue (Produce $ Tuple' s1 sR') Continue s1@(Consume _) -> return $ Continue (Consume $ Tuple' sL s1) Continue s1@(Produce _) -> return $ Continue (Produce $ Tuple' sL s1) Consume _ -> return $ Continue (Consume $ Tuple' sL sR) instance Monad m => Category (Pipe m) where {-# INLINE id #-} id = map Prelude.id {-# INLINE (.) #-} (.) = compose unzip :: Pipe m a x -> Pipe m b y -> Pipe m (a, b) (x, y) unzip = undefined instance Monad m => Arrow (Pipe m) where {-# INLINE arr #-} arr = map {-# INLINE (***) #-} (***) = unzip {-# INLINE (&&&) #-} (&&&) = zipWith (,)