{-# LANGUAGE CPP #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}

-- | Provide a notion of fanout wherein a single input is passed to
-- several consumers.
module Data.Machine.Fanout (fanout, fanoutSteps) where

import           Data.List.NonEmpty (NonEmpty (..))
import           Data.Machine
import           Data.Semigroup     (Semigroup (sconcat))
#if __GLASGOW_HASKELL__  < 710
import           Data.Monoid        (Monoid (..))
import           Data.Traversable   (traverse)
#endif

-- | Build the step that resumes a group of suspended workers.
--
-- Each worker is given as a pair: the function to run on the next
-- input, and the alternative stored alongside it in the final field of
-- 'Await' (presumably the machine to use when no further input is
-- available -- confirm against the "Data.Machine" documentation).
--
-- With no workers the result is 'Stop'. Otherwise a single 'Await' is
-- produced: an incoming value is handed to every worker's input
-- function and the results are recombined with the supplied function;
-- the alternatives are recombined with that same function.
continue :: ([b] -> r) -> [(a -> b, b)] -> Step (Is a) o r
continue _ [] = Stop
-- 'traverse' runs in the function applicative here, so the one input
-- value is distributed to every @fst@ component.
continue f ws = Await (f . traverse fst ws) Refl (f $ map snd ws)

-- | Combine the contents of a difference list with the element type's
-- 'Semigroup' operation.
--
-- The difference list is materialised by applying it to @[]@. An empty
-- result gives 'Nothing', since a 'Semigroup' has no identity element
-- to fall back on; otherwise the elements are combined, in order, with
-- 'sconcat'.
semigroupDlist :: Semigroup a => ([a] -> [a]) -> Maybe a
semigroupDlist f = case f [] of
  []     -> Nothing
  x : xs -> Just $ sconcat (x :| xs)

-- | Share inputs with each of a list of processes in lockstep. Any
-- values yielded by the processes are combined into a single yield
-- from the composite process.
--
-- Values are combined with the 'Semigroup' operation of @r@. If no
-- process yields anything before the next input is needed, the
-- composite process yields nothing for that round.
fanout :: forall m a r. (Monad m, Semigroup r)
       => [ProcessT m a r] -> ProcessT m a r
fanout = MachineT . go id id
  where
    -- Drive each process in turn until it either stops or awaits.
    -- The first argument is a difference list of the processes found
    -- awaiting (input continuation paired with the alternative carried
    -- by their 'Await'); the second is a difference list of the values
    -- yielded so far in this round.
    go :: ([(a -> ProcessT m a r, ProcessT m a r)]
       -> [(a -> ProcessT m a r, ProcessT m a r)])
       -> ([r] -> [r])
       -> [ProcessT m a r]
       -> m (Step (Is a) r (ProcessT m a r))
    -- Every process has been driven: emit the combined yields, if
    -- there are any, and then resume the awaiting processes together.
    go waiting acc [] =
      return . maybe k (\x -> Yield x $ encased k) $ semigroupDlist acc
      where
        k = continue fanout (waiting [])
    go waiting acc (p:ps) = runMachineT p >>= \v -> case v of
      -- A stopped process is simply dropped.
      Stop           -> go waiting acc ps
      -- Record the value and keep driving the same process.
      Yield x k      -> go waiting (acc . (x:)) (k:ps)
      -- Park the process until the shared input is requested.
      Await f Refl k -> go (waiting . ((f, k):)) acc ps

-- | Share inputs with each of a list of processes in lockstep. If
-- none of the processes yields a value, the composite process will
-- itself yield 'mempty'. The idea is to provide a handle on steps
-- only executed for their side effects. For instance, if you want to
-- run a collection of 'ProcessT's that await but don't yield some
-- number of times, you can use
-- @'fanoutSteps' . map (fmap (const ()))@ followed by a 'taking'
-- process.
--
-- Exactly one value is yielded per round: the 'mconcat' of everything
-- the processes yielded in that round. This includes a final round in
-- which no process is left awaiting, after which the composite stops.
fanoutSteps :: forall m a r. (Monad m, Monoid r)
            => [ProcessT m a r] -> ProcessT m a r
fanoutSteps = MachineT . go id id
  where
    -- Drive each process in turn until it either stops or awaits.
    -- The first argument is a difference list of the processes found
    -- awaiting (input continuation paired with the alternative carried
    -- by their 'Await'); the second is a difference list of the values
    -- yielded so far in this round.
    go :: ([(a -> ProcessT m a r, ProcessT m a r)]
       -> [(a -> ProcessT m a r, ProcessT m a r)])
       -> ([r] -> [r])
       -> [ProcessT m a r]
       -> m (Step (Is a) r (ProcessT m a r))
    -- Every process has been driven: always yield, using 'mempty' (via
    -- 'mconcat' of an empty list) when nothing was produced, and then
    -- resume the awaiting processes together.
    go waiting acc [] =
      return . Yield (mconcat (acc [])) $
        encased (continue fanoutSteps (waiting []))
    go waiting acc (p:ps) = runMachineT p >>= \v -> case v of
      -- A stopped process is simply dropped.
      Stop           -> go waiting acc ps
      -- Record the value and keep driving the same process.
      Yield x k      -> go waiting (acc . (x:)) (k:ps)
      -- Park the process until the shared input is requested.
      Await f Refl k -> go (waiting . ((f, k):)) acc ps