{-# LANGUAGE CPP, FlexibleContexts, GADTs, ScopedTypeVariables, TupleSections #-}
-- | Place buffers between two machines. This is most useful with
-- irregular production rates.
module Data.Machine.Concurrent.Buffer (
  -- * Blocking buffers
  bufferConnect,
  -- * Non-blocking (rolling) buffers
  rollingConnect,
  -- * Internal helpers
  mediatedConnect, BufferRoom(..)
  ) where
#if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ < 710
import Control.Applicative ((<$>), (<*>))
#endif
import Control.Concurrent.Async.Lifted (wait, waitEither)
import Control.Monad.Trans.Control (MonadBaseControl)
import Control.Monad (join, (>=>))
import Data.Machine.Concurrent.AsyncStep
import Data.Machine
import Data.Sequence (ViewL(..), (|>))
import qualified Data.Sequence as S
#if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ < 710
import Data.Traversable (traverse)
#endif

-- | Drain downstream until it awaits a value, then pass the awaiting
-- step to the given function.
drain :: Monad m
      => MachineStep m k a
      -> (MachineStep m k a -> m (MachineStep m k' a))
      -> m (MachineStep m k' a)
drain :: MachineStep m k a
-> (MachineStep m k a -> m (MachineStep m k' a))
-> m (MachineStep m k' a)
drain MachineStep m k a
z MachineStep m k a -> m (MachineStep m k' a)
k = MachineStep m k a -> m (MachineStep m k' a)
go MachineStep m k a
z
  where go :: MachineStep m k a -> m (MachineStep m k' a)
go MachineStep m k a
Stop = MachineStep m k' a -> m (MachineStep m k' a)
forall (m :: * -> *) a. Monad m => a -> m a
return MachineStep m k' a
forall (k :: * -> *) o r. Step k o r
Stop
        go (Yield a
o MachineT m k a
kd) = a -> MachineT m k' a -> MachineStep m k' a
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield a
o (MachineT m k' a -> MachineStep m k' a)
-> (MachineStep m k a -> MachineT m k' a)
-> MachineStep m k a
-> MachineStep m k' a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m (MachineStep m k' a) -> MachineT m k' a
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (MachineStep m k' a) -> MachineT m k' a)
-> (MachineStep m k a -> m (MachineStep m k' a))
-> MachineStep m k a
-> MachineT m k' a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MachineStep m k a -> m (MachineStep m k' a)
go (MachineStep m k a -> MachineStep m k' a)
-> m (MachineStep m k a) -> m (MachineStep m k' a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> MachineT m k a -> m (MachineStep m k a)
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT MachineT m k a
kd
        go MachineStep m k a
aStep = MachineStep m k a -> m (MachineStep m k' a)
k MachineStep m k a
aStep

-- | Feed upstream until it yields a value, then pass the yielded
-- value and next step to the given function.
feedToBursting :: Monad m
               => MachineStep m k a
               -> (Maybe (a, MachineT m k a) -> m (MachineStep m k b))
               -> m (MachineStep m k b)
feedToBursting :: MachineStep m k a
-> (Maybe (a, MachineT m k a) -> m (MachineStep m k b))
-> m (MachineStep m k b)
feedToBursting MachineStep m k a
z Maybe (a, MachineT m k a) -> m (MachineStep m k b)
k = MachineStep m k a -> m (MachineStep m k b)
go MachineStep m k a
z
  where go :: MachineStep m k a -> m (MachineStep m k b)
go MachineStep m k a
Stop = Maybe (a, MachineT m k a) -> m (MachineStep m k b)
k Maybe (a, MachineT m k a)
forall a. Maybe a
Nothing
        go (Await t -> MachineT m k a
f k t
kf MachineT m k a
ff) = MachineStep m k b -> m (MachineStep m k b)
forall (m :: * -> *) a. Monad m => a -> m a
return (MachineStep m k b -> m (MachineStep m k b))
-> MachineStep m k b -> m (MachineStep m k b)
forall a b. (a -> b) -> a -> b
$
          (t -> MachineT m k b) -> k t -> MachineT m k b -> MachineStep m k b
forall (k :: * -> *) o r t. (t -> r) -> k t -> r -> Step k o r
Await (\t
a -> MachineT m k a -> MachineT m k b
go' (t -> MachineT m k a
f t
a)) k t
kf (MachineT m k a -> MachineT m k b
go' MachineT m k a
ff)
        go (Yield a
o MachineT m k a
kk) = Maybe (a, MachineT m k a) -> m (MachineStep m k b)
k (Maybe (a, MachineT m k a) -> m (MachineStep m k b))
-> Maybe (a, MachineT m k a) -> m (MachineStep m k b)
forall a b. (a -> b) -> a -> b
$ (a, MachineT m k a) -> Maybe (a, MachineT m k a)
forall a. a -> Maybe a
Just (a
o, MachineT m k a
kk)
        go' :: MachineT m k a -> MachineT m k b
go' MachineT m k a
step = m (MachineStep m k b) -> MachineT m k b
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (MachineStep m k b) -> MachineT m k b)
-> m (MachineStep m k b) -> MachineT m k b
forall a b. (a -> b) -> a -> b
$ MachineT m k a -> m (MachineStep m k a)
forall (m :: * -> *) (k :: * -> *) o.
MachineT m k o -> m (Step k o (MachineT m k o))
runMachineT MachineT m k a
step m (MachineStep m k a)
-> (MachineStep m k a -> m (MachineStep m k b))
-> m (MachineStep m k b)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= MachineStep m k a -> m (MachineStep m k b)
go

-- | Mediate a 'MachineT' and a 'ProcessT' with a bounded capacity
-- buffer. The source machine runs concurrently with the sink process,
-- and is only blocked when the buffer is full.
bufferConnect :: MonadBaseControl IO m
              => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
bufferConnect :: Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
bufferConnect Int
n = Seq b
-> (Seq b -> b -> BufferRoom (Seq b))
-> (Seq b -> Maybe (b, Seq b))
-> MachineT m k b
-> ProcessT m b c
-> MachineT m k c
forall (m :: * -> *) t b (k :: * -> *) c.
MonadBaseControl IO m =>
t
-> (t -> b -> BufferRoom t)
-> (t -> Maybe (b, t))
-> MachineT m k b
-> ProcessT m b c
-> MachineT m k c
mediatedConnect Seq b
forall a. Seq a
S.empty Seq b -> b -> BufferRoom (Seq b)
snoc Seq b -> Maybe (b, Seq b)
forall a. Seq a -> Maybe (a, Seq a)
view
  where snoc :: Seq b -> b -> BufferRoom (Seq b)
snoc Seq b
acc b
x = (if Seq b -> Int
forall a. Seq a -> Int
S.length Seq b
acc Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
< Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1 then Seq b -> BufferRoom (Seq b)
forall a. a -> BufferRoom a
Vacancy else Seq b -> BufferRoom (Seq b)
forall a. a -> BufferRoom a
NoVacancy) (Seq b -> BufferRoom (Seq b)) -> Seq b -> BufferRoom (Seq b)
forall a b. (a -> b) -> a -> b
$
                       Seq b
acc Seq b -> b -> Seq b
forall a. Seq a -> a -> Seq a
|> b
x
        view :: Seq a -> Maybe (a, Seq a)
view Seq a
acc = case Seq a -> ViewL a
forall a. Seq a -> ViewL a
S.viewl Seq a
acc of
                     ViewL a
EmptyL -> Maybe (a, Seq a)
forall a. Maybe a
Nothing
                     a
x :< Seq a
acc' -> (a, Seq a) -> Maybe (a, Seq a)
forall a. a -> Maybe a
Just (a
x, Seq a
acc')

-- | Mediate a 'MachineT' and a 'ProcessT' with a rolling buffer. The
-- source machine runs concurrently with the sink process and is never
-- blocked. If the sink process can not keep up with upstream, yielded
-- values will be dropped.
rollingConnect :: MonadBaseControl IO m
              => Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
rollingConnect :: Int -> MachineT m k b -> ProcessT m b c -> MachineT m k c
rollingConnect Int
n = Seq b
-> (Seq b -> b -> BufferRoom (Seq b))
-> (Seq b -> Maybe (b, Seq b))
-> MachineT m k b
-> ProcessT m b c
-> MachineT m k c
forall (m :: * -> *) t b (k :: * -> *) c.
MonadBaseControl IO m =>
t
-> (t -> b -> BufferRoom t)
-> (t -> Maybe (b, t))
-> MachineT m k b
-> ProcessT m b c
-> MachineT m k c
mediatedConnect Seq b
forall a. Seq a
S.empty Seq b -> b -> BufferRoom (Seq b)
snoc Seq b -> Maybe (b, Seq b)
forall a. Seq a -> Maybe (a, Seq a)
view
  where snoc :: Seq b -> b -> BufferRoom (Seq b)
snoc Seq b
acc b
x = Seq b -> BufferRoom (Seq b)
forall a. a -> BufferRoom a
Vacancy (Seq b -> BufferRoom (Seq b)) -> Seq b -> BufferRoom (Seq b)
forall a b. (a -> b) -> a -> b
$ Int -> Seq b -> Seq b
forall a. Int -> Seq a -> Seq a
S.take (Int
nInt -> Int -> Int
forall a. Num a => a -> a -> a
-Int
1) Seq b
acc Seq b -> b -> Seq b
forall a. Seq a -> a -> Seq a
|> b
x
        view :: Seq a -> Maybe (a, Seq a)
view Seq a
acc = case Seq a -> ViewL a
forall a. Seq a -> ViewL a
S.viewl Seq a
acc of
                     ViewL a
EmptyL -> Maybe (a, Seq a)
forall a. Maybe a
Nothing
                     a
x :< Seq a
acc' -> (a, Seq a) -> Maybe (a, Seq a)
forall a. a -> Maybe a
Just (a
x, Seq a
acc')

-- | Indication if the payload value is "full" or not.
data BufferRoom a = NoVacancy a | Vacancy a deriving (BufferRoom a -> BufferRoom a -> Bool
(BufferRoom a -> BufferRoom a -> Bool)
-> (BufferRoom a -> BufferRoom a -> Bool) -> Eq (BufferRoom a)
forall a. Eq a => BufferRoom a -> BufferRoom a -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: BufferRoom a -> BufferRoom a -> Bool
$c/= :: forall a. Eq a => BufferRoom a -> BufferRoom a -> Bool
== :: BufferRoom a -> BufferRoom a -> Bool
$c== :: forall a. Eq a => BufferRoom a -> BufferRoom a -> Bool
Eq, Eq (BufferRoom a)
Eq (BufferRoom a)
-> (BufferRoom a -> BufferRoom a -> Ordering)
-> (BufferRoom a -> BufferRoom a -> Bool)
-> (BufferRoom a -> BufferRoom a -> Bool)
-> (BufferRoom a -> BufferRoom a -> Bool)
-> (BufferRoom a -> BufferRoom a -> Bool)
-> (BufferRoom a -> BufferRoom a -> BufferRoom a)
-> (BufferRoom a -> BufferRoom a -> BufferRoom a)
-> Ord (BufferRoom a)
BufferRoom a -> BufferRoom a -> Bool
BufferRoom a -> BufferRoom a -> Ordering
BufferRoom a -> BufferRoom a -> BufferRoom a
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
forall a. Ord a => Eq (BufferRoom a)
forall a. Ord a => BufferRoom a -> BufferRoom a -> Bool
forall a. Ord a => BufferRoom a -> BufferRoom a -> Ordering
forall a. Ord a => BufferRoom a -> BufferRoom a -> BufferRoom a
min :: BufferRoom a -> BufferRoom a -> BufferRoom a
$cmin :: forall a. Ord a => BufferRoom a -> BufferRoom a -> BufferRoom a
max :: BufferRoom a -> BufferRoom a -> BufferRoom a
$cmax :: forall a. Ord a => BufferRoom a -> BufferRoom a -> BufferRoom a
>= :: BufferRoom a -> BufferRoom a -> Bool
$c>= :: forall a. Ord a => BufferRoom a -> BufferRoom a -> Bool
> :: BufferRoom a -> BufferRoom a -> Bool
$c> :: forall a. Ord a => BufferRoom a -> BufferRoom a -> Bool
<= :: BufferRoom a -> BufferRoom a -> Bool
$c<= :: forall a. Ord a => BufferRoom a -> BufferRoom a -> Bool
< :: BufferRoom a -> BufferRoom a -> Bool
$c< :: forall a. Ord a => BufferRoom a -> BufferRoom a -> Bool
compare :: BufferRoom a -> BufferRoom a -> Ordering
$ccompare :: forall a. Ord a => BufferRoom a -> BufferRoom a -> Ordering
$cp1Ord :: forall a. Ord a => Eq (BufferRoom a)
Ord, Int -> BufferRoom a -> ShowS
[BufferRoom a] -> ShowS
BufferRoom a -> String
(Int -> BufferRoom a -> ShowS)
-> (BufferRoom a -> String)
-> ([BufferRoom a] -> ShowS)
-> Show (BufferRoom a)
forall a. Show a => Int -> BufferRoom a -> ShowS
forall a. Show a => [BufferRoom a] -> ShowS
forall a. Show a => BufferRoom a -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [BufferRoom a] -> ShowS
$cshowList :: forall a. Show a => [BufferRoom a] -> ShowS
show :: BufferRoom a -> String
$cshow :: forall a. Show a => BufferRoom a -> String
showsPrec :: Int -> BufferRoom a -> ShowS
$cshowsPrec :: forall a. Show a => Int -> BufferRoom a -> ShowS
Show)

-- | Mediate a 'MachineT' and a 'ProcessT' with a buffer.
--
-- @mediatedConnect z snoc view source sink@ pipes @source@ into
-- @sink@ through a buffer initialized to @z@ and updated with
-- @snoc@. Upstream is blocked if @snoc@ indicates that the buffer is
-- full after adding a new element. Downstream blocks if @view@
-- indicates that the buffer is empty. Otherwise, @view@ is expected
-- to return the next element to process and an updated buffer.
mediatedConnect :: forall m t b k c. MonadBaseControl IO m
                => t -> (t -> b -> BufferRoom t) -> (t -> Maybe (b,t))
                -> MachineT m k b -> ProcessT m b c -> MachineT m k c
mediatedConnect :: t
-> (t -> b -> BufferRoom t)
-> (t -> Maybe (b, t))
-> MachineT m k b
-> ProcessT m b c
-> MachineT m k c
mediatedConnect t
z t -> b -> BufferRoom t
snoc t -> Maybe (b, t)
view MachineT m k b
src0 ProcessT m b c
snk0 =
  m (Step k c (MachineT m k c)) -> MachineT m k c
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step k c (MachineT m k c)) -> MachineT m k c)
-> m (Step k c (MachineT m k c)) -> MachineT m k c
forall a b. (a -> b) -> a -> b
$ do Async (StM m (MachineStep m k b))
srcFuture <- MachineT m k b -> m (Async (StM m (MachineStep m k b)))
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun MachineT m k b
src0
                Async (StM m (MachineStep m (Is b) c))
snkFuture <- ProcessT m b c -> m (Async (StM m (MachineStep m (Is b) c)))
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun ProcessT m b c
snk0
                t
-> Maybe (Async (StM m (MachineStep m k b)))
-> Async (StM m (MachineStep m (Is b) c))
-> m (Step k c (MachineT m k c))
go t
z (Async (StM m (MachineStep m k b))
-> Maybe (Async (StM m (MachineStep m k b)))
forall a. a -> Maybe a
Just Async (StM m (MachineStep m k b))
srcFuture) Async (StM m (MachineStep m (Is b) c))
snkFuture
  where -- Wait for the next available step
        go :: t
           -> Maybe (AsyncStep m k b)
           -> AsyncStep m (Is b) c
           -> m (MachineStep m k c)
        go :: t
-> Maybe (Async (StM m (MachineStep m k b)))
-> Async (StM m (MachineStep m (Is b) c))
-> m (Step k c (MachineT m k c))
go t
acc Maybe (Async (StM m (MachineStep m k b)))
src Async (StM m (MachineStep m (Is b) c))
snk = m (Either (MachineStep m (Is b) c) (MachineStep m k b))
-> (Async (StM m (MachineStep m k b))
    -> m (Either (MachineStep m (Is b) c) (MachineStep m k b)))
-> Maybe (Async (StM m (MachineStep m k b)))
-> m (Either (MachineStep m (Is b) c) (MachineStep m k b))
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (MachineStep m (Is b) c
-> Either (MachineStep m (Is b) c) (MachineStep m k b)
forall a b. a -> Either a b
Left (MachineStep m (Is b) c
 -> Either (MachineStep m (Is b) c) (MachineStep m k b))
-> m (MachineStep m (Is b) c)
-> m (Either (MachineStep m (Is b) c) (MachineStep m k b))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Async (StM m (MachineStep m (Is b) c))
-> m (MachineStep m (Is b) c)
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait Async (StM m (MachineStep m (Is b) c))
snk) (Async (StM m (MachineStep m (Is b) c))
-> Async (StM m (MachineStep m k b))
-> m (Either (MachineStep m (Is b) c) (MachineStep m k b))
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
Async (StM m a) -> Async (StM m b) -> m (Either a b)
waitEither Async (StM m (MachineStep m (Is b) c))
snk) Maybe (Async (StM m (MachineStep m k b)))
src m (Either (MachineStep m (Is b) c) (MachineStep m k b))
-> (Either (MachineStep m (Is b) c) (MachineStep m k b)
    -> m (Step k c (MachineT m k c)))
-> m (Step k c (MachineT m k c))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>=
                           t
-> Either
     (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
     (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
-> m (Step k c (MachineT m k c))
goStep t
acc (Either
   (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
   (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
 -> m (Step k c (MachineT m k c)))
-> (Either (MachineStep m (Is b) c) (MachineStep m k b)
    -> Either
         (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
         (MachineStep m (Is b) c,
          Maybe (Async (StM m (MachineStep m k b)))))
-> Either (MachineStep m (Is b) c) (MachineStep m k b)
-> m (Step k c (MachineT m k c))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (MachineStep m (Is b) c
 -> Either
      (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
      (MachineStep m (Is b) c,
       Maybe (Async (StM m (MachineStep m k b)))))
-> (MachineStep m k b
    -> Either
         (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
         (MachineStep m (Is b) c,
          Maybe (Async (StM m (MachineStep m k b)))))
-> Either (MachineStep m (Is b) c) (MachineStep m k b)
-> Either
     (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
     (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either ((MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
-> Either
     (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
     (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
forall a b. b -> Either a b
Right ((MachineStep m (Is b) c,
  Maybe (Async (StM m (MachineStep m k b))))
 -> Either
      (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
      (MachineStep m (Is b) c,
       Maybe (Async (StM m (MachineStep m k b)))))
-> (MachineStep m (Is b) c
    -> (MachineStep m (Is b) c,
        Maybe (Async (StM m (MachineStep m k b)))))
-> MachineStep m (Is b) c
-> Either
     (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
     (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (,Maybe (Async (StM m (MachineStep m k b)))
src)) ((MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
-> Either
     (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
     (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
forall a b. a -> Either a b
Left ((MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
 -> Either
      (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
      (MachineStep m (Is b) c,
       Maybe (Async (StM m (MachineStep m k b)))))
-> (MachineStep m k b
    -> (MachineStep m k b, Async (StM m (MachineStep m (Is b) c))))
-> MachineStep m k b
-> Either
     (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
     (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (,Async (StM m (MachineStep m (Is b) c))
snk))

        -- Kick off the next step of both the source and the sink
        goAsync :: t
                -> Maybe (MachineT m k b)
                -> ProcessT m b c
                -> m (MachineStep m k c)
        goAsync :: t
-> Maybe (MachineT m k b)
-> ProcessT m b c
-> m (Step k c (MachineT m k c))
goAsync t
acc Maybe (MachineT m k b)
src ProcessT m b c
snk =
          m (m (Step k c (MachineT m k c))) -> m (Step k c (MachineT m k c))
forall (m :: * -> *) a. Monad m => m (m a) -> m a
join (m (m (Step k c (MachineT m k c)))
 -> m (Step k c (MachineT m k c)))
-> m (m (Step k c (MachineT m k c)))
-> m (Step k c (MachineT m k c))
forall a b. (a -> b) -> a -> b
$ t
-> Maybe (Async (StM m (MachineStep m k b)))
-> Async (StM m (MachineStep m (Is b) c))
-> m (Step k c (MachineT m k c))
go t
acc (Maybe (Async (StM m (MachineStep m k b)))
 -> Async (StM m (MachineStep m (Is b) c))
 -> m (Step k c (MachineT m k c)))
-> m (Maybe (Async (StM m (MachineStep m k b))))
-> m (Async (StM m (MachineStep m (Is b) c))
      -> m (Step k c (MachineT m k c)))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (MachineT m k b -> m (Async (StM m (MachineStep m k b))))
-> Maybe (MachineT m k b)
-> m (Maybe (Async (StM m (MachineStep m k b))))
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse MachineT m k b -> m (Async (StM m (MachineStep m k b)))
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun Maybe (MachineT m k b)
src m (Async (StM m (MachineStep m (Is b) c))
   -> m (Step k c (MachineT m k c)))
-> m (Async (StM m (MachineStep m (Is b) c)))
-> m (m (Step k c (MachineT m k c)))
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> ProcessT m b c -> m (Async (StM m (MachineStep m (Is b) c)))
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun ProcessT m b c
snk

        -- Handle whichever step is ready first
        goStep :: t  -> Either (MachineStep m k b, AsyncStep m (Is b) c)
                               (MachineStep m (Is b) c, Maybe (AsyncStep m k b))
               -> m (MachineStep m k c)
        goStep :: t
-> Either
     (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
     (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
-> m (Step k c (MachineT m k c))
goStep t
acc Either
  (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
  (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
step = case Either
  (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
  (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
step of
          -- @src@ stepped first
          Left (MachineStep m k b
Stop, Async (StM m (MachineStep m (Is b) c))
snk) -> t
-> Maybe (Async (StM m (MachineStep m k b)))
-> Async (StM m (MachineStep m (Is b) c))
-> m (Step k c (MachineT m k c))
go t
acc Maybe (Async (StM m (MachineStep m k b)))
forall a. Maybe a
Nothing Async (StM m (MachineStep m (Is b) c))
snk
          Left (Await t -> MachineT m k b
g k t
kg MachineT m k b
fg, Async (StM m (MachineStep m (Is b) c))
snk) ->
            (t -> MachineT m k b)
-> k t
-> MachineT m k b
-> (Async (StM m (MachineStep m k b)) -> MachineT m k c)
-> m (Step k c (MachineT m k c))
forall (m :: * -> *) a (k :: * -> *) o (k' :: * -> *)
       (k1 :: * -> *) o1 b.
MonadBaseControl IO m =>
(a -> MachineT m k o)
-> k' a
-> MachineT m k o
-> (AsyncStep m k o -> MachineT m k1 o1)
-> m (Step k' b (MachineT m k1 o1))
asyncAwait t -> MachineT m k b
g k t
kg MachineT m k b
fg (m (Step k c (MachineT m k c)) -> MachineT m k c
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step k c (MachineT m k c)) -> MachineT m k c)
-> (Async (StM m (MachineStep m k b))
    -> m (Step k c (MachineT m k c)))
-> Async (StM m (MachineStep m k b))
-> MachineT m k c
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (Maybe (Async (StM m (MachineStep m k b)))
 -> Async (StM m (MachineStep m (Is b) c))
 -> m (Step k c (MachineT m k c)))
-> Async (StM m (MachineStep m (Is b) c))
-> Maybe (Async (StM m (MachineStep m k b)))
-> m (Step k c (MachineT m k c))
forall a b c. (a -> b -> c) -> b -> a -> c
flip (t
-> Maybe (Async (StM m (MachineStep m k b)))
-> Async (StM m (MachineStep m (Is b) c))
-> m (Step k c (MachineT m k c))
go t
acc) Async (StM m (MachineStep m (Is b) c))
snk (Maybe (Async (StM m (MachineStep m k b)))
 -> m (Step k c (MachineT m k c)))
-> (Async (StM m (MachineStep m k b))
    -> Maybe (Async (StM m (MachineStep m k b))))
-> Async (StM m (MachineStep m k b))
-> m (Step k c (MachineT m k c))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Async (StM m (MachineStep m k b))
-> Maybe (Async (StM m (MachineStep m k b)))
forall a. a -> Maybe a
Just)
          Left (Yield b
o MachineT m k b
k, Async (StM m (MachineStep m (Is b) c))
snk) -> case t -> b -> BufferRoom t
snoc t
acc b
o of
            -- add it to the right end of the buffer
            Vacancy t
acc' -> MachineT m k b -> m (Async (StM m (MachineStep m k b)))
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun MachineT m k b
k m (Async (StM m (MachineStep m k b)))
-> (Async (StM m (MachineStep m k b))
    -> m (Step k c (MachineT m k c)))
-> m (Step k c (MachineT m k c))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (Maybe (Async (StM m (MachineStep m k b)))
 -> Async (StM m (MachineStep m (Is b) c))
 -> m (Step k c (MachineT m k c)))
-> Async (StM m (MachineStep m (Is b) c))
-> Maybe (Async (StM m (MachineStep m k b)))
-> m (Step k c (MachineT m k c))
forall a b c. (a -> b -> c) -> b -> a -> c
flip (t
-> Maybe (Async (StM m (MachineStep m k b)))
-> Async (StM m (MachineStep m (Is b) c))
-> m (Step k c (MachineT m k c))
go t
acc') Async (StM m (MachineStep m (Is b) c))
snk (Maybe (Async (StM m (MachineStep m k b)))
 -> m (Step k c (MachineT m k c)))
-> (Async (StM m (MachineStep m k b))
    -> Maybe (Async (StM m (MachineStep m k b))))
-> Async (StM m (MachineStep m k b))
-> m (Step k c (MachineT m k c))
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Async (StM m (MachineStep m k b))
-> Maybe (Async (StM m (MachineStep m k b)))
forall a. a -> Maybe a
Just
            -- buffer was full
            NoVacancy t
acc' ->
              let go' :: MachineStep m (Is b) c -> m (Step k c (MachineT m k c))
go' MachineStep m (Is b) c
snk' = do Async (StM m (MachineStep m k b))
src' <- MachineT m k b -> m (Async (StM m (MachineStep m k b)))
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun MachineT m k b
k
                                t
-> Either
     (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
     (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
-> m (Step k c (MachineT m k c))
goStep t
acc' ((MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
-> Either
     (MachineStep m k b, Async (StM m (MachineStep m (Is b) c)))
     (MachineStep m (Is b) c, Maybe (Async (StM m (MachineStep m k b))))
forall a b. b -> Either a b
Right (MachineStep m (Is b) c
snk', Async (StM m (MachineStep m k b))
-> Maybe (Async (StM m (MachineStep m k b)))
forall a. a -> Maybe a
Just Async (StM m (MachineStep m k b))
src'))
              in Async (StM m (MachineStep m (Is b) c))
-> m (MachineStep m (Is b) c)
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait Async (StM m (MachineStep m (Is b) c))
snk m (MachineStep m (Is b) c)
-> (MachineStep m (Is b) c -> m (Step k c (MachineT m k c)))
-> m (Step k c (MachineT m k c))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= (MachineStep m (Is b) c
 -> (MachineStep m (Is b) c -> m (Step k c (MachineT m k c)))
 -> m (Step k c (MachineT m k c)))
-> (MachineStep m (Is b) c -> m (Step k c (MachineT m k c)))
-> MachineStep m (Is b) c
-> m (Step k c (MachineT m k c))
forall a b c. (a -> b -> c) -> b -> a -> c
flip MachineStep m (Is b) c
-> (MachineStep m (Is b) c -> m (Step k c (MachineT m k c)))
-> m (Step k c (MachineT m k c))
forall (m :: * -> *) (k :: * -> *) a (k' :: * -> *).
Monad m =>
MachineStep m k a
-> (MachineStep m k a -> m (MachineStep m k' a))
-> m (MachineStep m k' a)
drain MachineStep m (Is b) c -> m (Step k c (MachineT m k c))
go'

          -- @snk@ stepped first
          Right (MachineStep m (Is b) c
Stop, Maybe (Async (StM m (MachineStep m k b)))
_) -> Step k c (MachineT m k c) -> m (Step k c (MachineT m k c))
forall (m :: * -> *) a. Monad m => a -> m a
return Step k c (MachineT m k c)
forall (k :: * -> *) o r. Step k o r
Stop
          Right (Yield c
o ProcessT m b c
k, Maybe (Async (StM m (MachineStep m k b)))
src) -> do
            Step k c (MachineT m k c) -> m (Step k c (MachineT m k c))
forall (m :: * -> *) a. Monad m => a -> m a
return (Step k c (MachineT m k c) -> m (Step k c (MachineT m k c)))
-> Step k c (MachineT m k c) -> m (Step k c (MachineT m k c))
forall a b. (a -> b) -> a -> b
$ c -> MachineT m k c -> Step k c (MachineT m k c)
forall (k :: * -> *) o r. o -> r -> Step k o r
Yield c
o (m (Step k c (MachineT m k c)) -> MachineT m k c
forall (m :: * -> *) (k :: * -> *) o.
m (Step k o (MachineT m k o)) -> MachineT m k o
MachineT (m (Step k c (MachineT m k c)) -> MachineT m k c)
-> m (Step k c (MachineT m k c)) -> MachineT m k c
forall a b. (a -> b) -> a -> b
$ ProcessT m b c -> m (Async (StM m (MachineStep m (Is b) c)))
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun ProcessT m b c
k m (Async (StM m (MachineStep m (Is b) c)))
-> (Async (StM m (MachineStep m (Is b) c))
    -> m (Step k c (MachineT m k c)))
-> m (Step k c (MachineT m k c))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= t
-> Maybe (Async (StM m (MachineStep m k b)))
-> Async (StM m (MachineStep m (Is b) c))
-> m (Step k c (MachineT m k c))
go t
acc Maybe (Async (StM m (MachineStep m k b)))
src)
          Right (Await t -> ProcessT m b c
f Is b t
Refl ProcessT m b c
ff, Maybe (Async (StM m (MachineStep m k b)))
src) ->
            case t -> Maybe (b, t)
view t
acc of
              Maybe (b, t)
Nothing -> m (Step k c (MachineT m k c))
-> (Async (StM m (MachineStep m k b))
    -> m (Step k c (MachineT m k c)))
-> Maybe (Async (StM m (MachineStep m k b)))
-> m (Step k c (MachineT m k c))
forall b a. b -> (a -> b) -> Maybe a -> b
maybe (t
-> Maybe (MachineT m k b)
-> ProcessT m b c
-> m (Step k c (MachineT m k c))
goAsync t
acc Maybe (MachineT m k b)
forall a. Maybe a
Nothing ProcessT m b c
ff) (Async (StM m (MachineStep m k b)) -> m (MachineStep m k t)
forall (m :: * -> *) a.
MonadBaseControl IO m =>
Async (StM m a) -> m a
wait (Async (StM m (MachineStep m k b)) -> m (MachineStep m k t))
-> (MachineStep m k t -> m (Step k c (MachineT m k c)))
-> Async (StM m (MachineStep m k b))
-> m (Step k c (MachineT m k c))
forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> MachineStep m k t -> m (Step k c (MachineT m k c))
demandSrc) Maybe (Async (StM m (MachineStep m k b)))
src
              Just (b
x, t
acc') -> ProcessT m b c -> m (Async (StM m (MachineStep m (Is b) c)))
forall (m :: * -> *) (k :: * -> *) o.
MonadBaseControl IO m =>
MachineT m k o -> m (AsyncStep m k o)
asyncRun (t -> ProcessT m b c
f b
t
x) m (Async (StM m (MachineStep m (Is b) c)))
-> (Async (StM m (MachineStep m (Is b) c))
    -> m (Step k c (MachineT m k c)))
-> m (Step k c (MachineT m k c))
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= t
-> Maybe (Async (StM m (MachineStep m k b)))
-> Async (StM m (MachineStep m (Is b) c))
-> m (Step k c (MachineT m k c))
go t
acc' Maybe (Async (StM m (MachineStep m k b)))
src
            where demandSrc :: MachineStep m k t -> m (Step k c (MachineT m k c))
demandSrc = (MachineStep m k t
 -> (Maybe (t, MachineT m k t) -> m (Step k c (MachineT m k c)))
 -> m (Step k c (MachineT m k c)))
-> (Maybe (t, MachineT m k t) -> m (Step k c (MachineT m k c)))
-> MachineStep m k t
-> m (Step k c (MachineT m k c))
forall a b c. (a -> b -> c) -> b -> a -> c
flip MachineStep m k t
-> (Maybe (t, MachineT m k t) -> m (Step k c (MachineT m k c)))
-> m (Step k c (MachineT m k c))
forall (m :: * -> *) (k :: * -> *) a b.
Monad m =>
MachineStep m k a
-> (Maybe (a, MachineT m k a) -> m (MachineStep m k b))
-> m (MachineStep m k b)
feedToBursting Maybe (t, MachineT m k b) -> m (Step k c (MachineT m k c))
Maybe (t, MachineT m k t) -> m (Step k c (MachineT m k c))
go'
                  go' :: Maybe (t, MachineT m k b) -> m (Step k c (MachineT m k c))
go' Maybe (t, MachineT m k b)
Nothing = t
-> Maybe (MachineT m k b)
-> ProcessT m b c
-> m (Step k c (MachineT m k c))
goAsync t
acc Maybe (MachineT m k b)
forall a. Maybe a
Nothing ProcessT m b c
ff
                  go' (Just (t
o, MachineT m k b
k)) = t
-> Maybe (MachineT m k b)
-> ProcessT m b c
-> m (Step k c (MachineT m k c))
goAsync t
acc (MachineT m k b -> Maybe (MachineT m k b)
forall a. a -> Maybe a
Just MachineT m k b
k) (t -> ProcessT m b c
f t
o)