{-# LANGUAGE CPP, GADTs, FlexibleContexts, RankNTypes, ScopedTypeVariables,
             TupleSections #-}
-- | The primary use of concurrent machines is to establish a
-- pipelined architecture that can boost overall throughput by running
-- each stage of the pipeline at the same time. The processing, or
-- production, rate of each stage may not be identical, so facilities
-- are provided to loosen the temporal coupling between pipeline
-- stages using buffers.
--
-- This architecture also lends itself to operations where multiple
-- workers are available for procesisng inputs. If each worker is to
-- process the same set of inputs, consider 'fanout' and
-- 'fanoutSteps'. If each worker is to process a disjoint set of
-- inputs, consider 'scatter'.
module Data.Machine.Concurrent (module Data.Machine,
                                -- * Concurrent connection
                                (>~>), (<~<),
                                -- * Buffered machines
                                bufferConnect, rollingConnect,
                                -- * Concurrent processing of shared inputs
                                fanout, fanoutSteps,
                                -- * Concurrent multiple-input machines
                                wye, tee, scatter, splitSum, mergeSum,
                                splitProd) where
#if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ < 710
import Control.Applicative
#endif
import Control.Concurrent.Async.Lifted
import Control.Monad (join)
import Control.Monad.Trans.Control
import Data.Machine hiding (tee, wye)
import Data.Machine.Concurrent.AsyncStep
import Data.Machine.Concurrent.Buffer
import Data.Machine.Concurrent.Fanout
import Data.Machine.Concurrent.Scatter
import Data.Machine.Concurrent.Wye
import Data.Machine.Concurrent.Tee

-- | Build a new 'Machine' by adding a 'Process' to the output of an
-- old 'Machine'. The upstream machine is run concurrently with
-- downstream with the aim that upstream will have a yielded value
-- ready as soon as downstream awaits. This effectively creates a
-- buffer between upstream and downstream, or source and sink, that
-- can contain up to one value.
--
-- @
-- ('<~<') :: 'Process' b c -> 'Process' a b -> 'Process' a c
-- ('<~<') :: 'Process' c d -> 'Data.Machine.Tee.Tee' a b c -> 'Data.Machine.Tee.Tee' a b d
-- ('<~<') :: 'Process' b c -> 'Machine' k b -> 'Machine' k c
-- @
(<~<) :: MonadBaseControl IO m
     => ProcessT m b c -> MachineT m k b -> MachineT m k c
ProcessT m b c
mp <~< :: ProcessT m b c -> MachineT m k b -> MachineT m k c
<~< MachineT m k b
ma = MachineT m k b -> ProcessT m b c -> MachineT m k c
forall (m :: * -> *) (k :: * -> *) a b.
MonadBaseControl IO m =>
MachineT m k a -> ProcessT m a b -> MachineT m k b
racers MachineT m k b
ma ProcessT m b c
mp

-- | Flipped ('<~<').
(>~>) :: MonadBaseControl IO m
     => MachineT m k b -> ProcessT m b c -> MachineT m k c
MachineT m k b
ma >~> :: MachineT m k b -> ProcessT m b c -> MachineT m k c
>~> ProcessT m b c
mp = ProcessT m b c
mp ProcessT m b c -> MachineT m k b -> MachineT m k c
forall (m :: * -> *) b c (k :: * -> *).
MonadBaseControl IO m =>
ProcessT m b c -> MachineT m k b -> MachineT m k c
<~< MachineT m k b
ma

infixl 7 >~>

-- | We want the first available response.
waitEither' :: MonadBaseControl IO m
            => Maybe (Async (StM m a)) -> Async (StM m b)
            -> m (Either a b)
waitEither' :: Maybe (Async (StM m a)) -> Async (StM m b) -> m (Either a b)
waitEither' Maybe (Async (StM m a))
Nothing Async (StM m b)
y = b -> Either a b
forall a b. b -> Either a b
Right (b -> Either a b) -> m b -> m (Either a b)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Async (StM m b) -> m b
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait Async (StM m b)
y
waitEither' (Just Async (StM m a)
x) Async (StM m b)
y = Async (StM m a) -> Async (StM m b) -> m (Either a b)
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Async (StM m a) -> Async (StM m b) -> m (Either a b)
waitEither Async (StM m a)
x Async (StM m b)
y

-- | Let a source and a sink chase each other, providing an effective
-- one-element buffer between the two. The idea is to run both
-- concurrently at all times so that as soon as the sink 'Await's, we
-- have a source-yielded value to provide it. This, of course,
-- involves eagerly running the source, percolating its 'Await's up
-- the chain as soon as possible.
racers :: forall m k a b. MonadBaseControl IO m
       => MachineT m k a -> ProcessT m a b -> MachineT m k b
racers :: MachineT m k a -> ProcessT m a b -> MachineT m k b
racers MachineT m k a
src ProcessT m a b
snk = m (Step k b (MachineT m k b)) -> MachineT m k b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step k b (MachineT m k b)) -> MachineT m k b)
-> (m (m (Step k b (MachineT m k b)))
    -> m (Step k b (MachineT m k b)))
-> m (m (Step k b (MachineT m k b)))
-> MachineT m k b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m (m (Step k b (MachineT m k b))) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m (Step k b (MachineT m k b))) -> MachineT m k b)
-> m (m (Step k b (MachineT m k b))) -> MachineT m k b
forall a b. (a -> b) -> a -> b
$
                 Maybe (AsyncStep m k a)
-> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b))
go (Maybe (AsyncStep m k a)
 -> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (Maybe (AsyncStep m k a))
-> m (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (AsyncStep m k a -> Maybe (AsyncStep m k a)
forall a. a -> Maybe a
Just (AsyncStep m k a -> Maybe (AsyncStep m k a))
-> m (AsyncStep m k a) -> m (Maybe (AsyncStep m k a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MachineT m k a -> m (AsyncStep m k a)
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun MachineT m k a
src) m (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (AsyncStep m (Is a) b) -> m (m (Step k b (MachineT m k b)))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ProcessT m a b -> m (AsyncStep m (Is a) b)
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun ProcessT m a b
snk
  where go :: Maybe (AsyncStep m k a)
           -> AsyncStep m (Is a) b
           -> m (MachineStep m k b)
        go :: Maybe (AsyncStep m k a)
-> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b))
go Maybe (AsyncStep m k a)
srcA AsyncStep m (Is a) b
snkA =
          Maybe (AsyncStep m k a)
-> AsyncStep m (Is a) b
-> m (Either (MachineStep m k a) (MachineStep m (Is a) b))
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Maybe (Async (StM m a)) -> Async (StM m b) -> m (Either a b)
waitEither' Maybe (AsyncStep m k a)
srcA AsyncStep m (Is a) b
snkA m (Either (MachineStep m k a) (MachineStep m (Is a) b))
-> (Either (MachineStep m k a) (MachineStep m (Is a) b)
    -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Either (MachineStep m k a) (MachineStep m (Is a) b)
n -> case Either (MachineStep m k a) (MachineStep m (Is a) b)
n of
            Left (MachineStep m k a
Stop :: MachineStep m k a) -> Maybe (AsyncStep m k a)
-> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b))
go Maybe (AsyncStep m k a)
forall a. Maybe a
Nothing AsyncStep m (Is a) b
snkA
            Left (Yield a
o MachineT m k a
k) -> AsyncStep m (Is a) b -> m (MachineStep m (Is a) b)
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait AsyncStep m (Is a) b
snkA m (MachineStep m (Is a) b)
-> (MachineStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \MachineStep m (Is a) b
m -> case MachineStep m (Is a) b
m of
              (MachineStep m (Is a) b
Stop :: MachineStep m (Is a) b) -> Step k b (MachineT m k b) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => a -> m a
return Step k b (MachineT m k b)
forall (k :: * -> *) o r. Step k o r
Stop
              Yield b
o' ProcessT m a b
k' -> Step k b (MachineT m k b) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step k b (MachineT m k b) -> m (Step k b (MachineT m k b)))
-> (((a -> ProcessT m a b) -> m (Step k b (MachineT m k b)))
    -> Step k b (MachineT m k b))
-> ((a -> ProcessT m a b) -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. b -> MachineT m k b -> Step k b (MachineT m k b)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield b
o' (MachineT m k b -> Step k b (MachineT m k b))
-> (((a -> ProcessT m a b) -> m (Step k b (MachineT m k b)))
    -> MachineT m k b)
-> ((a -> ProcessT m a b) -> m (Step k b (MachineT m k b)))
-> Step k b (MachineT m k b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m (Step k b (MachineT m k b)) -> MachineT m k b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step k b (MachineT m k b)) -> MachineT m k b)
-> (((a -> ProcessT m a b) -> m (Step k b (MachineT m k b)))
    -> m (Step k b (MachineT m k b)))
-> ((a -> ProcessT m a b) -> m (Step k b (MachineT m k b)))
-> MachineT m k b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ProcessT m a b
-> ((a -> ProcessT m a b) -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
flushDown ProcessT m a b
k' (((a -> ProcessT m a b) -> m (Step k b (MachineT m k b)))
 -> m (Step k b (MachineT m k b)))
-> ((a -> ProcessT m a b) -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall a b. (a -> b) -> a -> b
$
                             \a -> ProcessT m a b
f -> m (m (Step k b (MachineT m k b))) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m (Step k b (MachineT m k b)))
 -> m (Step k b (MachineT m k b)))
-> m (m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall a b. (a -> b) -> a -> b
$ Maybe (AsyncStep m k a)
-> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b))
go (Maybe (AsyncStep m k a)
 -> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (Maybe (AsyncStep m k a))
-> m (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (AsyncStep m k a -> Maybe (AsyncStep m k a)
forall a. a -> Maybe a
Just (AsyncStep m k a -> Maybe (AsyncStep m k a))
-> m (AsyncStep m k a) -> m (Maybe (AsyncStep m k a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MachineT m k a -> m (AsyncStep m k a)
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun MachineT m k a
k)
                                             m (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (AsyncStep m (Is a) b) -> m (m (Step k b (MachineT m k b)))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ProcessT m a b -> m (AsyncStep m (Is a) b)
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun (a -> ProcessT m a b
f a
o)
              Await t -> ProcessT m a b
f Is a t
Refl ProcessT m a b
_ -> m (m (Step k b (MachineT m k b))) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m (Step k b (MachineT m k b)))
 -> m (Step k b (MachineT m k b)))
-> m (m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall a b. (a -> b) -> a -> b
$ Maybe (AsyncStep m k a)
-> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b))
go (Maybe (AsyncStep m k a)
 -> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (Maybe (AsyncStep m k a))
-> m (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (AsyncStep m k a -> Maybe (AsyncStep m k a)
forall a. a -> Maybe a
Just (AsyncStep m k a -> Maybe (AsyncStep m k a))
-> m (AsyncStep m k a) -> m (Maybe (AsyncStep m k a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MachineT m k a -> m (AsyncStep m k a)
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun MachineT m k a
k)
                                          m (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (AsyncStep m (Is a) b) -> m (m (Step k b (MachineT m k b)))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ProcessT m a b -> m (AsyncStep m (Is a) b)
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun (t -> ProcessT m a b
f a
t
o)
            Left (Await t -> MachineT m k a
g k t
kg MachineT m k a
fg) -> (t -> MachineT m k a)
-> k t
-> MachineT m k a
-> (AsyncStep m k a -> MachineT m k b)
-> m (Step k b (MachineT m k b))
forall (m :: * -> *) a (k :: * -> *) o (k' :: * -> *)
       (k1 :: * -> *) o1 b.
MonadBaseControl IO m =>
(a -> MachineT m k o)
-> k' a
-> MachineT m k o
-> (AsyncStep m k o -> MachineT m k1 o1)
-> m (Step k' b (MachineT m k1 o1))
asyncAwait t -> MachineT m k a
g k t
kg MachineT m k a
fg ((AsyncStep m k a -> MachineT m k b)
 -> m (Step k b (MachineT m k b)))
-> (AsyncStep m k a -> MachineT m k b)
-> m (Step k b (MachineT m k b))
forall a b. (a -> b) -> a -> b
$
                                    m (Step k b (MachineT m k b)) -> MachineT m k b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step k b (MachineT m k b)) -> MachineT m k b)
-> (AsyncStep m k a -> m (Step k b (MachineT m k b)))
-> AsyncStep m k a
-> MachineT m k b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe (AsyncStep m k a)
 -> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> AsyncStep m (Is a) b
-> Maybe (AsyncStep m k a)
-> m (Step k b (MachineT m k b))
forall a b c. (a -> b -> c) -> b -> a -> c
flip Maybe (AsyncStep m k a)
-> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b))
go AsyncStep m (Is a) b
snkA (Maybe (AsyncStep m k a) -> m (Step k b (MachineT m k b)))
-> (AsyncStep m k a -> Maybe (AsyncStep m k a))
-> AsyncStep m k a
-> m (Step k b (MachineT m k b))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. AsyncStep m k a -> Maybe (AsyncStep m k a)
forall a. a -> Maybe a
Just
            Right (MachineStep m (Is a) b
Stop :: MachineStep m (Is a) b) -> Step k b (MachineT m k b) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => a -> m a
return Step k b (MachineT m k b)
forall (k :: * -> *) o r. Step k o r
Stop
            Right (Yield b
o ProcessT m a b
k) -> ProcessT m a b -> m (AsyncStep m (Is a) b)
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun ProcessT m a b
k m (AsyncStep m (Is a) b)
-> (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
                                 Step k b (MachineT m k b) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step k b (MachineT m k b) -> m (Step k b (MachineT m k b)))
-> (AsyncStep m (Is a) b -> Step k b (MachineT m k b))
-> AsyncStep m (Is a) b
-> m (Step k b (MachineT m k b))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. b -> MachineT m k b -> Step k b (MachineT m k b)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield b
o (MachineT m k b -> Step k b (MachineT m k b))
-> (AsyncStep m (Is a) b -> MachineT m k b)
-> AsyncStep m (Is a) b
-> Step k b (MachineT m k b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m (Step k b (MachineT m k b)) -> MachineT m k b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step k b (MachineT m k b)) -> MachineT m k b)
-> (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> AsyncStep m (Is a) b
-> MachineT m k b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Maybe (AsyncStep m k a)
-> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b))
go Maybe (AsyncStep m k a)
srcA
            Right (Await t -> ProcessT m a b
f Is a t
Refl ProcessT m a b
ff) -> case Maybe (AsyncStep m k a)
srcA of
              Maybe (AsyncStep m k a)
Nothing -> ProcessT m a b -> m (AsyncStep m (Is a) b)
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun ProcessT m a b
ff m (AsyncStep m (Is a) b)
-> (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe (AsyncStep m k a)
-> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b))
go Maybe (AsyncStep m k a)
forall a. Maybe a
Nothing
              Just AsyncStep m k a
src' -> Async (StM m (Step k t (MachineT m k t)))
-> m (Step k t (MachineT m k t))
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait AsyncStep m k a
Async (StM m (Step k t (MachineT m k t)))
src' m (Step k t (MachineT m k t))
-> (Step k t (MachineT m k t) -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Step k t (MachineT m k t)
m -> case Step k t (MachineT m k t)
m of
                Step k t (MachineT m k t)
Stop -> Step k b (MachineT m k b) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => a -> m a
return Step k b (MachineT m k b)
forall (k :: * -> *) o r. Step k o r
Stop
                Yield t
o MachineT m k t
k -> m (m (Step k b (MachineT m k b))) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m (Step k b (MachineT m k b)))
 -> m (Step k b (MachineT m k b)))
-> m (m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall a b. (a -> b) -> a -> b
$ Maybe (AsyncStep m k a)
-> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b))
go (Maybe (AsyncStep m k a)
 -> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (Maybe (AsyncStep m k a))
-> m (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (AsyncStep m k a -> Maybe (AsyncStep m k a)
forall a. a -> Maybe a
Just (AsyncStep m k a -> Maybe (AsyncStep m k a))
-> m (AsyncStep m k a) -> m (Maybe (AsyncStep m k a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MachineT m k t -> m (Async (StM m (Step k t (MachineT m k t))))
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun MachineT m k t
k)
                                       m (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (AsyncStep m (Is a) b) -> m (m (Step k b (MachineT m k b)))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ProcessT m a b -> m (AsyncStep m (Is a) b)
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun (t -> ProcessT m a b
f t
o)
                Step k t (MachineT m k t)
a -> MachineT m k a
-> (a -> MachineT m k a -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
feedUp (Step k t (MachineT m k t) -> MachineT m k t
forall (m :: * -> *) (k :: * -> *) o.
Monad m =>
Step k o (MachineT m k o) -> MachineT m k o
encased Step k t (MachineT m k t)
a) ((a -> MachineT m k a -> m (Step k b (MachineT m k b)))
 -> m (Step k b (MachineT m k b)))
-> (a -> MachineT m k a -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall a b. (a -> b) -> a -> b
$ \a
o MachineT m k a
k -> m (m (Step k b (MachineT m k b))) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m (Step k b (MachineT m k b)))
 -> m (Step k b (MachineT m k b)))
-> m (m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall a b. (a -> b) -> a -> b
$
                       Maybe (AsyncStep m k a)
-> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b))
go (Maybe (AsyncStep m k a)
 -> AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (Maybe (AsyncStep m k a))
-> m (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (AsyncStep m k a -> Maybe (AsyncStep m k a)
forall a. a -> Maybe a
Just (AsyncStep m k a -> Maybe (AsyncStep m k a))
-> m (AsyncStep m k a) -> m (Maybe (AsyncStep m k a))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MachineT m k a -> m (AsyncStep m k a)
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun MachineT m k a
k) m (AsyncStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (AsyncStep m (Is a) b) -> m (m (Step k b (MachineT m k b)))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ProcessT m a b -> m (AsyncStep m (Is a) b)
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun (t -> ProcessT m a b
f a
t
o)
        -- If we have an upstream source value ready, we must flush
        -- all available values yielded by downstream until it awaits.
        flushDown :: ProcessT m a b
                  -> ((a -> ProcessT m a b) -> m (MachineStep m k b))
                  -> m (MachineStep m k b)
        flushDown :: ProcessT m a b
-> ((a -> ProcessT m a b) -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
flushDown ProcessT m a b
m (a -> ProcessT m a b) -> m (Step k b (MachineT m k b))
k = ProcessT m a b -> m (MachineStep m (Is a) b)
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT ProcessT m a b
m m (MachineStep m (Is a) b)
-> (MachineStep m (Is a) b -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \MachineStep m (Is a) b
s -> case MachineStep m (Is a) b
s of
          MachineStep m (Is a) b
Stop -> Step k b (MachineT m k b) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => a -> m a
return Step k b (MachineT m k b)
forall (k :: * -> *) o r. Step k o r
Stop
          Yield b
o ProcessT m a b
m' -> Step k b (MachineT m k b) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step k b (MachineT m k b) -> m (Step k b (MachineT m k b)))
-> (m (Step k b (MachineT m k b)) -> Step k b (MachineT m k b))
-> m (Step k b (MachineT m k b))
-> m (Step k b (MachineT m k b))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. b -> MachineT m k b -> Step k b (MachineT m k b)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield b
o (MachineT m k b -> Step k b (MachineT m k b))
-> (m (Step k b (MachineT m k b)) -> MachineT m k b)
-> m (Step k b (MachineT m k b))
-> Step k b (MachineT m k b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m (Step k b (MachineT m k b)) -> MachineT m k b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step k b (MachineT m k b)) -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b)) -> m (Step k b (MachineT m k b))
forall a b. (a -> b) -> a -> b
$ ProcessT m a b
-> ((a -> ProcessT m a b) -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
flushDown ProcessT m a b
m' (a -> ProcessT m a b) -> m (Step k b (MachineT m k b))
k
          Await t -> ProcessT m a b
f Is a t
Refl ProcessT m a b
_ -> (a -> ProcessT m a b) -> m (Step k b (MachineT m k b))
k a -> ProcessT m a b
t -> ProcessT m a b
f
        -- If downstream is awaiting an input, we must pull in all
        -- necessary upstream awaits until we have a yielded value to
        -- push downstream.
        feedUp :: MachineT m k a
               -> (a -> MachineT m k a -> m (MachineStep m k b))
               -> m (MachineStep m k b)
        feedUp :: MachineT m k a
-> (a -> MachineT m k a -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
feedUp MachineT m k a
m a -> MachineT m k a -> m (Step k b (MachineT m k b))
k = MachineT m k a -> m (MachineStep m k a)
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT MachineT m k a
m m (MachineStep m k a)
-> (MachineStep m k a -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \MachineStep m k a
s -> case MachineStep m k a
s of
          MachineStep m k a
Stop -> Step k b (MachineT m k b) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => a -> m a
return Step k b (MachineT m k b)
forall (k :: * -> *) o r. Step k o r
Stop
          Yield a
o MachineT m k a
m' -> a -> MachineT m k a -> m (Step k b (MachineT m k b))
k a
o MachineT m k a
m'
          Await t -> MachineT m k a
g k t
kg MachineT m k a
fg -> Step k b (MachineT m k b) -> m (Step k b (MachineT m k b))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step k b (MachineT m k b) -> m (Step k b (MachineT m k b)))
-> Step k b (MachineT m k b) -> m (Step k b (MachineT m k b))
forall a b. (a -> b) -> a -> b
$ (t -> MachineT m k a)
-> k t
-> MachineT m k a
-> (MachineT m k a -> MachineT m k b)
-> Step k b (MachineT m k b)
forall a d (k' :: * -> *) r b.
(a -> d) -> k' a -> d -> (d -> r) -> Step k' b r
awaitStep t -> MachineT m k a
g k t
kg MachineT m k a
fg (m (Step k b (MachineT m k b)) -> MachineT m k b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step k b (MachineT m k b)) -> MachineT m k b)
-> (MachineT m k a -> m (Step k b (MachineT m k b)))
-> MachineT m k a
-> MachineT m k b
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (MachineT m k a
 -> (a -> MachineT m k a -> m (Step k b (MachineT m k b)))
 -> m (Step k b (MachineT m k b)))
-> (a -> MachineT m k a -> m (Step k b (MachineT m k b)))
-> MachineT m k a
-> m (Step k b (MachineT m k b))
forall a b c. (a -> b -> c) -> b -> a -> c
flip MachineT m k a
-> (a -> MachineT m k a -> m (Step k b (MachineT m k b)))
-> m (Step k b (MachineT m k b))
feedUp a -> MachineT m k a -> m (Step k b (MachineT m k b))
k)