{-# LANGUAGE DeriveFunctor              #-}
{-# LANGUAGE FlexibleContexts           #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase                 #-}
{-# LANGUAGE PatternSynonyms            #-}
{-# LANGUAGE RankNTypes                 #-}
{-# LANGUAGE ScopedTypeVariables        #-}
{-# LANGUAGE TupleSections              #-}
{-# LANGUAGE TypeInType                 #-}
{-# LANGUAGE ViewPatterns               #-}

-- |
-- Module      : Data.Conduino
-- Copyright   : (c) Justin Le 2019
-- License     : BSD3
--
-- Maintainer  : justin@jle.im
-- Stability   : experimental
-- Portability : non-portable
--
-- Base API for 'Pipe'.  See documentation for 'Pipe', '.|', and 'runPipe'
-- for information on usage.
--
-- A "prelude" of useful pipes can be found in "Data.Conduino.Combinators".
--
-- == Why a stream processing library?
--
-- A stream processing library is a way to stream processors in a /composable/ way:
-- instead of defining your entire stream processing function as a single
-- recursive loop with some global state, instead think about each "stage" of the process,
-- and isolate each state to its own segment.  Each component can contain its own
-- isolated state:
--
-- >>> runPipePure $ sourceList [1..10]
--       .| scan (+) 0
--       .| sinkList
-- [1,3,6,10,15,21,28,36,45,55]
--
-- All of these components have internal "state":
--
-- *   @sourceList@ keeps track of "which" item in the list to yield next
-- *   @scan@ keeps track of the current running sum
-- *   @sinkList@ keeps track of all items that have been seen so far, as a list
--
-- They all work together without knowing any other component's internal state, so
-- you can write your total streaming function without concerning yourself, at
-- each stage, with the entire part.
--
-- In addition, there are useful functions to "combine" stream processors:
--
-- *   'zipSink' combines sinks in an "and" sort of way: combine two sinks in
--     parallel and finish when all finish.
-- *   'altSink' combines sinks in an "or" sort of way: combine two sinks in
--     parallel and finish when any of them finish
-- *   'zipSource' combines sources in parallel and collate their outputs.
--
-- Stream processing libraries are also useful for streaming composition of
-- monadic effects (like IO or State), as well.
--
module Data.Conduino (
    Pipe
  , (.|)
  , runPipe, runPipePure
  -- * Primitives
  , awaitEither, await, awaitWith, awaitSurely, awaitForever
  , yield, yieldLazy
  -- * Special chaining
  , (&|), (|.)
  , fuseBoth, fuseUpstream, fuseBothMaybe
  -- * Incremental running
  , squeezePipe, squeezePipeEither
  , feedPipe, feedPipeEither
  -- * Pipe transformers
  , mapInput, mapOutput, mapUpRes, trimapPipe
  , passthrough
  , hoistPipe
  , feedbackPipe, feedbackPipeEither
  -- * Wrappers
  , ZipSource(..)
  , unconsZipSource
  , zipSource
  , ZipSink(..)
  , zipSink, altSink
  -- * Generators
  , toListT, fromListT
  , pattern PipeList
  , withSource, genSource
  ) where

import           Control.Applicative
import           Control.Monad
import           Control.Monad.Trans.Class
import           Control.Monad.Trans.Free         (FreeT(..), FreeF(..))
import           Control.Monad.Trans.Free.Church
import           Control.Monad.Trans.State
import           Data.Bifunctor
import           Data.Conduino.Internal
import           Data.Functor
import           Data.Functor.Identity
import           Data.Sequence                    (Seq(..))
import           Data.Void
import           List.Transformer                 (ListT(..), Step(..))
import qualified Control.Monad.Trans.State.Strict as SS
import qualified Data.Sequence                    as Seq
import qualified List.Transformer                 as LT

-- | Await input from upstream.  Will block until upstream 'yield's.
--
-- Will return 'Nothing' if the upstream pipe finishes and terminates.
--
-- If the upstream pipe never terminates, then you can use 'awaitSurely' to
-- guarantee a result.
--
-- Will always return 'Just' if @u@ is 'Void'.
await :: Pipe i o u m (Maybe i)
await :: forall i o u (m :: * -> *). Pipe i o u m (Maybe i)
await = forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either (forall a b. a -> b -> a
const forall a. Maybe a
Nothing) forall a. a -> Maybe a
Just forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall i o u (m :: * -> *). Pipe i o u m (Either u i)
awaitEither
{-# INLINE await #-}

-- | 'await', but directly chaining a continuation if the 'await' was
-- succesful.
--
-- The await will always be succesful if @u@ is 'Void'.
--
-- This is a way of writing code in a way that is agnostic to how the
-- upstream pipe terminates.
awaitWith :: (i -> Pipe i o u m u) -> Pipe i o u m u
awaitWith :: forall i o u (m :: * -> *). (i -> Pipe i o u m u) -> Pipe i o u m u
awaitWith i -> Pipe i o u m u
f = forall i o u (m :: * -> *). Pipe i o u m (Either u i)
awaitEither forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Left  u
r -> forall (f :: * -> *) a. Applicative f => a -> f a
pure u
r
    Right i
x -> i -> Pipe i o u m u
f i
x
{-# INLINE awaitWith #-}

-- | Await input from upstream where the upstream pipe is guaranteed to
-- never terminate.
--
-- A common type error will occur if @u@ (upstream pipe result type) is not
-- 'Void' -- it might be @()@ or some non-'Void' type.  This means that the
-- upstream pipe terminates, so awaiting cannot be assured.
--
-- In that case, either change your upstream pipe to be one that never
-- terminates (which is most likely not possible), or use 'await' instead
-- of 'awaitSurely'.
awaitSurely :: Pipe i o Void m i
awaitSurely :: forall i o (m :: * -> *). Pipe i o Void m i
awaitSurely = forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall a. Void -> a
absurd forall a. a -> a
id forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall i o u (m :: * -> *). Pipe i o u m (Either u i)
awaitEither
{-# INLINE awaitSurely #-}

-- | A useful utility function over repeated 'await's.  Will repeatedly
-- 'await' and then continue with the given pipe whenever the upstream pipe
-- yields.
--
-- Can be used to implement many pipe combinators:
--
-- @
-- 'Data.Conduino.Combinators.map' f = 'awaitForever' $ \x -> 'yield' (f x)
-- @
awaitForever :: (i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever :: forall i o u (m :: * -> *) a.
(i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever = forall u o (m :: * -> *) b i a.
(u -> Pipe () o u m b) -> (i -> Pipe i o u m a) -> Pipe i o u m b
awaitForeverWith forall (f :: * -> *) a. Applicative f => a -> f a
pure
{-# INLINE awaitForever #-}

-- | 'awaitForever', but with a way to handle the result of the
-- upstream pipe, which will be called when the upstream pipe stops
-- producing.
awaitForeverWith
    :: (u -> Pipe () o u m b)       -- ^ how to handle upstream ending, transitioning to a source
    -> (i -> Pipe i o u m a)        -- ^ how to handle upstream output
    -> Pipe i o u m b
awaitForeverWith :: forall u o (m :: * -> *) b i a.
(u -> Pipe () o u m b) -> (i -> Pipe i o u m a) -> Pipe i o u m b
awaitForeverWith u -> Pipe () o u m b
f i -> Pipe i o u m a
g = Pipe i o u m b
go
  where
    go :: Pipe i o u m b
go = forall i o u (m :: * -> *). Pipe i o u m (Either u i)
awaitEither forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Left u
x  -> forall i j o u (m :: * -> *) a.
(i -> j) -> Pipe j o u m a -> Pipe i o u m a
mapInput (forall a b. a -> b -> a
const ()) forall a b. (a -> b) -> a -> b
$ u -> Pipe () o u m b
f u
x
      Right i
x -> i -> Pipe i o u m a
g i
x forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Pipe i o u m b
go
{-# INLINE awaitForeverWith #-}

-- | Run a pipe that is both a source and a sink (an "effect") into the
-- effect that it represents.
--
-- Usually you wouild construct this using something like:
--
-- @
-- 'runPipe' $ someSource
--        '.|' somePipe
--        .| someOtherPipe
--        .| someSink
-- @
--
-- 'runPipe' will produce the result of that final sink.
--
-- Some common errors you might receive:
--
-- *  @i@ is not @()@: If you give a pipe where the first parameter
--    ("input") is not @()@, it means that your pipe is not a producer.
--    Pre-compose it (using '.|') with a producer of the type you need.
--
--    For example, if you have a @myPipe :: 'Pipe' 'Int' o u m a@, this is
--    a pipe that is awaiting 'Int's from upstream.  Pre-compose with
--    a producer of 'Int's, like @'Data.Conduino.Combinators.sourceList'
--    [1,2,3] '.|' myPipe@, in order to be able to run it.
--
-- *  @o@ is not 'Void': If you give a pipe where the second parameter
--    ("output") is not 'Void', it means that your pipe is not a consumer.
--    Post-compose it (using '.|') with a consumer of the type you need.
--
--    For example, if you have @myPipe :: 'Pipe' i 'Int' u m a@, this is
--    a pipe that is yielding 'Int's downstream that are going unhandled.
--    Post-compose it a consumer of 'Int's, like @myPipe '.|'
--    'Data.Conduino.foldl' (+) 0@, in order to be able to run it.
--
--    If you just want to ignore all downstream yields, post-compose with
--    'Data.Conduino.Combinators.sinkNull'.
--
runPipe :: Monad m => Pipe () Void u m a -> m a
runPipe :: forall (m :: * -> *) u a. Monad m => Pipe () Void u m a -> m a
runPipe = forall (f :: * -> *) (m :: * -> *) a.
(Functor f, Monad m) =>
(f (m a) -> m a) -> FT f m a -> m a
iterT forall {u} {a}. PipeF () Void u a -> a
go forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall i o u (m :: * -> *) a.
Pipe i o u m a -> FT (PipeF i o u) m a
pipeFree
  where
    go :: PipeF () Void u a -> a
go = \case
      PAwaitF u -> a
_ () -> a
f -> () -> a
f ()
      PYieldF Void
o a
_ -> forall a. Void -> a
absurd Void
o

-- | 'runPipe' when the underlying monad is 'Identity', and so has no
-- effects.
runPipePure :: Pipe () Void Void Identity a -> a
runPipePure :: forall a. Pipe () Void Void Identity a -> a
runPipePure = forall a. Identity a -> a
runIdentity forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) u a. Monad m => Pipe () Void u m a -> m a
runPipe
{-# INLINE runPipePure #-}

-- | Repeatedly run 'squeezePipe' by giving it items from an input list.
-- Returns the outputs observed, and 'Left' if the input list was exhausted
-- with more input expected, or 'Right' if the pipe terminated, with the
-- leftover inputs and output result.
--
-- @since 0.2.1.0
feedPipe
    :: Monad m
    => [i]                      -- ^ input to feed in
    -> Pipe i o u m a
    -> m ([o], Either (i -> Pipe i o u m a) ([i], a))
feedPipe :: forall (m :: * -> *) i o u a.
Monad m =>
[i]
-> Pipe i o u m a -> m ([o], Either (i -> Pipe i o u m a) ([i], a))
feedPipe [i]
xs = (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (p :: * -> * -> *) b c a.
Bifunctor p =>
(b -> c) -> p a b -> p a c
second forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first) (forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right)
            forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) i o u a.
Monad m =>
[i]
-> Pipe i o u m a
-> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a))
feedPipeEither [i]
xs
{-# INLINE feedPipe #-}

-- | Repeatedly run 'squeezePipeEither' by giving it items from an input
-- list.  Returns the outputs observed, and 'Left' if the input list was
-- exhausted with more input expected (or a @u@ terminating upstream
-- value), or 'Right' if the pipe terminated, with the leftover inputs and
-- output result.
--
-- @since 0.2.1.0
feedPipeEither
    :: Monad m
    => [i]                      -- ^ input to feed in
    -> Pipe i o u m a
    -> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a))
feedPipeEither :: forall (m :: * -> *) i o u a.
Monad m =>
[i]
-> Pipe i o u m a
-> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a))
feedPipeEither [i]
xs Pipe i o u m a
p = do
    ([o]
zs, Either (Either u i -> Pipe i o u m a) a
r) <- forall (m :: * -> *) i o u a.
Monad m =>
Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) a)
squeezePipeEither Pipe i o u m a
p
    case Either (Either u i -> Pipe i o u m a) a
r of
      Left Either u i -> Pipe i o u m a
n -> case [i]
xs of
        []   -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ([o]
zs, forall a b. a -> Either a b
Left Either u i -> Pipe i o u m a
n)
        i
y:[i]
ys -> forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first ([o]
zs forall a. [a] -> [a] -> [a]
++) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) i o u a.
Monad m =>
[i]
-> Pipe i o u m a
-> m ([o], Either (Either u i -> Pipe i o u m a) ([i], a))
feedPipeEither [i]
ys (Either u i -> Pipe i o u m a
n (forall a b. b -> Either a b
Right i
y))
      Right a
z -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ([o]
zs, forall a b. b -> Either a b
Right ([i]
xs, a
z))
{-# INLINE feedPipeEither #-}

-- | "Squeeze" a pipe by extracting all output that can be extracted
-- before any input is requested.  Returns a 'Left' if the pipe eventually
-- does request input (as a continuation on the new input), or a 'Right' if
-- the pipe terminates with a value before ever asking for input.
--
-- @since 0.2.1.0
squeezePipe
    :: Monad m
    => Pipe i o u m a
    -> m ([o], Either (i -> Pipe i o u m a) a)
squeezePipe :: forall (m :: * -> *) i o u a.
Monad m =>
Pipe i o u m a -> m ([o], Either (i -> Pipe i o u m a) a)
squeezePipe = (forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (p :: * -> * -> *) b c a.
Bifunctor p =>
(b -> c) -> p a b -> p a c
second forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first) (forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right)
            forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) i o u a.
Monad m =>
Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) a)
squeezePipeEither
{-# INLINE squeezePipe #-}

-- | "Squeeze" a pipe by extracting all output that can be extracted before
-- any input is requested.  Returns a 'Left' if the pipe eventually does
-- request input (as a continuation on the new input, or a terminating @u@
-- value), or a 'Right' if the pipe terminates with a value before ever
-- asking for input.
--
-- @since 0.2.1.0
squeezePipeEither
    :: Monad m
    => Pipe i o u m a
    -> m ([o], Either (Either u i -> Pipe i o u m a) a)
squeezePipeEither :: forall (m :: * -> *) i o u a.
Monad m =>
Pipe i o u m a -> m ([o], Either (Either u i -> Pipe i o u m a) a)
squeezePipeEither (Pipe (FT forall r.
(a -> m r) -> (forall x. (x -> m r) -> PipeF i o u x -> m r) -> m r
p)) = forall r.
(a -> m r) -> (forall x. (x -> m r) -> PipeF i o u x -> m r) -> m r
p
    (forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([],) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. b -> Either a b
Right)
    (\x -> m ([o], Either (Either u i -> Pipe i o u m a) a)
pNext -> \case
        PAwaitF u -> x
f i -> x
g -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. ([],) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a b. a -> Either a b
Left forall a b. (a -> b) -> a -> b
$ (forall {t :: * -> *} {a} {u} {i} {m :: * -> *} {b}.
Foldable t =>
(t a, Either (Either u i -> Pipe i a u m b) b) -> Pipe i a u m b
unSqueeze forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<<) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall b c a. (b -> c) -> (a -> b) -> a -> c
. x -> m ([o], Either (Either u i -> Pipe i o u m a) a)
pNext forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either u -> x
f i -> x
g
        PYieldF o
o x
x -> forall (p :: * -> * -> *) a b c.
Bifunctor p =>
(a -> b) -> p a c -> p b c
first (o
oforall a. a -> [a] -> [a]
:) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> x -> m ([o], Either (Either u i -> Pipe i o u m a) a)
pNext x
x
    )
  where
    unSqueeze :: (t a, Either (Either u i -> Pipe i a u m b) b) -> Pipe i a u m b
unSqueeze (t a
os, Either (Either u i -> Pipe i a u m b) b
nxt) = do
      forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield t a
os
      case Either (Either u i -> Pipe i a u m b) b
nxt of
        Left Either u i -> Pipe i a u m b
f  -> Either u i -> Pipe i a u m b
f forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall i o u (m :: * -> *). Pipe i o u m (Either u i)
awaitEither
        Right b
a -> forall (f :: * -> *) a. Applicative f => a -> f a
pure b
a
{-# INLINE squeezePipeEither #-}

-- | The main operator for chaining pipes together.  @pipe1 .| pipe2@ will
-- connect the output of @pipe1@ to the input of @pipe2@.
--
-- "Running" a pipe will draw from @pipe2@, and if @pipe2@ ever asks for
-- input (with 'await' or something similar), it will block until @pipe1@
-- outputs something (or signals termination).
--
-- The structure of a full pipeline usually looks like:
--
-- @
-- 'runPipe' $ someSource
--        '.|' somePipe
--        .| someOtherPipe
--        .| someSink
-- @
--
-- Where you route a source into a series of pipes, which eventually ends
-- up at a sink.  'runPipe' will then produce the result of that sink.
(.|)
    :: Monad m
    => Pipe a b u m v
    -> Pipe b c v m r
    -> Pipe a c u m r
Pipe a b u m v
p .| :: forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r
.| Pipe b c v m r
q = forall (m :: * -> *) i o u a.
Monad m =>
RecPipe i o u m a -> Pipe i o u m a
fromRecPipe forall a b. (a -> b) -> a -> b
$ forall a b c u v (m :: * -> *) r.
Monad m =>
RecPipe a b u m v -> RecPipe b c v m r -> RecPipe a c u m r
compPipe_ (forall (m :: * -> *) i o u a.
Monad m =>
Pipe i o u m a -> RecPipe i o u m a
toRecPipe Pipe a b u m v
p) (forall (m :: * -> *) i o u a.
Monad m =>
Pipe i o u m a -> RecPipe i o u m a
toRecPipe Pipe b c v m r
q)
infixr 2 .|
{-# INLINE (.|) #-}

compPipe_
    :: forall a b c u v m r. (Monad m)
    => RecPipe a b u m v
    -> RecPipe b c v m r
    -> RecPipe a c u m r
compPipe_ :: forall a b c u v (m :: * -> *) r.
Monad m =>
RecPipe a b u m v -> RecPipe b c v m r -> RecPipe a c u m r
compPipe_ RecPipe a b u m v
p RecPipe b c v m r
q = forall (f :: * -> *) (m :: * -> *) a.
m (FreeF f a (FreeT f m a)) -> FreeT f m a
FreeT forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) (m :: * -> *) a.
FreeT f m a -> m (FreeF f a (FreeT f m a))
runFreeT RecPipe b c v m r
q forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \FreeF (PipeF b c v) r (RecPipe b c v m r)
qq -> case FreeF (PipeF b c v) r (RecPipe b c v m r)
qq of
    Pure r
x             -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. a -> FreeF f a b
Pure forall a b. (a -> b) -> a -> b
$ r
x
    Free (PAwaitF v -> RecPipe b c v m r
f b -> RecPipe b c v m r
g) -> forall (f :: * -> *) (m :: * -> *) a.
FreeT f m a -> m (FreeF f a (FreeT f m a))
runFreeT RecPipe a b u m v
p forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \FreeF (PipeF a b u) v (RecPipe a b u m v)
pp -> case FreeF (PipeF a b u) v (RecPipe a b u m v)
pp of
      Pure v
x'              -> forall (f :: * -> *) (m :: * -> *) a.
FreeT f m a -> m (FreeF f a (FreeT f m a))
runFreeT forall a b. (a -> b) -> a -> b
$ forall a b c u v (m :: * -> *) r.
Monad m =>
RecPipe a b u m v -> RecPipe b c v m r -> RecPipe a c u m r
compPipe_ (forall (f :: * -> *) (m :: * -> *) a.
m (FreeF f a (FreeT f m a)) -> FreeT f m a
FreeT (forall (f :: * -> *) a. Applicative f => a -> f a
pure FreeF (PipeF a b u) v (RecPipe a b u m v)
pp)) (v -> RecPipe b c v m r
f v
x')
      Free (PAwaitF u -> RecPipe a b u m v
f' a -> RecPipe a b u m v
g') -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. f b -> FreeF f a b
Free forall a b. (a -> b) -> a -> b
$ forall i o u a. (u -> a) -> (i -> a) -> PipeF i o u a
PAwaitF ((forall a b c u v (m :: * -> *) r.
Monad m =>
RecPipe a b u m v -> RecPipe b c v m r -> RecPipe a c u m r
`compPipe_` forall (f :: * -> *) (m :: * -> *) a.
m (FreeF f a (FreeT f m a)) -> FreeT f m a
FreeT (forall (f :: * -> *) a. Applicative f => a -> f a
pure FreeF (PipeF b c v) r (RecPipe b c v m r)
qq)) forall b c a. (b -> c) -> (a -> b) -> a -> c
. u -> RecPipe a b u m v
f')
                                                    ((forall a b c u v (m :: * -> *) r.
Monad m =>
RecPipe a b u m v -> RecPipe b c v m r -> RecPipe a c u m r
`compPipe_` forall (f :: * -> *) (m :: * -> *) a.
m (FreeF f a (FreeT f m a)) -> FreeT f m a
FreeT (forall (f :: * -> *) a. Applicative f => a -> f a
pure FreeF (PipeF b c v) r (RecPipe b c v m r)
qq)) forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> RecPipe a b u m v
g')
      Free (PYieldF b
x' RecPipe a b u m v
y') -> forall (f :: * -> *) (m :: * -> *) a.
FreeT f m a -> m (FreeF f a (FreeT f m a))
runFreeT forall a b. (a -> b) -> a -> b
$ forall a b c u v (m :: * -> *) r.
Monad m =>
RecPipe a b u m v -> RecPipe b c v m r -> RecPipe a c u m r
compPipe_ RecPipe a b u m v
y' (b -> RecPipe b c v m r
g b
x')
    Free (PYieldF c
x RecPipe b c v m r
y) -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. f b -> FreeF f a b
Free forall a b. (a -> b) -> a -> b
$ forall i o u a. o -> a -> PipeF i o u a
PYieldF c
x (forall a b c u v (m :: * -> *) r.
Monad m =>
RecPipe a b u m v -> RecPipe b c v m r -> RecPipe a c u m r
compPipe_ RecPipe a b u m v
p RecPipe b c v m r
y)

-- | Useful prefix version of '&|'.
--
-- @since 0.2.1.0
fuseBoth
    :: Monad m
    => Pipe a b u m v
    -> Pipe b c v m r
    -> Pipe a c u m (v, r)
fuseBoth :: forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
fuseBoth Pipe a b u m v
p Pipe b c v m r
q = Pipe a b u m v
p
            forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r
.| (Pipe b c v m r
q forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall {p} {i} {o} {a} {m :: * -> *}. p -> Pipe i o a m (a, p)
exhaust)
  where
    exhaust :: p -> Pipe i o a m (a, p)
exhaust p
x = forall {i} {o} {a} {m :: * -> *}. Pipe i o a m (a, p)
go
      where
        go :: Pipe i o a m (a, p)
go = forall i o u (m :: * -> *). Pipe i o u m (Either u i)
awaitEither forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
          Left  a
y -> forall (f :: * -> *) a. Applicative f => a -> f a
pure (a
y, p
x)
          Right i
_ -> Pipe i o a m (a, p)
go
{-# INLINE fuseBoth #-}

-- | Like 'fuseBoth' and '&|', except does not wait for the upstream pipe
-- to terminate.  Return 'Nothing' in the first field if the upstream pipe hasn't terminated,
-- and 'Just' if it has, with the terminating value.
--
-- @since 0.2.1.0
fuseBothMaybe :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (Maybe v, r)
fuseBothMaybe :: forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (Maybe v, r)
fuseBothMaybe Pipe a b u m v
p Pipe b c v m r
q = Pipe a b u m v
p
                 forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r
.| (Pipe b c v m r
q forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall {t} {b} {o} {a} {m :: * -> *}.
t -> Pipe b o a m (Maybe a, t)
check)
  where
    check :: t -> Pipe b o a m (Maybe a, t)
check t
x = (,t
x) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall a. a -> Maybe a
Just (forall a b. a -> b -> a
const forall a. Maybe a
Nothing) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall i o u (m :: * -> *). Pipe i o u m (Either u i)
awaitEither
{-# INLINE fuseBothMaybe #-}

-- | Useful prefix version of '|.'.
--
-- @since 0.2.1.0
fuseUpstream
    :: Monad m
    => Pipe a b u m v
    -> Pipe b c v m r
    -> Pipe a c u m v
fuseUpstream :: forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
fuseUpstream Pipe a b u m v
p Pipe b c v m r
q = forall a b. (a, b) -> a
fst forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
fuseBoth Pipe a b u m v
p Pipe b c v m r
q
{-# INLINE fuseUpstream #-}

-- | Like @.|@, but get the result of /both/ pipes on termination, instead
-- of just the second.  This means that @p &| q@ will only terminate with a result when
-- /both/ @p@ and @q@ terminate.  (Typically, @p .| q@ would terminate as soon as
-- @q@ terminates.)
--
-- @since 0.2.1.0
(&|) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
&| :: forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
(&|) = forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m (v, r)
fuseBoth
{-# INLINE (&|) #-}

-- | Like @.|@, but keep the result of the /first/ pipe, instead of the
-- second.  This means that @p |. q@ will only terminate with a result when
-- /both/ @p@ and @q@ terminate.  (Typically, @p .| q@ would terminate as soon as
-- @q@ terminates.)
--
-- @since 0.2.1.0
(|.) :: Monad m => Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
|. :: forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
(|.) = forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m v
fuseUpstream
{-# INLINE (|.) #-}

infixr 2 &|
infixr 2 |.

-- | Passthrough and pair each output with the /last/ input that triggered
-- it.  'Nothing' will occur initially if the pipe outputs anything without
-- consuming any values, but after the first 'Just', should only output
-- Justs forever.
--
-- @since 0.2.3.0
passthrough
    :: Monad m
    => Pipe i o u m a
    -> Pipe i (Maybe i, o) u m a
passthrough :: forall (m :: * -> *) i o u a.
Monad m =>
Pipe i o u m a -> Pipe i (Maybe i, o) u m a
passthrough Pipe i o u m a
p = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> a
fst forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) s i o u a.
Monad m =>
s -> Pipe i o u (StateT s m) a -> Pipe i o u m (a, s)
runStatePS forall a. Maybe a
Nothing forall a b. (a -> b) -> a -> b
$
       forall i o u (m :: * -> *) a.
(i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever forall {m :: * -> *} {o} {i} {u}.
Monad m =>
o -> Pipe i o u (StateT (Maybe o) m) ()
passOn
    forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r
.| forall (m :: * -> *) (n :: * -> *) i o u a.
(Monad m, Monad n) =>
(forall x. m x -> n x) -> Pipe i o u m a -> Pipe i o u n a
hoistPipe forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift Pipe i o u m a
p
    forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r
.| forall i o u (m :: * -> *) a.
(i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever forall {m :: * -> *} {t} {i} {a} {u}.
Monad m =>
t -> Pipe i (a, t) u (StateT a m) ()
tagIn
  where
    passOn :: o -> Pipe i o u (StateT (Maybe o) m) ()
passOn o
i = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (forall (m :: * -> *) s. Monad m => s -> StateT s m ()
SS.put (forall a. a -> Maybe a
Just o
i)) forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield o
i
    tagIn :: t -> Pipe i (a, t) u (StateT a m) ()
tagIn t
i = forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield forall b c a. (b -> c) -> (a -> b) -> a -> c
. (,t
i) forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall (m :: * -> *) s. Monad m => StateT s m s
SS.get
{-# INLINE passthrough #-}

-- | Loop a pipe into itself.
--
-- *  Will feed all output back to the input
-- *  Will only ask for input upstream if output is stalled.
-- *  Yields all outputted values downstream, effectively duplicating them.
--
-- @since 0.2.1.0
feedbackPipe
    :: Monad m
    => Pipe x x u m a
    -> Pipe x x u m a
feedbackPipe :: forall (m :: * -> *) x u a.
Monad m =>
Pipe x x u m a -> Pipe x x u m a
feedbackPipe = forall (m :: * -> *) i o u a.
Monad m =>
Pipe (Either i o) o u m a -> Pipe i o u m a
feedbackPipeEither forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall i j o u (m :: * -> *) a.
(i -> j) -> Pipe j o u m a -> Pipe i o u m a
mapInput (forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall a. a -> a
id forall a. a -> a
id)
{-# INLINE feedbackPipe #-}

-- | A version of 'feedbackPipe' that distinguishes upstream input from
-- downstream output fed back.  Gets 'Left' from upstream, and 'Right' from
-- its own output.
--
-- *  Will feed all output back to the input
-- *  Will only ask for input upstream if output is stalled.
-- *  Yields all outputted values downstream, effectively duplicating them.
--
-- @since 0.2.2.0
feedbackPipeEither
    :: Monad m
    => Pipe (Either i o) o u m a
    -> Pipe i o u m a
feedbackPipeEither :: forall (m :: * -> *) i o u a.
Monad m =>
Pipe (Either i o) o u m a -> Pipe i o u m a
feedbackPipeEither Pipe (Either i o) o u m a
p = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a b. (a, b) -> a
fst forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) s i o u a.
Monad m =>
s -> Pipe i o u (StateT s m) a -> Pipe i o u m (a, s)
runStatePS forall a. Seq a
Seq.empty forall a b. (a -> b) -> a -> b
$
       forall {a} {b} {b}. Pipe a (Either a b) b (StateT (Seq b) m) b
popper
    forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r
.| forall (m :: * -> *) (n :: * -> *) i o u a.
(Monad m, Monad n) =>
(forall x. m x -> n x) -> Pipe i o u m a -> Pipe i o u n a
hoistPipe forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift Pipe (Either i o) o u m a
p
    forall (m :: * -> *) a b u v c r.
Monad m =>
Pipe a b u m v -> Pipe b c v m r -> Pipe a c u m r
.| forall i o u (m :: * -> *) a.
(i -> Pipe i o u m a) -> Pipe i o u m u
awaitForever (\o
x -> forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (forall (m :: * -> *) s. Monad m => (s -> s) -> StateT s m ()
SS.modify (forall a. Seq a -> a -> Seq a
:|> o
x)) forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield o
x)
  where
    popper :: Pipe a (Either a b) b (StateT (Seq b) m) b
popper = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall (m :: * -> *) s. Monad m => StateT s m s
SS.get forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Seq b
Empty -> forall i o u (m :: * -> *). Pipe i o u m (Either u i)
awaitEither forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
        Left b
r  -> forall (f :: * -> *) a. Applicative f => a -> f a
pure b
r
        Right a
x -> forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield (forall a b. a -> Either a b
Left a
x) forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Pipe a (Either a b) b (StateT (Seq b) m) b
popper
      b
x :<| Seq b
xs -> do
        forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) s. Monad m => s -> StateT s m ()
SS.put Seq b
xs
        forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield (forall a b. b -> Either a b
Right b
x)
        Pipe a (Either a b) b (StateT (Seq b) m) b
popper
{-# INLINE feedbackPipeEither #-}

-- | A newtype wrapper over a source (@'Pipe' () o 'Void'@) that gives it an
-- alternative 'Applicative' and 'Alternative' instance, matching "ListT
-- done right".
--
-- '<*>' will pair up each output that the sources produce: if you 'await'
-- a value from downstream, it will wait until both paired sources yield
-- before passing them on together.
--
-- '<|>' will completely exhaust the first source before moving on to the
-- next source.
--
-- 'ZipSource' is effectively equivalent to "ListT done right", the true
-- List Monad transformer.  '<|>' is concatentation.  You can use this type
-- with 'lift' to lift a yielding action and '<|>' to sequence yields to
-- implement the pattern described in
-- <http://www.haskellforall.com/2014/11/how-to-build-library-agnostic-streaming.html>,
-- where you can write streaming producers in a polymorphic way, and have
-- it run with pipes, conduit, etc.
--
-- The main difference is that its 'Applicative' instance ("zipping") is
-- different from the traditional 'Applicative' instance for 'ListT'
-- ("all combinations").  Effectively this becomes like a "zipping"
-- 'Applicative' instance for 'ListT'.
--
-- If you want a 'Monad' (or 'Control.Monad.IO.Class.MonadIO') instance,
-- use 'ListT' instead, and convert using 'toListT'/'fromListT' or the
-- 'PipeList' pattern/constructor.
newtype ZipSource m a = ZipSource { forall (m :: * -> *) a. ZipSource m a -> Pipe () a Void m ()
getZipSource :: Pipe () a Void m () }

-- | A source is equivalent to a 'ListT' producing a 'Maybe'; this pattern
-- synonym lets you treat it as such.  It essentialyl wraps over 'toListT'
-- and 'fromListT'.
pattern PipeList :: Monad m => ListT m (Maybe a) -> Pipe () a u m ()
pattern $bPipeList :: forall (m :: * -> *) a u.
Monad m =>
ListT m (Maybe a) -> Pipe () a u m ()
$mPipeList :: forall {r} {m :: * -> *} {a} {u}.
Monad m =>
Pipe () a u m () -> (ListT m (Maybe a) -> r) -> ((# #) -> r) -> r
PipeList xs <- (toListT->xs)
  where
    PipeList ListT m (Maybe a)
xs = forall (m :: * -> *) o i u.
Monad m =>
ListT m (Maybe o) -> Pipe i o u m ()
fromListT ListT m (Maybe a)
xs
{-# COMPLETE PipeList #-}

instance Functor (ZipSource m) where
    fmap :: forall a b. (a -> b) -> ZipSource m a -> ZipSource m b
fmap a -> b
f = forall (m :: * -> *) a. Pipe () a Void m () -> ZipSource m a
ZipSource forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall p o i u (m :: * -> *) a.
(p -> o) -> Pipe i p u m a -> Pipe i o u m a
mapOutput a -> b
f forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. ZipSource m a -> Pipe () a Void m ()
getZipSource

instance Monad m => Applicative (ZipSource m) where
    pure :: forall a. a -> ZipSource m a
pure = forall (m :: * -> *) a. Pipe () a Void m () -> ZipSource m a
ZipSource forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield
    ZipSource Pipe () (a -> b) Void m ()
p <*> :: forall a b. ZipSource m (a -> b) -> ZipSource m a -> ZipSource m b
<*> ZipSource Pipe () a Void m ()
q = forall (m :: * -> *) a. Pipe () a Void m () -> ZipSource m a
ZipSource forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a b u v w.
Monad m =>
Pipe () (a -> b) u m () -> Pipe () a v m () -> Pipe () b w m ()
zipSource Pipe () (a -> b) Void m ()
p Pipe () a Void m ()
q

-- | Takes two sources and runs them in parallel, collating their outputs.
--
-- @since 0.2.1.0
zipSource :: Monad m => Pipe () (a -> b) u m () -> Pipe () a v m () -> Pipe () b w m ()
zipSource :: forall (m :: * -> *) a b u v w.
Monad m =>
Pipe () (a -> b) u m () -> Pipe () a v m () -> Pipe () b w m ()
zipSource (PipeList ListT m (Maybe (a -> b))
fs) (PipeList ListT m (Maybe a)
xs) = forall (m :: * -> *) a u.
Monad m =>
ListT m (Maybe a) -> Pipe () a u m ()
PipeList forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$
    forall a b c. (a -> b -> c) -> (a, b) -> c
uncurry forall a b. (a -> b) -> a -> b
($) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall (m :: * -> *) a b.
Monad m =>
ListT m a -> ListT m b -> ListT m (a, b)
LT.zip (forall (m :: * -> *) a. Monad m => ListT m (Maybe a) -> ListT m a
concatListT ListT m (Maybe (a -> b))
fs) (forall (m :: * -> *) a. Monad m => ListT m (Maybe a) -> ListT m a
concatListT ListT m (Maybe a)
xs)
{-# INLINE zipSource #-}

concatListT :: Monad m => ListT m (Maybe a) -> ListT m a
concatListT :: forall (m :: * -> *) a. Monad m => ListT m (Maybe a) -> ListT m a
concatListT ListT m (Maybe a)
xs = forall (m :: * -> *) a. m (Step m a) -> ListT m a
ListT forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. ListT m a -> m (Step m a)
next ListT m (Maybe a)
xs forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Step m (Maybe a)
Nil              -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall (m :: * -> *) a. Step m a
Nil
    Cons Maybe a
Nothing  ListT m (Maybe a)
ys -> forall (m :: * -> *) a. ListT m a -> m (Step m a)
next (forall (m :: * -> *) a. Monad m => ListT m (Maybe a) -> ListT m a
concatListT ListT m (Maybe a)
ys)
    Cons (Just a
y) ListT m (Maybe a)
ys -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. a -> ListT m a -> Step m a
Cons a
y (forall (m :: * -> *) a. Monad m => ListT m (Maybe a) -> ListT m a
concatListT ListT m (Maybe a)
ys)

instance Monad m => Alternative (ZipSource m) where
    empty :: forall a. ZipSource m a
empty = forall (m :: * -> *) a. Pipe () a Void m () -> ZipSource m a
ZipSource forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    ZipSource Pipe () a Void m ()
p <|> :: forall a. ZipSource m a -> ZipSource m a -> ZipSource m a
<|> ZipSource Pipe () a Void m ()
q = forall (m :: * -> *) a. Pipe () a Void m () -> ZipSource m a
ZipSource (Pipe () a Void m ()
p forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> Pipe () a Void m ()
q)

instance MonadTrans ZipSource where
    lift :: forall (m :: * -> *) a. Monad m => m a -> ZipSource m a
lift = forall (m :: * -> *) a. Pipe () a Void m () -> ZipSource m a
ZipSource forall b c a. (b -> c) -> (a -> b) -> a -> c
. (forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<<) forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift

-- | A source is essentially equivalent to 'ListT' producing a 'Maybe'
-- result.  This converts it to the 'ListT' it encodes.
--
-- See 'ZipSource' for a wrapper over 'Pipe' that gives the right 'Functor'
-- and 'Alternative' instances.
toListT
    :: Applicative m
    => Pipe () o u m ()
    -> ListT m (Maybe o)
toListT :: forall (m :: * -> *) o u.
Applicative m =>
Pipe () o u m () -> ListT m (Maybe o)
toListT (Pipe (FT forall r.
(() -> m r)
-> (forall x. (x -> m r) -> PipeF () o u x -> m r) -> m r
p)) = forall (m :: * -> *) a. m (Step m a) -> ListT m a
ListT forall a b. (a -> b) -> a -> b
$ forall r.
(() -> m r)
-> (forall x. (x -> m r) -> PipeF () o u x -> m r) -> m r
p
    (\()
_ -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall (m :: * -> *) a. Step m a
Nil)
    (\x -> m (Step m (Maybe o))
pNext -> \case
        PAwaitF u -> x
_ () -> x
g -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. a -> ListT m a -> Step m a
Cons forall a. Maybe a
Nothing  (forall (m :: * -> *) a. m (Step m a) -> ListT m a
ListT forall b c a. (b -> c) -> (a -> b) -> a -> c
. x -> m (Step m (Maybe o))
pNext forall a b. (a -> b) -> a -> b
$ () -> x
g ())
        PYieldF o
x x
y -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. a -> ListT m a -> Step m a
Cons (forall a. a -> Maybe a
Just o
x) (forall (m :: * -> *) a. m (Step m a) -> ListT m a
ListT forall b c a. (b -> c) -> (a -> b) -> a -> c
. x -> m (Step m (Maybe o))
pNext forall a b. (a -> b) -> a -> b
$ x
y   )
    )
{-# INLINE toListT #-}

-- | A source is essentially 'ListT' producing a 'Maybe' result.  This
-- converts a 'ListT' to the source it encodes.
--
-- See 'ZipSource' for a wrapper over 'Pipe' that gives the right 'Functor'
-- and 'Alternative' instances.
fromListT
    :: Monad m
    => ListT m (Maybe o)
    -> Pipe i o u m ()
fromListT :: forall (m :: * -> *) o i u.
Monad m =>
ListT m (Maybe o) -> Pipe i o u m ()
fromListT ListT m (Maybe o)
xs = forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (forall (m :: * -> *) a. ListT m a -> m (Step m a)
next ListT m (Maybe o)
xs) forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Step m (Maybe o)
Nil              -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
      Cons Maybe o
Nothing  ListT m (Maybe o)
ys -> forall (m :: * -> *) o i u.
Monad m =>
ListT m (Maybe o) -> Pipe i o u m ()
fromListT ListT m (Maybe o)
ys
      Cons (Just o
y) ListT m (Maybe o)
ys -> forall o i u (m :: * -> *). o -> Pipe i o u m ()
yield o
y forall (f :: * -> *) a b. Applicative f => f a -> f b -> f b
*> forall (m :: * -> *) o i u.
Monad m =>
ListT m (Maybe o) -> Pipe i o u m ()
fromListT ListT m (Maybe o)
ys

-- | Given a "generator" of @o@ in @m@, return a /source/ that that
-- generator encodes.  Is the inverse of 'withSource'.
--
-- The generator is essentially a church-encoded 'ListT'.
genSource
    :: (forall r. (Maybe (o, m r) -> m r) -> m r)
    -> Pipe i o u m ()
genSource :: forall o (m :: * -> *) i u.
(forall r. (Maybe (o, m r) -> m r) -> m r) -> Pipe i o u m ()
genSource forall r. (Maybe (o, m r) -> m r) -> m r
f = forall i o u (m :: * -> *) a.
FT (PipeF i o u) m a -> Pipe i o u m a
Pipe forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) (m :: * -> *) a.
(forall r.
 (a -> m r) -> (forall x. (x -> m r) -> f x -> m r) -> m r)
-> FT f m a
FT forall a b. (a -> b) -> a -> b
$ \() -> m r
pDone forall x. (x -> m r) -> PipeF i o u x -> m r
pFree -> forall r. (Maybe (o, m r) -> m r) -> m r
f forall a b. (a -> b) -> a -> b
$ \case
    Maybe (o, m r)
Nothing      -> () -> m r
pDone ()
    Just (o
x, m r
xs) -> forall x. (x -> m r) -> PipeF i o u x -> m r
pFree forall a. a -> a
id (forall i o u a. o -> a -> PipeF i o u a
PYieldF o
x m r
xs)
{-# INLINE genSource #-}

-- | A source can be "run" by providing a continuation to handle and
-- sequence each of its outputs.  Is ths inverse of 'genSource'.
--
-- This essentially turns a pipe into a church-encoded 'ListT'.
withSource
    :: Pipe () o u m ()
    -> (Maybe (o, m r) -> m r)    -- ^ handler ('Nothing' = done, @'Just' (x, next)@ = yielded value and next action
    -> m r
withSource :: forall o u (m :: * -> *) r.
Pipe () o u m () -> (Maybe (o, m r) -> m r) -> m r
withSource (Pipe (FT forall r.
(() -> m r)
-> (forall x. (x -> m r) -> PipeF () o u x -> m r) -> m r
p)) Maybe (o, m r) -> m r
f = forall r.
(() -> m r)
-> (forall x. (x -> m r) -> PipeF () o u x -> m r) -> m r
p
    (\()
_ -> Maybe (o, m r) -> m r
f forall a. Maybe a
Nothing)
    (\x -> m r
pNext -> \case
        PAwaitF u -> x
_ () -> x
g -> x -> m r
pNext forall a b. (a -> b) -> a -> b
$ () -> x
g ()
        PYieldF o
x x
y -> Maybe (o, m r) -> m r
f (forall a. a -> Maybe a
Just (o
x, x -> m r
pNext x
y))
    )

-- | 'ZipSource' is effectively 'ListT' returning a 'Maybe'.  As such, you
-- can use 'unconsZipSource' to "peel off" the first yielded item, if it
-- exists, and return the "rest of the list".
unconsZipSource
    :: Monad m
    => ZipSource m a
    -> m (Maybe (Maybe a, ZipSource m a))
unconsZipSource :: forall (m :: * -> *) a.
Monad m =>
ZipSource m a -> m (Maybe (Maybe a, ZipSource m a))
unconsZipSource (ZipSource (PipeList ListT m (Maybe a)
p)) = forall (m :: * -> *) a. ListT m a -> m (Step m a)
next ListT m (Maybe a)
p forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
    Cons Maybe a
x ListT m (Maybe a)
xs -> forall a. a -> Maybe a
Just (Maybe a
x, forall (m :: * -> *) a. Pipe () a Void m () -> ZipSource m a
ZipSource (forall (m :: * -> *) a u.
Monad m =>
ListT m (Maybe a) -> Pipe () a u m ()
PipeList ListT m (Maybe a)
xs))
    Step m (Maybe a)
Nil       -> forall a. Maybe a
Nothing
{-# INLINE unconsZipSource #-}

-- | A newtype wrapper over a sink (@'Pipe' i 'Void'@) that gives it an
-- alternative 'Applicative' and 'Alternative' instance.
--
-- '<*>' will distribute input over both sinks, and output a final result
-- once both sinks finish.
--
-- '<|>' will distribute input over both sinks, and output a final result
-- as soon as one or the other finishes.
newtype ZipSink i u m a = ZipSink { forall i u (m :: * -> *) a. ZipSink i u m a -> Pipe i Void u m a
getZipSink :: Pipe i Void u m a }
  deriving forall a b. a -> ZipSink i u m b -> ZipSink i u m a
forall a b. (a -> b) -> ZipSink i u m a -> ZipSink i u m b
forall i u (m :: * -> *) a b.
a -> ZipSink i u m b -> ZipSink i u m a
forall i u (m :: * -> *) a b.
(a -> b) -> ZipSink i u m a -> ZipSink i u m b
forall (f :: * -> *).
(forall a b. (a -> b) -> f a -> f b)
-> (forall a b. a -> f b -> f a) -> Functor f
<$ :: forall a b. a -> ZipSink i u m b -> ZipSink i u m a
$c<$ :: forall i u (m :: * -> *) a b.
a -> ZipSink i u m b -> ZipSink i u m a
fmap :: forall a b. (a -> b) -> ZipSink i u m a -> ZipSink i u m b
$cfmap :: forall i u (m :: * -> *) a b.
(a -> b) -> ZipSink i u m a -> ZipSink i u m b
Functor

zipSink_
    :: Monad m
    => RecPipe i Void u m (a -> b)
    -> RecPipe i Void u m a
    -> RecPipe i Void u m b
zipSink_ :: forall (m :: * -> *) i u a b.
Monad m =>
RecPipe i Void u m (a -> b)
-> RecPipe i Void u m a -> RecPipe i Void u m b
zipSink_ RecPipe i Void u m (a -> b)
p RecPipe i Void u m a
q = forall (f :: * -> *) (m :: * -> *) a.
m (FreeF f a (FreeT f m a)) -> FreeT f m a
FreeT forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) (m :: * -> *) a.
FreeT f m a -> m (FreeF f a (FreeT f m a))
runFreeT RecPipe i Void u m (a -> b)
p forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \FreeF (PipeF i Void u) (a -> b) (RecPipe i Void u m (a -> b))
pp -> case FreeF (PipeF i Void u) (a -> b) (RecPipe i Void u m (a -> b))
pp of
    Pure a -> b
x             -> forall (f :: * -> *) (m :: * -> *) a.
FreeT f m a -> m (FreeF f a (FreeT f m a))
runFreeT RecPipe i Void u m a
q forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
      Pure a
x'              -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. a -> FreeF f a b
Pure forall a b. (a -> b) -> a -> b
$ a -> b
x a
x'
      Free (PAwaitF u -> RecPipe i Void u m a
f' i -> RecPipe i Void u m a
g') -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. f b -> FreeF f a b
Free forall a b. (a -> b) -> a -> b
$
        forall i o u a. (u -> a) -> (i -> a) -> PipeF i o u a
PAwaitF (forall (m :: * -> *) i u a b.
Monad m =>
RecPipe i Void u m (a -> b)
-> RecPipe i Void u m a -> RecPipe i Void u m b
zipSink_ (forall (f :: * -> *) (m :: * -> *) a.
m (FreeF f a (FreeT f m a)) -> FreeT f m a
FreeT (forall (f :: * -> *) a. Applicative f => a -> f a
pure FreeF (PipeF i Void u) (a -> b) (RecPipe i Void u m (a -> b))
pp)) forall b c a. (b -> c) -> (a -> b) -> a -> c
. u -> RecPipe i Void u m a
f')
                (forall (m :: * -> *) i u a b.
Monad m =>
RecPipe i Void u m (a -> b)
-> RecPipe i Void u m a -> RecPipe i Void u m b
zipSink_ (forall (f :: * -> *) (m :: * -> *) a.
m (FreeF f a (FreeT f m a)) -> FreeT f m a
FreeT (forall (f :: * -> *) a. Applicative f => a -> f a
pure FreeF (PipeF i Void u) (a -> b) (RecPipe i Void u m (a -> b))
pp)) forall b c a. (b -> c) -> (a -> b) -> a -> c
. i -> RecPipe i Void u m a
g')
      Free (PYieldF Void
x' RecPipe i Void u m a
_ ) -> forall a. Void -> a
absurd Void
x'
    Free (PAwaitF u -> RecPipe i Void u m (a -> b)
f i -> RecPipe i Void u m (a -> b)
g) -> forall (f :: * -> *) (m :: * -> *) a.
FreeT f m a -> m (FreeF f a (FreeT f m a))
runFreeT RecPipe i Void u m a
q forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \FreeF (PipeF i Void u) a (RecPipe i Void u m a)
qq -> case FreeF (PipeF i Void u) a (RecPipe i Void u m a)
qq of
      Pure a
_               -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. f b -> FreeF f a b
Free forall a b. (a -> b) -> a -> b
$
        forall i o u a. (u -> a) -> (i -> a) -> PipeF i o u a
PAwaitF ((forall (m :: * -> *) i u a b.
Monad m =>
RecPipe i Void u m (a -> b)
-> RecPipe i Void u m a -> RecPipe i Void u m b
`zipSink_` forall (f :: * -> *) (m :: * -> *) a.
m (FreeF f a (FreeT f m a)) -> FreeT f m a
FreeT (forall (f :: * -> *) a. Applicative f => a -> f a
pure FreeF (PipeF i Void u) a (RecPipe i Void u m a)
qq)) forall b c a. (b -> c) -> (a -> b) -> a -> c
. u -> RecPipe i Void u m (a -> b)
f)
                ((forall (m :: * -> *) i u a b.
Monad m =>
RecPipe i Void u m (a -> b)
-> RecPipe i Void u m a -> RecPipe i Void u m b
`zipSink_` forall (f :: * -> *) (m :: * -> *) a.
m (FreeF f a (FreeT f m a)) -> FreeT f m a
FreeT (forall (f :: * -> *) a. Applicative f => a -> f a
pure FreeF (PipeF i Void u) a (RecPipe i Void u m a)
qq)) forall b c a. (b -> c) -> (a -> b) -> a -> c
. i -> RecPipe i Void u m (a -> b)
g)
      Free (PAwaitF u -> RecPipe i Void u m a
f' i -> RecPipe i Void u m a
g') -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. f b -> FreeF f a b
Free forall a b. (a -> b) -> a -> b
$
        forall i o u a. (u -> a) -> (i -> a) -> PipeF i o u a
PAwaitF (forall (m :: * -> *) i u a b.
Monad m =>
RecPipe i Void u m (a -> b)
-> RecPipe i Void u m a -> RecPipe i Void u m b
zipSink_ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> u -> RecPipe i Void u m (a -> b)
f forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> u -> RecPipe i Void u m a
f') (forall (m :: * -> *) i u a b.
Monad m =>
RecPipe i Void u m (a -> b)
-> RecPipe i Void u m a -> RecPipe i Void u m b
zipSink_ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> i -> RecPipe i Void u m (a -> b)
g forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> i -> RecPipe i Void u m a
g')
      Free (PYieldF Void
x' RecPipe i Void u m a
_ ) -> forall a. Void -> a
absurd Void
x'
    Free (PYieldF Void
x RecPipe i Void u m (a -> b)
_) -> forall a. Void -> a
absurd Void
x

altSink_
    :: Monad m
    => RecPipe i Void u m a
    -> RecPipe i Void u m a
    -> RecPipe i Void u m a
altSink_ :: forall (m :: * -> *) i u a.
Monad m =>
RecPipe i Void u m a
-> RecPipe i Void u m a -> RecPipe i Void u m a
altSink_ RecPipe i Void u m a
p RecPipe i Void u m a
q = forall (f :: * -> *) (m :: * -> *) a.
m (FreeF f a (FreeT f m a)) -> FreeT f m a
FreeT forall a b. (a -> b) -> a -> b
$ forall (f :: * -> *) (m :: * -> *) a.
FreeT f m a -> m (FreeF f a (FreeT f m a))
runFreeT RecPipe i Void u m a
p forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
    Pure a
x             -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a b. a -> FreeF f a b
Pure forall a b. (a -> b) -> a -> b
$ a
x
    Free (PAwaitF u -> RecPipe i Void u m a
f i -> RecPipe i Void u m a
g) -> forall (f :: * -> *) (m :: * -> *) a.
FreeT f m a -> m (FreeF f a (FreeT f m a))
runFreeT RecPipe i Void u m a
q forall (f :: * -> *) a b. Functor f => f a -> (a -> b) -> f b
<&> \case
      Pure a
x'              -> forall (f :: * -> *) a b. a -> FreeF f a b
Pure a
x'
      Free (PAwaitF u -> RecPipe i Void u m a
f' i -> RecPipe i Void u m a
g') -> forall (f :: * -> *) a b. f b -> FreeF f a b
Free forall a b. (a -> b) -> a -> b
$ forall i o u a. (u -> a) -> (i -> a) -> PipeF i o u a
PAwaitF (forall (m :: * -> *) i u a.
Monad m =>
RecPipe i Void u m a
-> RecPipe i Void u m a -> RecPipe i Void u m a
altSink_ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> u -> RecPipe i Void u m a
f forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> u -> RecPipe i Void u m a
f') (forall (m :: * -> *) i u a.
Monad m =>
RecPipe i Void u m a
-> RecPipe i Void u m a -> RecPipe i Void u m a
altSink_ forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> i -> RecPipe i Void u m a
g forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> i -> RecPipe i Void u m a
g')
      Free (PYieldF Void
x' RecPipe i Void u m a
_ ) -> forall a. Void -> a
absurd Void
x'
    Free (PYieldF Void
x RecPipe i Void u m a
_) -> forall a. Void -> a
absurd Void
x

-- | Distribute input to both sinks, and finishes with the final result
-- once both finish.
--
-- Forms an identity with 'pure'.
zipSink
    :: Monad m
    => Pipe i Void u m (a -> b)
    -> Pipe i Void u m a
    -> Pipe i Void u m b
zipSink :: forall (m :: * -> *) i u a b.
Monad m =>
Pipe i Void u m (a -> b) -> Pipe i Void u m a -> Pipe i Void u m b
zipSink Pipe i Void u m (a -> b)
p Pipe i Void u m a
q = forall (m :: * -> *) i o u a.
Monad m =>
RecPipe i o u m a -> Pipe i o u m a
fromRecPipe forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) i u a b.
Monad m =>
RecPipe i Void u m (a -> b)
-> RecPipe i Void u m a -> RecPipe i Void u m b
zipSink_ (forall (m :: * -> *) i o u a.
Monad m =>
Pipe i o u m a -> RecPipe i o u m a
toRecPipe Pipe i Void u m (a -> b)
p) (forall (m :: * -> *) i o u a.
Monad m =>
Pipe i o u m a -> RecPipe i o u m a
toRecPipe Pipe i Void u m a
q)
{-# INLINE zipSink #-}

-- | Distribute input to both sinks, and finishes with the result of
-- the one that finishes first.
altSink
    :: Monad m
    => Pipe i Void u m a
    -> Pipe i Void u m a
    -> Pipe i Void u m a
altSink :: forall (m :: * -> *) i u a.
Monad m =>
Pipe i Void u m a -> Pipe i Void u m a -> Pipe i Void u m a
altSink Pipe i Void u m a
p Pipe i Void u m a
q = forall (m :: * -> *) i o u a.
Monad m =>
RecPipe i o u m a -> Pipe i o u m a
fromRecPipe forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) i u a.
Monad m =>
RecPipe i Void u m a
-> RecPipe i Void u m a -> RecPipe i Void u m a
altSink_ (forall (m :: * -> *) i o u a.
Monad m =>
Pipe i o u m a -> RecPipe i o u m a
toRecPipe Pipe i Void u m a
p) (forall (m :: * -> *) i o u a.
Monad m =>
Pipe i o u m a -> RecPipe i o u m a
toRecPipe Pipe i Void u m a
q)
{-# INLINE altSink #-}

-- | '<*>' = distribute input to all, and return result when they finish
--
-- 'pure' = immediately finish
instance Monad m => Applicative (ZipSink i u m) where
    pure :: forall a. a -> ZipSink i u m a
pure = forall i u (m :: * -> *) a. Pipe i Void u m a -> ZipSink i u m a
ZipSink forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a. Applicative f => a -> f a
pure
    ZipSink Pipe i Void u m (a -> b)
p <*> :: forall a b.
ZipSink i u m (a -> b) -> ZipSink i u m a -> ZipSink i u m b
<*> ZipSink Pipe i Void u m a
q = forall i u (m :: * -> *) a. Pipe i Void u m a -> ZipSink i u m a
ZipSink forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) i u a b.
Monad m =>
Pipe i Void u m (a -> b) -> Pipe i Void u m a -> Pipe i Void u m b
zipSink Pipe i Void u m (a -> b)
p Pipe i Void u m a
q

-- | '<|>' = distribute input to all, and return the first result that
-- finishes
--
-- 'empty' = never finish
instance Monad m => Alternative (ZipSink i u m) where
    empty :: forall a. ZipSink i u m a
empty = forall i u (m :: * -> *) a. Pipe i Void u m a -> ZipSink i u m a
ZipSink forall {i} {o} {u} {m :: * -> *} {b}. Pipe i o u m b
go
      where
        go :: Pipe i o u m b
go = forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall i o u (m :: * -> *). Pipe i o u m (Maybe i)
await
    ZipSink Pipe i Void u m a
p <|> :: forall a. ZipSink i u m a -> ZipSink i u m a -> ZipSink i u m a
<|> ZipSink Pipe i Void u m a
q = forall i u (m :: * -> *) a. Pipe i Void u m a -> ZipSink i u m a
ZipSink forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) i u a.
Monad m =>
Pipe i Void u m a -> Pipe i Void u m a -> Pipe i Void u m a
altSink Pipe i Void u m a
p Pipe i Void u m a
q

instance MonadTrans (ZipSink i u) where
    lift :: forall (m :: * -> *) a. Monad m => m a -> ZipSink i u m a
lift = forall i u (m :: * -> *) a. Pipe i Void u m a -> ZipSink i u m a
ZipSink forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift