{-# LANGUAGE CPP, GADTs, FlexibleContexts, RankNTypes, ScopedTypeVariables, TupleSections #-} -- | The primary use of concurrent machines is to establish a -- pipelined architecture that can boost overall throughput by running -- each stage of the pipeline at the same time. The processing, or -- production, rate of each stage may not be identical, so facilities -- are provided to loosen the temporal coupling between pipeline -- stages using buffers. -- -- This architecture also lends itself to operations where multiple -- workers are available for processing inputs. If each worker is to -- process the same set of inputs, consider 'fanout' and -- 'fanoutSteps'. If each worker is to process a disjoint set of -- inputs, consider 'scatter'. module Data.Machine.Concurrent (module Data.Machine, -- * Concurrent connection (>~>), (<~<), -- * Buffered machines bufferConnect, rollingConnect, -- * Concurrent processing of shared inputs fanout, fanoutSteps, -- * Concurrent multiple-input machines wye, tee, scatter, splitSum, mergeSum, splitProd) where #if defined(__GLASGOW_HASKELL__) && __GLASGOW_HASKELL__ < 710 import Control.Applicative #endif import Control.Concurrent.Async.Lifted import Control.Monad (join) import Control.Monad.Trans.Control import Data.Machine hiding (tee, wye) import Data.Machine.Concurrent.AsyncStep import Data.Machine.Concurrent.Buffer import Data.Machine.Concurrent.Fanout import Data.Machine.Concurrent.Scatter import Data.Machine.Concurrent.Wye import Data.Machine.Concurrent.Tee -- | Build a new 'Machine' by adding a 'Process' to the output of an -- old 'Machine'. The upstream machine is run concurrently with -- downstream with the aim that upstream will have a yielded value -- ready as soon as downstream awaits. This effectively creates a -- buffer between upstream and downstream, or source and sink, that -- can contain up to one value. 
--
-- @
-- ('<~<') :: 'Process' b c -> 'Process' a b -> 'Process' a c
-- ('<~<') :: 'Process' c d -> 'Data.Machine.Tee.Tee' a b c -> 'Data.Machine.Tee.Tee' a b d
-- ('<~<') :: 'Process' b c -> 'Machine' k b -> 'Machine' k c
-- @
(<~<) :: MonadBaseControl IO m
      => ProcessT m b c -> MachineT m k b -> MachineT m k c
mp <~< ma = racers ma mp

-- | Flipped ('<~<').
(>~>) :: MonadBaseControl IO m
      => MachineT m k b -> ProcessT m b c -> MachineT m k c
ma >~> mp = mp <~< ma

infixl 7 >~>

-- | We want the first available response. When there is no left-hand
-- computation to wait on ('Nothing'), simply wait for the right-hand
-- one and tag its result with 'Right'.
waitEither' :: MonadBaseControl IO m
            => Maybe (Async (StM m a)) -> Async (StM m b) -> m (Either a b)
waitEither' Nothing y = Right <$> wait y
waitEither' (Just x) y = waitEither x y

-- | Let a source and a sink chase each other, providing an effective
-- one-element buffer between the two. The idea is to run both
-- concurrently at all times so that as soon as the sink 'Await's, we
-- have a source-yielded value to provide it. This, of course,
-- involves eagerly running the source, percolating its 'Await's up
-- the chain as soon as possible.
--
-- Once the source has stopped, an awaiting sink is continued with its
-- fallback continuation (the third field of its 'Await' step). This
-- happens regardless of which of the two asynchronous steps was
-- observed first, so the outcome does not depend on scheduling.
racers :: forall m k a b. MonadBaseControl IO m
       => MachineT m k a -> ProcessT m a b -> MachineT m k b
racers src snk = MachineT . join $
                 go <$> (Just <$> asyncRun src) <*> asyncRun snk
  where
    -- Main loop. The first argument is the in-flight source step, or
    -- 'Nothing' once the source has stopped; the second is the
    -- in-flight sink step.
    go :: Maybe (AsyncStep m k a)
       -> AsyncStep m (Is a) b
       -> m (MachineStep m k b)
    go srcA snkA = waitEither' srcA snkA >>= \n -> case n of
      -- The source finished first: keep servicing the sink alone.
      Left (Stop :: MachineStep m k a) -> go Nothing snkA
      -- The source has a value ready: find out what the sink wants.
      Left (Yield o k) -> wait snkA >>= \m -> case m of
        (Stop :: MachineStep m (Is a) b) -> return Stop
        Yield o' k' -> return . Yield o' . MachineT . flushDown k' $ \f ->
          join $ go <$> (Just <$> asyncRun k) <*> asyncRun (f o)
        Await f Refl _ ->
          join $ go <$> (Just <$> asyncRun k) <*> asyncRun (f o)
      -- The source needs input: surface its await to our own caller.
      Left (Await g kg fg) ->
        asyncAwait g kg fg $ MachineT . flip go snkA . Just
      Right (Stop :: MachineStep m (Is a) b) -> return Stop
      Right (Yield o k) -> asyncRun k >>= return . Yield o . MachineT . go srcA
      -- The sink wants input: feed it from the source if there is one.
      Right (Await f Refl ff) -> case srcA of
        Nothing -> sinkOnly ff
        Just src' -> wait src' >>= \m -> case m of
          -- The source ended while the sink was waiting. Run the sink's
          -- fallback, exactly as in the 'Nothing' case above, instead
          -- of stopping and discarding it.
          Stop -> sinkOnly ff
          Yield o k -> join $ go <$> (Just <$> asyncRun k) <*> asyncRun (f o)
          a -> feedUp (sinkOnly ff) (encased a) $ \o k ->
            join $ go <$> (Just <$> asyncRun k) <*> asyncRun (f o)

    -- Continue with a sink's fallback continuation when there is no
    -- source left to supply it with input.
    sinkOnly :: ProcessT m a b -> m (MachineStep m k b)
    sinkOnly ff = asyncRun ff >>= go Nothing

    -- If we have an upstream source value ready, we must flush
    -- all available values yielded by downstream until it awaits.
    flushDown :: ProcessT m a b
              -> ((a -> ProcessT m a b) -> m (MachineStep m k b))
              -> m (MachineStep m k b)
    flushDown m k = runMachineT m >>= \s -> case s of
      Stop -> return Stop
      Yield o m' -> return . Yield o . MachineT $ flushDown m' k
      Await f Refl _ -> k f

    -- If downstream is awaiting an input, we must pull in all
    -- necessary upstream awaits until we have a yielded value to
    -- push downstream. The first argument is the action to run if
    -- upstream stops before yielding anything.
    feedUp :: m (MachineStep m k b)
           -> MachineT m k a
           -> (a -> MachineT m k a -> m (MachineStep m k b))
           -> m (MachineStep m k b)
    feedUp onStop m k = runMachineT m >>= \s -> case s of
      Stop -> onStop
      Yield o m' -> k o m'
      Await g kg fg ->
        return $ awaitStep g kg fg (MachineT . flip (feedUp onStop) k)