{-# LANGUAGE GADTs #-} -- | Provide a notion of fanout wherein a single input is passed to -- several consumers. module Data.Machine.Fanout (fanout, fanoutSteps) where import Control.Applicative import Control.Arrow import Control.Monad (foldM) import Data.Machine import Data.Maybe (catMaybes) import Data.Monoid import Data.Semigroup (Semigroup(sconcat)) import Data.List.NonEmpty (NonEmpty((:|))) -- | Feed a value to a 'ProcessT' at an 'Await' 'Step'. If the -- 'ProcessT' is awaiting a value, then its next step is -- returned. Otherwise, the original process is returned. feed :: Monad m => a -> ProcessT m a b -> m (Step (Is a) b (ProcessT m a b)) feed x m = runMachineT m >>= \v -> case v of Await f Refl _ -> runMachineT (f x) s -> return s -- | Like 'Data.List.mapAccumL' but with a monadic accumulating -- function. mapAccumLM :: (Functor m, Monad m) => (acc -> x -> m (acc, y)) -> acc -> [x] -> m (acc, [y]) mapAccumLM f z = fmap (second ($ [])) . foldM aux (z,id) where aux (acc,ys) x = second ((. ys) . (:)) <$> f acc x -- | Exhaust a sequence of all successive 'Yield' steps taken by a -- 'MachineT'. Returns the list of yielded values and the next -- (non-Yield) step of the machine. flushYields :: Monad m => Step k o (MachineT m k o) -> m ([o], Maybe (MachineT m k o)) flushYields = go id where go rs (Yield o s) = runMachineT s >>= go ((o:) . rs) go rs Stop = return (rs [], Nothing) go rs s = return (rs [], Just $ encased s) -- | Share inputs with each of a list of processes in lockstep. Any -- values yielded by the processes are combined into a single yield -- from the composite process. fanout :: (Functor m, Monad m, Semigroup r) => [ProcessT m a r] -> ProcessT m a r fanout xs = encased $ Await (MachineT . aux) Refl (fanout xs) where aux y = do (rs,xs') <- mapM (feed y) xs >>= mapAccumLM yields [] let nxt = fanout $ catMaybes xs' case rs of [] -> runMachineT nxt (r:rs') -> return $ Yield (sconcat $ r :| rs') nxt yields rs Stop = return (rs,Nothing) yields rs y@(Yield _ _) = first (++ rs) <$> flushYields y yields rs a@(Await _ _ _) = return (rs, Just $ encased a) -- | Share inputs with each of a list of processes in lockstep. If -- none of the processes yields a value, the composite process will -- itself yield 'mempty'. The idea is to provide a handle on steps -- only executed for their side effects. For instance, if you want to -- run a collection of 'ProcessT's that await but don't yield some -- number of times, you can use 'fanOutSteps . map (fmap (const ()))' -- followed by a 'taking' process. fanoutSteps :: (Functor m, Monad m, Monoid r) => [ProcessT m a r] -> ProcessT m a r fanoutSteps xs = encased $ Await (MachineT . aux) Refl (fanoutSteps xs) where aux y = do (rs,xs') <- mapM (feed y) xs >>= mapAccumLM yields [] let nxt = fanoutSteps $ catMaybes xs' if null rs then return $ Yield mempty nxt else return $ Yield (mconcat rs) nxt yields rs Stop = return (rs,Nothing) yields rs y@(Yield _ _) = first (++rs) <$> flushYields y yields rs a@(Await _ _ _) = return (rs, Just $ encased a)