{-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE PatternSynonyms #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TupleSections #-} {-# LANGUAGE TypeInType #-} {-# LANGUAGE ViewPatterns #-} -- | -- Module : Data.Conduino -- Copyright : (c) Justin Le 2019 -- License : BSD3 -- -- Maintainer : justin@jle.im -- Stability : experimental -- Portability : non-portable -- -- Base API for 'Pipe'. See documentation for 'Pipe', '.|', and 'runPipe' -- for information on usage. -- -- A "prelude" of useful pipes can be found in "Data.Conduino.Combinators". -- -- == Why a stream processing library? -- -- A stream processing library is a way to stream processors in a /composable/ way: -- instead of defining your entire stream processing function as a single -- recursive loop with some global state, instead think about each "stage" of the process, -- and isolate each state to its own segment. Each component can contain its own -- isolated state: -- -- >>> runPipePure $ sourceList [1..10] -- .| scan (+) 0 -- .| sinkList -- [1,3,6,10,15,21,28,36,45,55] -- -- All of these components have internal "state": -- -- * @sourceList@ keeps track of "which" item in the list to yield next -- * @scan@ keeps track of the current running sum -- * @sinkList@ keeps track of all items that have been seen so far, as a list -- -- They all work together without knowing any other component's internal state, so -- you can write your total streaming function without concerning yourself, at -- each stage, with the entire part. -- -- In addition, there are useful functions to "combine" stream processors: -- -- * 'zipSink' combines sinks in an "and" sort of way: combine two sinks in -- parallel and finish when all finish. 
-- * 'altSink' combines sinks in an "or" sort of way: combine two sinks in
--   parallel and finish when any of them finish
-- * 'zipSource' combines sources in parallel and collates their outputs.
--
-- Stream processing libraries are also useful for streaming composition of
-- monadic effects (like IO or State), as well.
--
module Data.Conduino (
    Pipe
  , (.|)
  , runPipe, runPipePure
  -- * Primitives
  , awaitEither, await, awaitWith, awaitSurely, awaitForever, yield
  -- * Special chaining
  , (&|), (|.)
  , fuseBoth, fuseUpstream, fuseBothMaybe
  -- * Incremental running
  , squeezePipe, squeezePipeEither
  , feedPipe, feedPipeEither
  -- * Pipe transformers
  , mapInput, mapOutput, mapUpRes, trimapPipe
  , hoistPipe
  , feedbackPipe
  -- * Wrappers
  , ZipSource(..)
  , unconsZipSource
  , zipSource
  , ZipSink(..)
  , zipSink, altSink
  -- * Generators
  , toListT, fromListT
  , pattern PipeList
  , withSource, genSource
  ) where

import Control.Applicative
import Control.Monad
import Control.Monad.Trans.Class
import Control.Monad.Trans.Free (FreeT(..), FreeF(..))
import Control.Monad.Trans.Free.Church
import Control.Monad.Trans.State
import Data.Bifunctor
import Data.Conduino.Internal
import Data.Functor
import Data.Functor.Identity
import Data.Sequence (Seq(..))
import Data.Void
import List.Transformer (ListT(..), Step(..))
import qualified Data.Sequence as Seq
import qualified List.Transformer as LT

-- | Wait for the next item from upstream, blocking until upstream
-- 'yield's.
--
-- The result is 'Nothing' when the upstream pipe has finished and
-- terminated instead of yielding; the upstream's result value is
-- discarded.
--
-- If the upstream pipe never terminates, 'awaitSurely' can be used
-- instead to get the item without the 'Maybe' wrapper.
--
-- Will always return 'Just' if @u@ is 'Void'.
await :: Pipe i o u m (Maybe i)
await = awaitEither <&> \case
    Left  _ -> Nothing
    Right x -> Just x

-- | 'await', but directly chaining a continuation if the 'await' was
-- successful.
--
-- The await will always be successful if @u@ is 'Void'.
--
-- This is a way of writing code in a way that is agnostic to how the
-- upstream pipe terminates.
awaitWith :: (i -> Pipe i o u m u) -> Pipe i o u m u
awaitWith onInput = awaitEither >>= either pure onInput

-- | Await input from an upstream pipe that is guaranteed never to
-- terminate (its result type is 'Void'), so an item is always produced.
--
-- A common type error will occur if @u@ (upstream pipe result type) is not
-- 'Void' -- it might be @()@ or some non-'Void' type.  This means that the
-- upstream pipe terminates, so awaiting cannot be assured.
--
-- In that case, either change your upstream pipe to be one that never
-- terminates (which is most likely not possible), or use 'await' instead
-- of 'awaitSurely'.
awaitSurely :: Pipe i o Void m i
awaitSurely = fmap (either absurd id) awaitEither

-- | Repeatedly 'await', running the given pipe on every item that upstream
-- yields.  When upstream stops producing, its result is returned.
--
-- Can be used to implement many pipe combinators:
--
-- @
-- 'Data.Conduino.Combinators.map' f = 'awaitForever' $ \x -> 'yield' (f x)
-- @
awaitForever :: (i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever handler = awaitForeverWith pure handler

-- | 'awaitForever', but with a way to handle the result of the
-- upstream pipe, which will be called when the upstream pipe stops
-- producing.
awaitForeverWith
    :: (u -> Pipe () o u m b)       -- ^ how to handle upstream ending, transitioning to a source
    -> (i -> Pipe i o u m a)        -- ^ how to handle upstream output
    -> Pipe i o u m b
awaitForeverWith onEnd onItem = loop
  where
    loop     = awaitEither >>= either finish step
    -- upstream is done: run the end handler (a source) in our input type
    finish r = mapInput (const ()) (onEnd r)
    -- upstream yielded: handle the item, then keep awaiting
    step x   = onItem x *> loop

-- | Run a pipe that is both a source and a sink (an "effect") into the
-- effect that it represents.
--
-- Usually you would construct this using something like:
--
-- @
-- 'runPipe' $ someSource
--        '.|' somePipe
--         .| someOtherPipe
--         .| someSink
-- @
--
-- 'runPipe' will produce the result of that final sink.
--
-- Some common errors you might receive:
--
-- * @i@ is not @()@: If you give a pipe where the first parameter
--   ("input") is not @()@, it means that your pipe is not a producer.
--   Pre-compose it (using '.|') with a producer of the type you need.
--
--   For example, if you have a @myPipe :: 'Pipe' 'Int' o u m a@, this is
--   a pipe that is awaiting 'Int's from upstream.  Pre-compose with
--   a producer of 'Int's, like @'Data.Conduino.Combinators.sourceList'
--   [1,2,3] '.|' myPipe@, in order to be able to run it.
--
-- * @o@ is not 'Void': If you give a pipe where the second parameter
--   ("output") is not 'Void', it means that your pipe is not a consumer.
--   Post-compose it (using '.|') with a consumer of the type you need.
--
--   For example, if you have @myPipe :: 'Pipe' i 'Int' u m a@, this is
--   a pipe that is yielding 'Int's downstream that are going unhandled.
--   Post-compose it with a consumer of 'Int's, like @myPipe '.|'
--   'Data.Conduino.Combinators.foldl' (+) 0@, in order to be able to run
--   it.
--
--   If you just want to ignore all downstream yields, post-compose with
--   'Data.Conduino.Combinators.sinkNull'.
--
runPipe :: Monad m => Pipe () Void u m a -> m a
runPipe pipe = iterT step (pipeFree pipe)
  where
    -- every request for input is answered with the unit input
    step (PAwaitF _ onInput) = onInput ()
    -- the output type is 'Void', so a yield can never actually occur
    step (PYieldF out _)     = absurd out

-- | 'runPipe' when the underlying monad is 'Identity', and so has no
-- effects.
runPipePure :: Pipe () Void Void Identity a -> a
runPipePure pipe = runIdentity (runPipe pipe)

-- | Repeatedly run 'squeezePipe' by giving it items from an input list.
-- Returns the outputs observed, and 'Left' if the input list was exhausted
-- with more input expected, or 'Right' if the pipe terminated, with the
-- leftover inputs and output result.
--
-- @since 0.2.1.0
feedPipe
    :: Monad m
    => [i]
    -> Pipe i o u m a
    -> m ([o], Either (i -> Pipe i o u m a) ([i], a))
feedPipe xs pipe =
    -- restrict the pending continuation so that it only accepts real input
    second (first (. Right)) <$> feedPipeEither xs pipe

-- | Repeatedly run 'squeezePipeEither' by giving it items from an input
-- list.
-- Returns the outputs observed, and 'Left' if the input list was
-- exhausted with more input expected (or a @u@ terminating upstream
-- value), or 'Right' if the pipe terminated, with the leftover inputs and
-- output result.
--
-- @since 0.2.1.0
feedPipeEither
    :: Monad m
    => [i]
    -> Pipe i o u m a
    -> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a))
feedPipeEither xs p = do
    (outs, res) <- squeezePipeEither p
    case (res, xs) of
      -- the pipe finished: report the inputs it never consumed
      (Right a  , _     ) -> pure (outs, Right (xs, a))
      -- the pipe wants more input, but we have none left to give
      (Left cont, []    ) -> pure (outs, Left cont)
      -- feed the next item and keep going, prepending what we saw so far
      (Left cont, y : ys) -> first (outs ++) <$> feedPipeEither ys (cont (Right y))

-- | "Squeeze" a pipe by extracting all output that can be extracted
-- before any input is requested.  Returns a 'Left' if the pipe eventually
-- does request input (as a continuation on the new input), or a 'Right' if
-- the pipe terminates with a value before ever asking for input.
--
-- @since 0.2.1.0
squeezePipe
    :: Monad m
    => Pipe i o u m a
    -> m ([o], Either (i -> Pipe i o u m a) a)
squeezePipe pipe = second (first (. Right)) <$> squeezePipeEither pipe

-- | "Squeeze" a pipe by extracting all output that can be extracted before
-- any input is requested.  Returns a 'Left' if the pipe eventually does
-- request input (as a continuation on the new input, or a terminating @u@
-- value), or a 'Right' if the pipe terminates with a value before ever
-- asking for input.
--
-- @since 0.2.1.0
squeezePipeEither
    :: Monad m
    => Pipe i o u m a
    -> m ([o], Either (Either u i -> Pipe i o u m a) a)
squeezePipeEither p = runFT (pipeFree p)
    (\a -> pure ([], Right a))
    (\pNext layer -> case layer of
        -- record the output and keep squeezing the rest of the pipe
        PYieldF o rest ->
          first (o :) <$> pNext rest
        -- input requested: stop here and hand back a continuation that,
        -- once given its input, runs the rest and re-embeds it as a pipe
        PAwaitF onDone onInput ->
          pure ([], Left (\inp -> lift (pNext (either onDone onInput inp)) >>= resume))
    )
  where
    -- turn a squeezed result back into a pipe: re-yield the collected
    -- outputs, then either await again or finish with the result
    resume (outs, nxt) =
      mapM_ yield outs >> either (awaitEither >>=) pure nxt

-- | The main operator for chaining pipes together.  @pipe1 .| pipe2@ will
-- connect the output of @pipe1@ to the input of @pipe2@.
--
-- "Running" a pipe will draw from @pipe2@, and if @pipe2@ ever asks for
-- input (with 'await' or something similar), it will block until @pipe1@
-- outputs something (or signals termination).
--
-- The structure of a full pipeline usually looks like:
--
-- @
-- 'runPipe' $ someSource
--        '.|' somePipe
--         .| someOtherPipe
--         .| someSink
-- @
--
-- Where you route a source into a series of pipes, which eventually ends
-- up at a sink.  'runPipe' will then produce the result of that sink.
(.|)
    :: Monad m
    => Pipe a b u m v
    -> Pipe b c v m r
    -> Pipe a c u m r
Pipe up .| Pipe down = Pipe (toFT (compPipe_ (fromFT up) (fromFT down)))
infixr 2 .|

-- | Worker for '.|' on the 'FreeT' representation.  The downstream pipe
-- drives: the upstream pipe is only stepped when downstream asks for
-- input.
compPipe_
    :: forall a b c u v m r. (Monad m)
    => RecPipe a b u m v
    -> RecPipe b c v m r
    -> RecPipe a c u m r
compPipe_ up down = FreeT $ do
    downStep <- runFreeT down
    case downStep of
      -- downstream finished: the composite finishes with its result
      Pure r ->
        pure (Pure r)
      -- downstream yields: pass the output through, upstream untouched
      Free (PYieldF o rest) ->
        pure (Free (PYieldF o (compPipe_ up rest)))
      -- downstream awaits: step upstream to find out what to feed it
      Free (PAwaitF onDone onInput) -> do
        upStep <- runFreeT up
        -- the already-stepped downstream, re-wrapped so that its effects
        -- are not run a second time
        let paused = FreeT (pure downStep)
        case upStep of
          -- upstream finished: tell downstream, and keep upstream in its
          -- finished state for any further awaits
          Pure v ->
            runFreeT (compPipe_ (FreeT (pure upStep)) (onDone v))
          -- upstream yielded: hand the item to downstream
          Free (PYieldF b rest) ->
            runFreeT (compPipe_ rest (onInput b))
          -- upstream itself awaits: the composite awaits, then resumes
          -- against the paused downstream
          Free (PAwaitF onDone' onInput') ->
            pure . Free $ PAwaitF
              ((`compPipe_` paused) . onDone')
              ((`compPipe_` paused) . onInput')

-- | Useful prefix version of '&|'.
--
-- @since 0.2.1.0
fuseBoth
    :: Monad m
    => Pipe a b u m v
    -> Pipe b c v m r
    -> Pipe a c u m (v, r)
fuseBoth up down = up .| (down >>= awaitUpstreamResult)
  where
    -- after downstream is done, discard upstream items until upstream
    -- terminates, then pair its result with the downstream result
    awaitUpstreamResult r = drain
      where
        drain = awaitEither >>= either (\v -> pure (v, r)) (const drain)

-- | Like 'fuseBoth' and '&|', except does not wait for the upstream pipe
-- to terminate.  Return 'Nothing' in the first field if the upstream pipe
-- hasn't terminated, and 'Just' if it has, with the terminating value.
--
-- @since 0.2.1.0
fuseBothMaybe
    :: Monad m
    => Pipe a b u m v
    -> Pipe b c v m r
    -> Pipe a c u m (Maybe v, r)
fuseBothMaybe up down = up .| (down >>= peekUpstream)
  where
    -- a single await after downstream is done: a termination signal gives
    -- the upstream result, an ordinary item means upstream is still going
    peekUpstream r = awaitEither <&> \case
      Left  v -> (Just v , r)
      Right _ -> (Nothing, r)

-- | Useful prefix version of '|.'.
--
-- @since 0.2.1.0
fuseUpstream
    :: Monad m
    => Pipe a b u m v
    -> Pipe b c v m r
    -> Pipe a c u m v
fuseUpstream up down = fmap fst (fuseBoth up down)

-- | Like @.|@, but get the result of /both/ pipes on termination, instead
-- of just the second.  This means that @p &| q@ will only terminate with a
-- result when /both/ @p@ and @q@ terminate.  (Typically, @p .| q@ would
-- terminate as soon as @q@ terminates.)
--
-- @since 0.2.1.0
(&|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
up &| down = fuseBoth up down

-- | Like @.|@, but keep the result of the /first/ pipe, instead of the
-- second.  This means that @p |. q@ will only terminate with a result when
-- /both/ @p@ and @q@ terminate.  (Typically, @p .| q@ would terminate as
-- soon as @q@ terminates.)
--
-- @since 0.2.1.0
(|.) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
up |. down = fuseUpstream up down

infixr 2 &|
infixr 2 |.

-- | Loop a pipe into itself.
--
-- *  Will feed all output back to the input
-- *  Will only ask for input upstream if output is stalled.
-- *  Yields all outputted values downstream, effectively duplicating them.
--
-- @since 0.2.1.0
feedbackPipe
    :: Monad m
    => Pipe x x u m a
    -> Pipe x x u m a
feedbackPipe p = fst <$> runStateP Seq.empty
    (  popper
    .| hoistPipe lift p
       -- record every output in the queue, and also pass it downstream
    .| awaitForever (\x -> lift (modify (:|> x)) *> yield x)
    )
  where
    -- feeds the wrapped pipe: queued (fed-back) items take priority, and
    -- real upstream input is only requested once the queue is empty
    popper = do
      pending <- lift get
      case pending of
        x :<| rest -> do
          lift (put rest)
          yield x
          popper
        Empty ->
          awaitEither >>= either pure (\x -> yield x >> popper)

-- | A newtype wrapper over a source (@'Pipe' () o 'Void'@) that gives it an
-- alternative 'Applicative' and 'Alternative' instance, matching "ListT
-- done right".
--
-- '<*>' will pair up each output that the sources produce: if you 'await'
-- a value from downstream, it will wait until both paired sources yield
-- before passing them on together.
--
-- '<|>' will completely exhaust the first source before moving on to the
-- next source.
--
-- 'ZipSource' is effectively equivalent to "ListT done right", the true
-- List Monad transformer.  '<|>' is concatenation.  You can use this type
-- with 'lift' to lift a yielding action and '<|>' to sequence yields, so
-- that you can write streaming producers in a polymorphic way, and have
-- it run with pipes, conduit, etc.
-- (NOTE(review): this comment originally pointed to an external
-- description of that pattern, but the link target is missing here --
-- restore it if known.)
--
-- The main difference is that its 'Applicative' instance ("zipping") is
-- different from the traditional 'Applicative' instance for 'ListT'
-- ("all combinations").  Effectively this becomes like a "zipping"
-- 'Applicative' instance for 'ListT'.
--
-- If you want a 'Monad' (or 'Control.Monad.IO.Class.MonadIO') instance,
-- use 'ListT' instead, and convert using 'toListT'/'fromListT' or the
-- 'PipeList' pattern/constructor.
newtype ZipSource m a = ZipSource { getZipSource :: Pipe () a Void m () }

-- | A source is equivalent to a 'ListT' producing a 'Maybe'; this pattern
-- synonym lets you treat it as such.  It essentially wraps over 'toListT'
-- (when matching) and 'fromListT' (when constructing).
pattern PipeList :: Monad m => ListT m (Maybe a) -> Pipe () a u m ()
pattern PipeList steps <- (toListT -> steps)
  where
    PipeList steps = fromListT steps
{-# COMPLETE PipeList #-}

instance Functor (ZipSource m) where
    fmap f (ZipSource src) = ZipSource (mapOutput f src)

instance Monad m => Applicative (ZipSource m) where
    pure x = ZipSource (yield x)
    ZipSource fs <*> ZipSource xs = ZipSource (zipSource fs xs)

-- | Takes two sources and runs them in parallel, collating their outputs.
--
-- @since 0.2.1.0
zipSource
    :: Monad m
    => Pipe () (a -> b) u m ()
    -> Pipe () a v m ()
    -> Pipe () b w m ()
zipSource (PipeList fs) (PipeList xs) =
    PipeList (Just . uncurry ($) <$> paired)
  where
    -- drop the 'Nothing' steps of each side, then pair items positionally
    paired = LT.zip (concatListT fs) (concatListT xs)

-- | Skip over the 'Nothing' steps of a 'ListT', keeping only the 'Just'
-- items.
concatListT :: Monad m => ListT m (Maybe a) -> ListT m a
concatListT xs = ListT $ do
    step <- next xs
    case step of
      Nil              -> pure Nil
      Cons (Just y) ys -> pure (Cons y (concatListT ys))
      Cons Nothing  ys -> next (concatListT ys)

instance Monad m => Alternative (ZipSource m) where
    empty = ZipSource (pure ())
    ZipSource firstSrc <|> ZipSource secondSrc = ZipSource (firstSrc *> secondSrc)

instance MonadTrans ZipSource where
    -- run the action once and yield its result as the only item
    lift act = ZipSource (lift act >>= yield)

-- | A source is essentially equivalent to 'ListT' producing a 'Maybe'
-- result.  This converts it to the 'ListT' it encodes: each yield becomes
-- a 'Just' item and each await becomes a 'Nothing' item.
--
-- See 'ZipSource' for a wrapper over 'Pipe' that gives the right 'Functor'
-- and 'Alternative' instances.
toListT :: Applicative m => Pipe () o u m () -> ListT m (Maybe o)
toListT p = ListT $ runFT (pipeFree p)
    (const (pure Nil))
    (\pNext layer -> case layer of
        PYieldF x rest    -> pure (Cons (Just x) (ListT (pNext rest)))
        PAwaitF _ onInput -> pure (Cons Nothing  (ListT (pNext (onInput ()))))
    )

-- | A source is essentially 'ListT' producing a 'Maybe' result.  This
-- converts a 'ListT' to the source it encodes: 'Just' items are yielded
-- and 'Nothing' items are skipped.
--
-- See 'ZipSource' for a wrapper over 'Pipe' that gives the right 'Functor'
-- and 'Alternative' instances.
fromListT :: Monad m => ListT m (Maybe o) -> Pipe i o u m ()
fromListT xs = do
    step <- lift (next xs)
    case step of
      Nil              -> pure ()
      Cons (Just y) ys -> yield y *> fromListT ys
      Cons Nothing  ys -> fromListT ys

-- | Given a "generator" of @o@ in @m@, return a /source/ that that
-- generator encodes.  Is the inverse of 'withSource'.
--
-- The generator is essentially a church-encoded 'ListT'.
genSource
    :: (forall r. (Maybe (o, m r) -> m r) -> m r)
    -> Pipe i o u m ()
genSource gen = Pipe (FT (\done wrap -> gen (\case
    Nothing        -> done ()
    Just (x, rest) -> wrap id (PYieldF x rest))))

-- | A source can be "run" by providing a continuation to handle and
-- sequence each of its outputs.  Is the inverse of 'genSource'.
--
-- This essentially turns a pipe into a church-encoded 'ListT'.
withSource
    :: Pipe () o u m ()
    -> (Maybe (o, m r) -> m r)    -- ^ handler ('Nothing' = done, @'Just' (x, next)@ = yielded value and next action)
    -> m r
withSource p handler = runFT (pipeFree p)
    (const (handler Nothing))
    (\pNext layer -> case layer of
        PYieldF x rest    -> handler (Just (x, pNext rest))
        -- a source's input is @()@, so awaits are answered immediately
        PAwaitF _ onInput -> pNext (onInput ())
    )

-- | 'ZipSource' is effectively 'ListT' returning a 'Maybe'.  As such, you
-- can use 'unconsZipSource' to "peel off" the first yielded item, if it
-- exists, and return the "rest of the list".
unconsZipSource
    :: Monad m
    => ZipSource m a
    -> m (Maybe (Maybe a, ZipSource m a))
unconsZipSource (ZipSource (PipeList steps)) = peel <$> next steps
  where
    peel Nil           = Nothing
    peel (Cons x rest) = Just (x, ZipSource (PipeList rest))

-- | A newtype wrapper over a sink (@'Pipe' i 'Void'@) that gives it an
-- alternative 'Applicative' and 'Alternative' instance.
--
-- '<*>' will distribute input over both sinks, and output a final result
-- once both sinks finish.
--
-- '<|>' will distribute input over both sinks, and output a final result
-- as soon as one or the other finishes.
newtype ZipSink i u m a = ZipSink { getZipSink :: Pipe i Void u m a }
  deriving Functor

-- | Worker for 'zipSink' on the 'FreeT' representation: both sinks see
-- every input, and the result is produced once both have finished.
zipSink_
    :: Monad m
    => RecPipe i Void u m (a -> b)
    -> RecPipe i Void u m a
    -> RecPipe i Void u m b
zipSink_ fSink xSink = FreeT $ do
    fStep <- runFreeT fSink
    case fStep of
      -- sinks output 'Void', so a yield can never actually occur
      Free (PYieldF o _) -> absurd o
      -- the function sink is done: only the argument sink keeps running
      Pure f -> do
        xStep <- runFreeT xSink
        case xStep of
          Pure x             -> pure (Pure (f x))
          Free (PYieldF o _) -> absurd o
          Free (PAwaitF onDone onInput) ->
            -- keep the function sink in its finished state, re-wrapped so
            -- that its effects are not run again
            let fDone = FreeT (pure fStep)
            in  pure . Free $ PAwaitF
                  (zipSink_ fDone . onDone)
                  (zipSink_ fDone . onInput)
      -- the function sink awaits: see what the argument sink is doing
      Free (PAwaitF onDoneF onInputF) -> do
        xStep <- runFreeT xSink
        case xStep of
          Free (PYieldF o _) -> absurd o
          -- the argument sink is done: only the function sink keeps running
          Pure _ ->
            let xDone = FreeT (pure xStep)
            in  pure . Free $ PAwaitF
                  ((`zipSink_` xDone) . onDoneF)
                  ((`zipSink_` xDone) . onInputF)
          -- both await: a single await whose input is given to both
          Free (PAwaitF onDoneX onInputX) ->
            pure . Free $ PAwaitF
              (\r -> zipSink_ (onDoneF  r) (onDoneX  r))
              (\i -> zipSink_ (onInputF i) (onInputX i))

-- | Worker for 'altSink' on the 'FreeT' representation: both sinks see
-- every input, and the result is that of whichever finishes first (the
-- first sink is checked before the second).
altSink_
    :: Monad m
    => RecPipe i Void u m a
    -> RecPipe i Void u m a
    -> RecPipe i Void u m a
altSink_ left right = FreeT $ do
    leftStep <- runFreeT left
    case leftStep of
      -- the first sink finished: the second sink is not stepped at all
      Pure a             -> pure (Pure a)
      -- sinks output 'Void', so a yield can never actually occur
      Free (PYieldF o _) -> absurd o
      Free (PAwaitF onDoneL onInputL) ->
        let merge = \case
              Pure a             -> Pure a
              Free (PYieldF o _) -> absurd o
              -- both await: a single await whose input is given to both
              Free (PAwaitF onDoneR onInputR) -> Free $ PAwaitF
                (\r -> altSink_ (onDoneL  r) (onDoneR  r))
                (\i -> altSink_ (onInputL i) (onInputR i))
        in  merge <$> runFreeT right

-- | Distribute input to both sinks, and finishes with the final result
-- once both finish.
--
-- Forms an identity with 'pure'.
zipSink
    :: Monad m
    => Pipe i Void u m (a -> b)
    -> Pipe i Void u m a
    -> Pipe i Void u m b
zipSink (Pipe fSink) (Pipe xSink) =
    Pipe (toFT (zipSink_ (fromFT fSink) (fromFT xSink)))

-- | Distribute input to both sinks, and finishes with the result of
-- the one that finishes first.
altSink
    :: Monad m
    => Pipe i Void u m a
    -> Pipe i Void u m a
    -> Pipe i Void u m a
altSink (Pipe left) (Pipe right) =
    Pipe (toFT (altSink_ (fromFT left) (fromFT right)))

-- | '<*>' = distribute input to all, and return result when they finish
--
-- 'pure' = immediately finish
instance Monad m => Applicative (ZipSink i u m) where
    pure x = ZipSink (pure x)
    ZipSink fSink <*> ZipSink xSink = ZipSink (zipSink fSink xSink)

-- | '<|>' = distribute input to all, and return the first result that
-- finishes
--
-- 'empty' = never finish
instance Monad m => Alternative (ZipSink i u m) where
    -- awaits (and discards) input forever, so it never produces a result
    empty = ZipSink (forever await)
    ZipSink left <|> ZipSink right = ZipSink (altSink left right)

instance MonadTrans (ZipSink i u) where
    lift act = ZipSink (lift act)