{-# LANGUAGE GADTs, FlexibleContexts, RankNTypes, ScopedTypeVariables,
             TupleSections #-}
-- | The primary use of concurrent machines is to establish a
-- pipelined architecture that can boost overall throughput by running
-- each stage of the pipeline at the same time. The processing, or
-- production, rate of each stage may not be identical, so facilities
-- are provided to loosen the temporal coupling between pipeline
-- stages using buffers.
--
-- This architecture also lends itself to operations where multiple
-- workers are available for procesisng inputs. If each worker is to
-- process the same set of inputs, consider 'fanout' and
-- 'fanoutSteps'. If each worker is to process a disjoint set of
-- inputs, consider 'scatter'.
module Data.Machine.Concurrent (module Data.Machine,
                                -- * Concurrent connection
                                (>~>), (<~<),
                                -- * Buffered machines
                                buffer, rolling,
                                bufferConnect, rollingConnect,
                                -- * Concurrent processing of shared inputs
                                fanout, fanoutSteps,
                                -- * Concurrent multiple-input machines
                                wye, tee, scatter, splitSum, mergeSum, 
                                splitProd) where
import Control.Applicative
import Control.Concurrent.Async.Lifted
import Control.Monad (join)
import Control.Monad.Trans.Control
import Data.Machine hiding (tee, wye)
import Data.Machine.Concurrent.AsyncStep
import Data.Machine.Concurrent.Buffer
import Data.Machine.Concurrent.Fanout
import Data.Machine.Concurrent.Scatter
import Data.Machine.Concurrent.Wye
import Data.Machine.Concurrent.Tee

-- | Build a new 'Machine' by adding a 'Process' to the output of an
-- old 'Machine'. The upstream machine is run concurrently with
-- downstream with the aim that upstream will have a yielded value
-- ready as soon as downstream awaits. This effectively creates a
-- buffer between upstream and downstream, or source and sink, that
-- can contain up to one value.
--
-- @
-- ('<~<') :: 'Process' b c -> 'Process' a b -> 'Process' a c
-- ('<~<') :: 'Process' c d -> 'Data.Machine.Tee.Tee' a b c -> 'Data.Machine.Tee.Tee' a b d
-- ('<~<') :: 'Process' b c -> 'Machine' k b -> 'Machine' k c
-- @
(<~<) :: MonadBaseControl IO m
     => ProcessT m b c -> MachineT m k b -> MachineT m k c
mp <~< ma = racers ma mp

-- | Flipped ('<~<').
(>~>) :: MonadBaseControl IO m
     => MachineT m k b -> ProcessT m b c -> MachineT m k c
ma >~> mp = mp <~< ma

infixl 7 >~>

-- | We want the first available response.
waitEither' :: MonadBaseControl IO m 
            => Maybe (Async (StM m a)) -> Async (StM m b)
            -> m (Either a b)
waitEither' Nothing y = Right <$> wait y
waitEither' (Just x) y = waitEither x y

-- | Let a source and a sink chase each other, providing an effective
-- one-element buffer between the two. The idea is to run both
-- concurrently at all times so that as soon as the sink 'Await's, we
-- have a source-yielded value to provide it. This, of course,
-- involves eagerly running the source, percolating its 'Await's up
-- the chain as soon as possible.
racers :: forall m k a b. MonadBaseControl IO m
       => MachineT m k a -> ProcessT m a b -> MachineT m k b
racers src snk = MachineT . join $
                 go <$> (Just <$> asyncRun src) <*> asyncRun snk
  where go :: MonadBaseControl IO m
           => Maybe (AsyncStep m k a)
           -> AsyncStep m (Is a) b
           -> m (MachineStep m k b)
        go srcA snkA =
          waitEither' srcA snkA >>= \n -> case n of
            Left (Stop :: MachineStep m k a) -> go Nothing snkA
            Left (Yield o k) -> wait snkA >>= \m -> case m of
              (Stop :: MachineStep m (Is a) b) -> return Stop
              Yield o' k' -> return . Yield o' . MachineT . flushDown k' $
                             \f -> join $ go <$> (Just <$> asyncRun k)
                                             <*> asyncRun (f o)
              Await f Refl _ -> join $ go <$> (Just <$> asyncRun k)
                                          <*> asyncRun (f o)
            Left (Await g kg fg) -> asyncAwait g kg fg $
                                    MachineT . flip go snkA . Just
            Right (Stop :: MachineStep m (Is a) b) -> return Stop
            Right (Yield o k) -> asyncRun k >>=
                                 return . Yield o . MachineT . go srcA
            Right (Await f Refl ff) -> case srcA of
              Nothing -> asyncRun ff >>= go Nothing
              Just src' -> wait src' >>= \m -> case m of
                Stop -> return Stop
                Yield o k -> join $ go <$> (Just <$> asyncRun k)
                                       <*> asyncRun (f o)
                a -> feedUp (encased a) $ \o k -> join $
                       go <$> (Just <$> asyncRun k) <*> asyncRun (f o)
        -- If we have an upstream source value ready, we must flush
        -- all available values yielded by downstream until it awaits.
        flushDown :: Monad m
                  => ProcessT m a b
                  -> ((a -> ProcessT m a b) -> m (MachineStep m k b))
                  -> m (MachineStep m k b)
        flushDown m k = runMachineT m >>= \s -> case s of
          Stop -> return Stop
          Yield o m' -> return . Yield o . MachineT $ flushDown m' k
          Await f Refl _ -> k f
        -- If downstream is awaiting an input, we must pull in all
        -- necessary upstream awaits until we have a yielded value to
        -- push downstream.
        feedUp :: MonadBaseControl IO m
               => MachineT m k a
               -> (a -> MachineT m k a -> m (MachineStep m k b))
               -> m (MachineStep m k b)
        feedUp m k = runMachineT m >>= \s -> case s of
          Stop -> return Stop
          Yield o m' -> k o m'
          Await g kg fg -> return $ awaitStep g kg fg (MachineT . flip feedUp k)