{-# LANGUAGE FlexibleContexts #-}

-- |
-- Module     : Simulation.Aivika.Trans.Stream
-- Copyright  : Copyright (c) 2009-2017, David Sorokin <david.sorokin@gmail.com>
-- License    : BSD3
-- Maintainer : David Sorokin <david.sorokin@gmail.com>
-- Stability  : experimental
-- Tested with: GHC 8.0.1
--
-- The infinite stream of data in time.
--
module Simulation.Aivika.Trans.Stream
       (-- * Stream Type
        Stream(..),
        -- * Merging and Splitting Stream
        emptyStream,
        mergeStreams,
        mergeQueuedStreams,
        mergePriorityStreams,
        concatStreams,
        concatQueuedStreams,
        concatPriorityStreams,
        splitStream,
        splitStreamQueueing,
        splitStreamPrioritising,
        splitStreamFiltering,
        splitStreamFilteringQueueing,
        -- * Specifying Identifier
        streamUsingId,
        -- * Prefetching and Delaying Stream
        prefetchStream,
        delayStream,
        -- * Stream Arriving
        arrivalStream,
        -- * Memoizing, Zipping and Uzipping Stream
        memoStream,
        zipStreamSeq,
        zipStreamParallel,
        zip3StreamSeq,
        zip3StreamParallel,
        unzipStream,
        streamSeq,
        streamParallel,
        -- * Consuming and Sinking Stream
        consumeStream,
        sinkStream,
        -- * Useful Combinators
        repeatProcess,
        mapStream,
        mapStreamM,
        accumStream,
        apStream,
        apStreamM,
        filterStream,
        filterStreamM,
        takeStream,
        takeStreamWhile,
        takeStreamWhileM,
        dropStream,
        dropStreamWhile,
        dropStreamWhileM,
        singletonStream,
        joinStream,
        -- * Failover
        failoverStream,
        -- * Integrating with Signals
        signalStream,
        streamSignal,
        queuedSignalStream,
        -- * Utilities
        leftStream,
        rightStream,
        replaceLeftStream,
        replaceRightStream,
        partitionEitherStream,
        -- * Assemblying Streams
        cloneStream,
        firstArrivalStream,
        lastArrivalStream,
        assembleAccumStream,
        -- * Debugging
        traceStream) where

import Data.Maybe
import Data.Monoid hiding ((<>))
import Data.Semigroup (Semigroup(..))
import Data.List.NonEmpty (NonEmpty((:|)))

import Control.Applicative
import Control.Monad
import Control.Monad.Trans

import Simulation.Aivika.Trans.Ref.Base
import Simulation.Aivika.Trans.DES
import Simulation.Aivika.Trans.Parameter
import Simulation.Aivika.Trans.Simulation
import Simulation.Aivika.Trans.Dynamics
import Simulation.Aivika.Trans.Event
import Simulation.Aivika.Trans.Composite
import Simulation.Aivika.Trans.Cont
import Simulation.Aivika.Trans.Process
import Simulation.Aivika.Trans.Signal
import Simulation.Aivika.Trans.Resource.Base
import Simulation.Aivika.Trans.QueueStrategy
import qualified Simulation.Aivika.Trans.Queue.Infinite.Base as IQ
import Simulation.Aivika.Arrival (Arrival(..))

-- | Represents an infinite stream of data in time,
-- some kind of never-ending cons cell.
newtype Stream m a = Cons { Stream m a -> Process m (a, Stream m a)
runStream :: Process m (a, Stream m a)
                            -- ^ Run the stream.
                          }

instance MonadDES m => Functor (Stream m) where

  {-# INLINE fmap #-}
  fmap :: (a -> b) -> Stream m a -> Stream m b
fmap = (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
(a -> b) -> Stream m a -> Stream m b
mapStream

instance MonadDES m => Applicative (Stream m) where

  {-# INLINE pure #-}
  pure :: a -> Stream m a
pure a
a = let y :: Stream m a
y = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons ((a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
y)) in Stream m a
y
  
  {-# INLINE (<*>) #-}
  <*> :: Stream m (a -> b) -> Stream m a -> Stream m b
(<*>) = Stream m (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (a -> b) -> Stream m a -> Stream m b
apStream

instance MonadDES m => Alternative (Stream m) where

  {-# INLINE empty #-}
  empty :: Stream m a
empty = Stream m a
forall (m :: * -> *) a. MonadDES m => Stream m a
emptyStream

  {-# INLINE (<|>) #-}
  <|> :: Stream m a -> Stream m a -> Stream m a
(<|>) = Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Stream m a -> Stream m a
mergeStreams

instance MonadDES m => Semigroup (Stream m a) where

  {-# INLINE (<>) #-}
  <> :: Stream m a -> Stream m a -> Stream m a
(<>) = Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Stream m a -> Stream m a
mergeStreams

  {-# INLINE sconcat #-}
  sconcat :: NonEmpty (Stream m a) -> Stream m a
sconcat (Stream m a
h :| [Stream m a]
t) = [Stream m a] -> Stream m a
forall (m :: * -> *) a. MonadDES m => [Stream m a] -> Stream m a
concatStreams (Stream m a
h Stream m a -> [Stream m a] -> [Stream m a]
forall a. a -> [a] -> [a]
: [Stream m a]
t)

instance MonadDES m => Monoid (Stream m a) where

  {-# INLINE mempty #-}
  mempty :: Stream m a
mempty  = Stream m a
forall (m :: * -> *) a. MonadDES m => Stream m a
emptyStream

  {-# INLINE mappend #-}
  mappend :: Stream m a -> Stream m a -> Stream m a
mappend = Stream m a -> Stream m a -> Stream m a
forall a. Semigroup a => a -> a -> a
(<>)

  {-# INLINE mconcat #-}
  mconcat :: [Stream m a] -> Stream m a
mconcat = [Stream m a] -> Stream m a
forall (m :: * -> *) a. MonadDES m => [Stream m a] -> Stream m a
concatStreams

-- | Create a stream that will use the specified process identifier.
-- It can be useful to refer to the underlying 'Process' computation which
-- can be passivated, interrupted, canceled and so on. See also the
-- 'processUsingId' function for more details.
streamUsingId :: MonadDES m => ProcessId m -> Stream m a -> Stream m a
{-# INLINABLE streamUsingId #-}
streamUsingId :: ProcessId m -> Stream m a -> Stream m a
streamUsingId ProcessId m
pid (Cons Process m (a, Stream m a)
s) =
  Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ ProcessId m
-> Process m (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
ProcessId m -> Process m a -> Process m a
processUsingId ProcessId m
pid Process m (a, Stream m a)
s

-- | Memoize the stream so that it would always return the same data
-- within the simulation run.
memoStream :: MonadDES m => Stream m a -> Simulation m (Stream m a)
{-# INLINABLE memoStream #-}
memoStream :: Stream m a -> Simulation m (Stream m a)
memoStream (Cons Process m (a, Stream m a)
s) =
  do Process m (a, Stream m a)
p <- Process m (a, Stream m a)
-> Simulation m (Process m (a, Stream m a))
forall (m :: * -> *) a.
MonadDES m =>
Process m a -> Simulation m (Process m a)
memoProcess (Process m (a, Stream m a)
 -> Simulation m (Process m (a, Stream m a)))
-> Process m (a, Stream m a)
-> Simulation m (Process m (a, Stream m a))
forall a b. (a -> b) -> a -> b
$
          do ~(a
x, Stream m a
xs) <- Process m (a, Stream m a)
s
             Stream m a
xs' <- Simulation m (Stream m a) -> Process m (Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Stream m a) -> Process m (Stream m a))
-> Simulation m (Stream m a) -> Process m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Stream m a -> Simulation m (Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Simulation m (Stream m a)
memoStream Stream m a
xs
             (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
x, Stream m a
xs')
     Stream m a -> Simulation m (Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
p)

-- | Zip two streams trying to get data sequentially.
zipStreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
{-# INLINABLE zipStreamSeq #-}
zipStreamSeq :: Stream m a -> Stream m b -> Stream m (a, b)
zipStreamSeq (Cons Process m (a, Stream m a)
sa) (Cons Process m (b, Stream m b)
sb) = Process m ((a, b), Stream m (a, b)) -> Stream m (a, b)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ((a, b), Stream m (a, b))
y where
  y :: Process m ((a, b), Stream m (a, b))
y = do ~(a
x, Stream m a
xs) <- Process m (a, Stream m a)
sa
         ~(b
y, Stream m b
ys) <- Process m (b, Stream m b)
sb
         ((a, b), Stream m (a, b)) -> Process m ((a, b), Stream m (a, b))
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y), Stream m a -> Stream m b -> Stream m (a, b)
forall (m :: * -> *) a b.
MonadDES m =>
Stream m a -> Stream m b -> Stream m (a, b)
zipStreamSeq Stream m a
xs Stream m b
ys)

-- | Zip two streams trying to get data as soon as possible,
-- launching the sub-processes in parallel.
zipStreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
{-# INLINABLE zipStreamParallel #-}
zipStreamParallel :: Stream m a -> Stream m b -> Stream m (a, b)
zipStreamParallel (Cons Process m (a, Stream m a)
sa) (Cons Process m (b, Stream m b)
sb) = Process m ((a, b), Stream m (a, b)) -> Stream m (a, b)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ((a, b), Stream m (a, b))
y where
  y :: Process m ((a, b), Stream m (a, b))
y = do ~((a
x, Stream m a
xs), (b
y, Stream m b
ys)) <- Process m (a, Stream m a)
-> Process m (b, Stream m b)
-> Process m ((a, Stream m a), (b, Stream m b))
forall (m :: * -> *) a b.
MonadDES m =>
Process m a -> Process m b -> Process m (a, b)
zipProcessParallel Process m (a, Stream m a)
sa Process m (b, Stream m b)
sb
         ((a, b), Stream m (a, b)) -> Process m ((a, b), Stream m (a, b))
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y), Stream m a -> Stream m b -> Stream m (a, b)
forall (m :: * -> *) a b.
MonadDES m =>
Stream m a -> Stream m b -> Stream m (a, b)
zipStreamParallel Stream m a
xs Stream m b
ys)

-- | Zip three streams trying to get data sequentially.
zip3StreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
{-# INLINABLE zip3StreamSeq #-}
zip3StreamSeq :: Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamSeq (Cons Process m (a, Stream m a)
sa) (Cons Process m (b, Stream m b)
sb) (Cons Process m (c, Stream m c)
sc) = Process m ((a, b, c), Stream m (a, b, c)) -> Stream m (a, b, c)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ((a, b, c), Stream m (a, b, c))
y where
  y :: Process m ((a, b, c), Stream m (a, b, c))
y = do ~(a
x, Stream m a
xs) <- Process m (a, Stream m a)
sa
         ~(b
y, Stream m b
ys) <- Process m (b, Stream m b)
sb
         ~(c
z, Stream m c
zs) <- Process m (c, Stream m c)
sc
         ((a, b, c), Stream m (a, b, c))
-> Process m ((a, b, c), Stream m (a, b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y, c
z), Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamSeq Stream m a
xs Stream m b
ys Stream m c
zs)

-- | Zip three streams trying to get data as soon as possible,
-- launching the sub-processes in parallel.
zip3StreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
{-# INLINABLE zip3StreamParallel #-}
zip3StreamParallel :: Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamParallel (Cons Process m (a, Stream m a)
sa) (Cons Process m (b, Stream m b)
sb) (Cons Process m (c, Stream m c)
sc) = Process m ((a, b, c), Stream m (a, b, c)) -> Stream m (a, b, c)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ((a, b, c), Stream m (a, b, c))
y where
  y :: Process m ((a, b, c), Stream m (a, b, c))
y = do ~((a
x, Stream m a
xs), (b
y, Stream m b
ys), (c
z, Stream m c
zs)) <- Process m (a, Stream m a)
-> Process m (b, Stream m b)
-> Process m (c, Stream m c)
-> Process m ((a, Stream m a), (b, Stream m b), (c, Stream m c))
forall (m :: * -> *) a b c.
MonadDES m =>
Process m a -> Process m b -> Process m c -> Process m (a, b, c)
zip3ProcessParallel Process m (a, Stream m a)
sa Process m (b, Stream m b)
sb Process m (c, Stream m c)
sc
         ((a, b, c), Stream m (a, b, c))
-> Process m ((a, b, c), Stream m (a, b, c))
forall (m :: * -> *) a. Monad m => a -> m a
return ((a
x, b
y, c
z), Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
zip3StreamParallel Stream m a
xs Stream m b
ys Stream m c
zs)

-- | Unzip the stream.
unzipStream :: MonadDES m => Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
{-# INLINABLE unzipStream #-}
unzipStream :: Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
unzipStream Stream m (a, b)
s =
  do Stream m (a, b)
s' <- Stream m (a, b) -> Simulation m (Stream m (a, b))
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Simulation m (Stream m a)
memoStream Stream m (a, b)
s
     let sa :: Stream m a
sa = ((a, b) -> a) -> Stream m (a, b) -> Stream m a
forall (m :: * -> *) a b.
MonadDES m =>
(a -> b) -> Stream m a -> Stream m b
mapStream (a, b) -> a
forall a b. (a, b) -> a
fst Stream m (a, b)
s'
         sb :: Stream m b
sb = ((a, b) -> b) -> Stream m (a, b) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
(a -> b) -> Stream m a -> Stream m b
mapStream (a, b) -> b
forall a b. (a, b) -> b
snd Stream m (a, b)
s'
     (Stream m a, Stream m b) -> Simulation m (Stream m a, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m a
sa, Stream m b
sb)

-- | To form each new portion of data for the output stream,
-- read data sequentially from the input streams.
--
-- This is a generalization of 'zipStreamSeq'.
streamSeq :: MonadDES m => [Stream m a] -> Stream m [a]
{-# INLINABLE streamSeq #-}
streamSeq :: [Stream m a] -> Stream m [a]
streamSeq [Stream m a]
xs = Process m ([a], Stream m [a]) -> Stream m [a]
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ([a], Stream m [a])
y where
  y :: Process m ([a], Stream m [a])
y = do [(a, Stream m a)]
ps <- [Stream m a]
-> (Stream m a -> Process m (a, Stream m a))
-> Process m [(a, Stream m a)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Stream m a]
xs Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream
         ([a], Stream m [a]) -> Process m ([a], Stream m [a])
forall (m :: * -> *) a. Monad m => a -> m a
return (((a, Stream m a) -> a) -> [(a, Stream m a)] -> [a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream m a) -> a
forall a b. (a, b) -> a
fst [(a, Stream m a)]
ps, [Stream m a] -> Stream m [a]
forall (m :: * -> *) a. MonadDES m => [Stream m a] -> Stream m [a]
streamSeq ([Stream m a] -> Stream m [a]) -> [Stream m a] -> Stream m [a]
forall a b. (a -> b) -> a -> b
$ ((a, Stream m a) -> Stream m a)
-> [(a, Stream m a)] -> [Stream m a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream m a) -> Stream m a
forall a b. (a, b) -> b
snd [(a, Stream m a)]
ps)

-- | To form each new portion of data for the output stream,
-- read data from the input streams in parallel.
--
-- This is a generalization of 'zipStreamParallel'.
streamParallel :: MonadDES m => [Stream m a] -> Stream m [a]
{-# INLINABLE streamParallel #-}
streamParallel :: [Stream m a] -> Stream m [a]
streamParallel [Stream m a]
xs = Process m ([a], Stream m [a]) -> Stream m [a]
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m ([a], Stream m [a])
y where
  y :: Process m ([a], Stream m [a])
y = do [(a, Stream m a)]
ps <- [Process m (a, Stream m a)] -> Process m [(a, Stream m a)]
forall (m :: * -> *) a.
MonadDES m =>
[Process m a] -> Process m [a]
processParallel ([Process m (a, Stream m a)] -> Process m [(a, Stream m a)])
-> [Process m (a, Stream m a)] -> Process m [(a, Stream m a)]
forall a b. (a -> b) -> a -> b
$ (Stream m a -> Process m (a, Stream m a))
-> [Stream m a] -> [Process m (a, Stream m a)]
forall a b. (a -> b) -> [a] -> [b]
map Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream [Stream m a]
xs
         ([a], Stream m [a]) -> Process m ([a], Stream m [a])
forall (m :: * -> *) a. Monad m => a -> m a
return (((a, Stream m a) -> a) -> [(a, Stream m a)] -> [a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream m a) -> a
forall a b. (a, b) -> a
fst [(a, Stream m a)]
ps, [Stream m a] -> Stream m [a]
forall (m :: * -> *) a. MonadDES m => [Stream m a] -> Stream m [a]
streamParallel ([Stream m a] -> Stream m [a]) -> [Stream m a] -> Stream m [a]
forall a b. (a -> b) -> a -> b
$ ((a, Stream m a) -> Stream m a)
-> [(a, Stream m a)] -> [Stream m a]
forall a b. (a -> b) -> [a] -> [b]
map (a, Stream m a) -> Stream m a
forall a b. (a, b) -> b
snd [(a, Stream m a)]
ps)

-- | Return a stream of values generated by the specified process.
repeatProcess :: MonadDES m => Process m a -> Stream m a
{-# INLINABLE repeatProcess #-}
repeatProcess :: Process m a -> Stream m a
repeatProcess Process m a
p = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
y where
  y :: Process m (a, Stream m a)
y = do a
a <- Process m a
p
         (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
p)

-- | Map the stream according the specified function.
mapStream :: MonadDES m => (a -> b) -> Stream m a -> Stream m b
{-# INLINABLE mapStream #-}
mapStream :: (a -> b) -> Stream m a -> Stream m b
mapStream a -> b
f (Cons Process m (a, Stream m a)
s) = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (b, Stream m b)
y where
  y :: Process m (b, Stream m b)
y = do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
         (b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b
f a
a, (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
(a -> b) -> Stream m a -> Stream m b
mapStream a -> b
f Stream m a
xs)

-- | Compose the stream.
mapStreamM :: MonadDES m => (a -> Process m b) -> Stream m a -> Stream m b
{-# INLINABLE mapStreamM #-}
mapStreamM :: (a -> Process m b) -> Stream m a -> Stream m b
mapStreamM a -> Process m b
f (Cons Process m (a, Stream m a)
s) = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (b, Stream m b)
y where
  y :: Process m (b, Stream m b)
y = do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
         b
b <- a -> Process m b
f a
a
         (b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b
b, (a -> Process m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
(a -> Process m b) -> Stream m a -> Stream m b
mapStreamM a -> Process m b
f Stream m a
xs)

-- | Accumulator that outputs a value determined by the supplied function.
accumStream :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Stream m a -> Stream m b
{-# INLINABLE accumStream #-}
accumStream :: (acc -> a -> Process m (acc, b)) -> acc -> Stream m a -> Stream m b
accumStream acc -> a -> Process m (acc, b)
f acc
acc Stream m a
xs = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$ Stream m a -> acc -> Process m (b, Stream m b)
loop Stream m a
xs acc
acc where
  loop :: Stream m a -> acc -> Process m (b, Stream m b)
loop (Cons Process m (a, Stream m a)
s) acc
acc =
    do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
       (acc
acc', b
b) <- acc -> a -> Process m (acc, b)
f acc
acc a
a
       (b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b
b, Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (b, Stream m b) -> Stream m b)
-> Process m (b, Stream m b) -> Stream m b
forall a b. (a -> b) -> a -> b
$ Stream m a -> acc -> Process m (b, Stream m b)
loop Stream m a
xs acc
acc') 

-- | Sequential application.
apStream :: MonadDES m => Stream m (a -> b) -> Stream m a -> Stream m b
{-# INLINABLE apStream #-}
apStream :: Stream m (a -> b) -> Stream m a -> Stream m b
apStream (Cons Process m (a -> b, Stream m (a -> b))
sf) (Cons Process m (a, Stream m a)
sa) = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (b, Stream m b)
y where
  y :: Process m (b, Stream m b)
y = do (a -> b
f, Stream m (a -> b)
sf') <- Process m (a -> b, Stream m (a -> b))
sf
         (a
a, Stream m a
sa') <- Process m (a, Stream m a)
sa
         (b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> b
f a
a, Stream m (a -> b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (a -> b) -> Stream m a -> Stream m b
apStream Stream m (a -> b)
sf' Stream m a
sa')

-- | Sequential application.
apStreamM :: MonadDES m => Stream m (a -> Process m b) -> Stream m a -> Stream m b
{-# INLINABLE apStreamM #-}
apStreamM :: Stream m (a -> Process m b) -> Stream m a -> Stream m b
apStreamM (Cons Process m (a -> Process m b, Stream m (a -> Process m b))
sf) (Cons Process m (a, Stream m a)
sa) = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (b, Stream m b)
y where
  y :: Process m (b, Stream m b)
y = do (a -> Process m b
f, Stream m (a -> Process m b)
sf') <- Process m (a -> Process m b, Stream m (a -> Process m b))
sf
         (a
a, Stream m a
sa') <- Process m (a, Stream m a)
sa
         b
x <- a -> Process m b
f a
a
         (b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b
x, Stream m (a -> Process m b) -> Stream m a -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (a -> Process m b) -> Stream m a -> Stream m b
apStreamM Stream m (a -> Process m b)
sf' Stream m a
sa')

-- | Filter only those data values that satisfy to the specified predicate.
filterStream :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE filterStream #-}
filterStream :: (a -> Bool) -> Stream m a -> Stream m a
filterStream a -> Bool
p (Cons Process m (a, Stream m a)
s) = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
y where
  y :: Process m (a, Stream m a)
y = do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
         if a -> Bool
p a
a
           then (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Bool) -> Stream m a -> Stream m a
filterStream a -> Bool
p Stream m a
xs)
           else let Cons Process m (a, Stream m a)
z = (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Bool) -> Stream m a -> Stream m a
filterStream a -> Bool
p Stream m a
xs in Process m (a, Stream m a)
z

-- | Filter only those data values that satisfy to the specified predicate.
filterStreamM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE filterStreamM #-}
filterStreamM :: (a -> Process m Bool) -> Stream m a -> Stream m a
filterStreamM a -> Process m Bool
p (Cons Process m (a, Stream m a)
s) = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
y where
  y :: Process m (a, Stream m a)
y = do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
         Bool
b <- a -> Process m Bool
p a
a
         if Bool
b
           then (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Process m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m Bool) -> Stream m a -> Stream m a
filterStreamM a -> Process m Bool
p Stream m a
xs)
           else let Cons Process m (a, Stream m a)
z = (a -> Process m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m Bool) -> Stream m a -> Stream m a
filterStreamM a -> Process m Bool
p Stream m a
xs in Process m (a, Stream m a)
z

-- | The stream of 'Left' values.
leftStream :: MonadDES m => Stream m (Either a b) -> Stream m a
{-# INLINABLE leftStream #-}
leftStream :: Stream m (Either a b) -> Stream m a
leftStream (Cons Process m (Either a b, Stream m (Either a b))
s) = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
y where
  y :: Process m (a, Stream m a)
y = do (Either a b
a, Stream m (Either a b)
xs) <- Process m (Either a b, Stream m (Either a b))
s
         case Either a b
a of
           Left a
a  -> (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m (Either a b) -> Stream m a
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m a
leftStream Stream m (Either a b)
xs)
           Right b
_ -> let Cons Process m (a, Stream m a)
z = Stream m (Either a b) -> Stream m a
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m a
leftStream Stream m (Either a b)
xs in Process m (a, Stream m a)
z

-- | The stream of 'Right' values.
rightStream :: MonadDES m => Stream m (Either a b) -> Stream m b
{-# INLINABLE rightStream #-}
rightStream :: Stream m (Either a b) -> Stream m b
rightStream (Cons Process m (Either a b, Stream m (Either a b))
s) = Process m (b, Stream m b) -> Stream m b
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (b, Stream m b)
y where
  y :: Process m (b, Stream m b)
y = do (Either a b
a, Stream m (Either a b)
xs) <- Process m (Either a b, Stream m (Either a b))
s
         case Either a b
a of
           Left a
_  -> let Cons Process m (b, Stream m b)
z = Stream m (Either a b) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m b
rightStream Stream m (Either a b)
xs in Process m (b, Stream m b)
z
           Right b
a -> (b, Stream m b) -> Process m (b, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (b
a, Stream m (Either a b) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m b
rightStream Stream m (Either a b)
xs)

-- | Replace the 'Left' values.
replaceLeftStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
{-# INLINABLE replaceLeftStream #-}
replaceLeftStream :: Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
replaceLeftStream (Cons Process m (Either a b, Stream m (Either a b))
sab) (ys0 :: Stream m c
ys0@(Cons Process m (c, Stream m c)
sc)) = Process m (Either c b, Stream m (Either c b))
-> Stream m (Either c b)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (Either c b, Stream m (Either c b))
z where
  z :: Process m (Either c b, Stream m (Either c b))
z = do (Either a b
a, Stream m (Either a b)
xs) <- Process m (Either a b, Stream m (Either a b))
sab
         case Either a b
a of
           Left a
_ ->
             do (c
b, Stream m c
ys) <- Process m (c, Stream m c)
sc
                (Either c b, Stream m (Either c b))
-> Process m (Either c b, Stream m (Either c b))
forall (m :: * -> *) a. Monad m => a -> m a
return (c -> Either c b
forall a b. a -> Either a b
Left c
b, Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
replaceLeftStream Stream m (Either a b)
xs Stream m c
ys)
           Right b
a ->
             (Either c b, Stream m (Either c b))
-> Process m (Either c b, Stream m (Either c b))
forall (m :: * -> *) a. Monad m => a -> m a
return (b -> Either c b
forall a b. b -> Either a b
Right b
a, Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
replaceLeftStream Stream m (Either a b)
xs Stream m c
ys0)

-- | Replace the 'Right' values.
replaceRightStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
{-# INLINABLE replaceRightStream #-}
replaceRightStream :: Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
replaceRightStream (Cons Process m (Either a b, Stream m (Either a b))
sab) (ys0 :: Stream m c
ys0@(Cons Process m (c, Stream m c)
sc)) = Process m (Either a c, Stream m (Either a c))
-> Stream m (Either a c)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (Either a c, Stream m (Either a c))
z where
  z :: Process m (Either a c, Stream m (Either a c))
z = do (Either a b
a, Stream m (Either a b)
xs) <- Process m (Either a b, Stream m (Either a b))
sab
         case Either a b
a of
           Right b
_ ->
             do (c
b, Stream m c
ys) <- Process m (c, Stream m c)
sc
                (Either a c, Stream m (Either a c))
-> Process m (Either a c, Stream m (Either a c))
forall (m :: * -> *) a. Monad m => a -> m a
return (c -> Either a c
forall a b. b -> Either a b
Right c
b, Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
replaceRightStream Stream m (Either a b)
xs Stream m c
ys)
           Left a
a ->
             (Either a c, Stream m (Either a c))
-> Process m (Either a c, Stream m (Either a c))
forall (m :: * -> *) a. Monad m => a -> m a
return (a -> Either a c
forall a b. a -> Either a b
Left a
a, Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
forall (m :: * -> *) a b c.
MonadDES m =>
Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
replaceRightStream Stream m (Either a b)
xs Stream m c
ys0)

-- | Partition the stream of 'Either' values into two streams.
partitionEitherStream :: MonadDES m => Stream m (Either a b) -> Simulation m (Stream m a, Stream m b)
{-# INLINABLE partitionEitherStream #-}
partitionEitherStream :: Stream m (Either a b) -> Simulation m (Stream m a, Stream m b)
partitionEitherStream Stream m (Either a b)
s =
  do Stream m (Either a b)
s' <- Stream m (Either a b) -> Simulation m (Stream m (Either a b))
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Simulation m (Stream m a)
memoStream Stream m (Either a b)
s
     (Stream m a, Stream m b) -> Simulation m (Stream m a, Stream m b)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m (Either a b) -> Stream m a
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m a
leftStream Stream m (Either a b)
s', Stream m (Either a b) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
Stream m (Either a b) -> Stream m b
rightStream Stream m (Either a b)
s')

-- | Split the input stream into the specified number of output streams
-- after applying the 'FCFS' strategy for enqueuing the output requests.
splitStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE splitStream #-}
splitStream :: Int -> Stream m a -> Simulation m [Stream m a]
splitStream = FCFS -> Int -> Stream m a -> Simulation m [Stream m a]
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> Int -> Stream m a -> Simulation m [Stream m a]
splitStreamQueueing FCFS
FCFS

-- | Split the input stream into the specified number of output streams.
--
-- If you don't know what the strategy to apply, then you probably
-- need the 'FCFS' strategy, or function 'splitStream' that
-- does namely this.
splitStreamQueueing :: (MonadDES m, EnqueueStrategy m s)
                       => s
                       -- ^ the strategy applied for enqueuing the output requests
                       -> Int
                       -- ^ the number of output streams
                       -> Stream m a
                       -- ^ the input stream
                       -> Simulation m [Stream m a]
                       -- ^ the splitted output streams
{-# INLINABLE splitStreamQueueing #-}
splitStreamQueueing :: s -> Int -> Stream m a -> Simulation m [Stream m a]
splitStreamQueueing s
s Int
n Stream m a
x =
  do Ref m (Stream m a)
ref <- Stream m a -> Simulation m (Ref m (Stream m a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Stream m a
x
     Resource m s
res <- s -> Int -> Simulation m (Resource m s)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Simulation m (Resource m s)
newResource s
s Int
1
     let reader :: Process m a
reader =
           Resource m s -> Process m a -> Process m a
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m a -> Process m a
usingResource Resource m s
res (Process m a -> Process m a) -> Process m a -> Process m a
forall a b. (a -> b) -> a -> b
$
           do Stream m a
p <- Event m (Stream m a) -> Process m (Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Stream m a) -> Process m (Stream m a))
-> Event m (Stream m a) -> Process m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Event m (Stream m a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Stream m a)
ref
              (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p
              Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Stream m a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Stream m a)
ref Stream m a
xs
              a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
     [Stream m a] -> Simulation m [Stream m a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Stream m a] -> Simulation m [Stream m a])
-> [Stream m a] -> Simulation m [Stream m a]
forall a b. (a -> b) -> a -> b
$ (Int -> Stream m a) -> [Int] -> [Stream m a]
forall a b. (a -> b) -> [a] -> [b]
map (\Int
i -> Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
reader) [Int
1..Int
n]

-- | Split the input stream into a list of output streams
-- using the specified priorities.
splitStreamPrioritising :: (MonadDES m, PriorityQueueStrategy m s p)
                           => s
                           -- ^ the strategy applied for enqueuing the output requests
                           -> [Stream m p]
                           -- ^ the streams of priorities
                           -> Stream m a
                           -- ^ the input stream
                           -> Simulation m [Stream m a]
                           -- ^ the splitted output streams
{-# INLINABLE splitStreamPrioritising #-}
splitStreamPrioritising :: s -> [Stream m p] -> Stream m a -> Simulation m [Stream m a]
splitStreamPrioritising s
s [Stream m p]
ps Stream m a
x =
  do Ref m (Stream m a)
ref <- Stream m a -> Simulation m (Ref m (Stream m a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Stream m a
x
     Resource m s
res <- s -> Int -> Simulation m (Resource m s)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Simulation m (Resource m s)
newResource s
s Int
1
     let stream :: Stream m a -> Stream m a
stream (Cons Process m (a, Stream m a)
p) = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
z where
           z :: Process m (a, Stream m a)
z = do (a
p', Stream m a
ps) <- Process m (a, Stream m a)
p
                  a
a <- Resource m s -> a -> Process m a -> Process m a
forall (m :: * -> *) s p a.
(MonadDES m, PriorityQueueStrategy m s p) =>
Resource m s -> p -> Process m a -> Process m a
usingResourceWithPriority Resource m s
res a
p' (Process m a -> Process m a) -> Process m a -> Process m a
forall a b. (a -> b) -> a -> b
$
                       do Stream m a
p <- Event m (Stream m a) -> Process m (Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Stream m a) -> Process m (Stream m a))
-> Event m (Stream m a) -> Process m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Event m (Stream m a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Stream m a)
ref
                          (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p
                          Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Stream m a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Stream m a)
ref Stream m a
xs
                          a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
                  (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a -> Stream m a
stream Stream m a
ps)
     [Stream m a] -> Simulation m [Stream m a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Stream m a] -> Simulation m [Stream m a])
-> [Stream m a] -> Simulation m [Stream m a]
forall a b. (a -> b) -> a -> b
$ (Stream m p -> Stream m a) -> [Stream m p] -> [Stream m a]
forall a b. (a -> b) -> [a] -> [b]
map Stream m p -> Stream m a
forall a. PriorityQueueStrategy m s a => Stream m a -> Stream m a
stream [Stream m p]
ps

-- | Split the input stream into the specified number of output streams
-- after filtering and applying the 'FCFS' strategy for enqueuing the output requests.
splitStreamFiltering :: MonadDES m => [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE splitStreamFiltering #-}
splitStreamFiltering :: [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
splitStreamFiltering = FCFS
-> [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
splitStreamFilteringQueueing FCFS
FCFS

-- | Split the input stream into the specified number of output streams after filtering.
--
-- If you don't know what the strategy to apply, then you probably
-- need the 'FCFS' strategy, or function 'splitStreamFiltering' that
-- does namely this.
splitStreamFilteringQueueing :: (MonadDES m, EnqueueStrategy m s)
                                => s
                                -- ^ the strategy applied for enqueuing the output requests
                                -> [a -> Event m Bool]
                                -- ^ the filters for output streams
                                -> Stream m a
                                -- ^ the input stream
                                -> Simulation m [Stream m a]
                                -- ^ the splitted output streams
{-# INLINABLE splitStreamFilteringQueueing #-}
splitStreamFilteringQueueing :: s -> [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
splitStreamFilteringQueueing s
s [a -> Event m Bool]
preds Stream m a
x =
  do Ref m (Stream m a)
ref <- Simulation m (Ref m (Stream m a))
-> Simulation m (Ref m (Stream m a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Ref m (Stream m a))
 -> Simulation m (Ref m (Stream m a)))
-> Simulation m (Ref m (Stream m a))
-> Simulation m (Ref m (Stream m a))
forall a b. (a -> b) -> a -> b
$ Stream m a -> Simulation m (Ref m (Stream m a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Stream m a
x
     Resource m s
res <- s -> Int -> Simulation m (Resource m s)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Simulation m (Resource m s)
newResource s
s Int
1
     let reader :: (a -> Event m Bool) -> Process m a
reader a -> Event m Bool
pred =
           do Maybe a
a <-
                Resource m s -> Process m (Maybe a) -> Process m (Maybe a)
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m a -> Process m a
usingResource Resource m s
res (Process m (Maybe a) -> Process m (Maybe a))
-> Process m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$
                do Stream m a
p <- Event m (Stream m a) -> Process m (Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Stream m a) -> Process m (Stream m a))
-> Event m (Stream m a) -> Process m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Event m (Stream m a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Stream m a)
ref
                   (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p
                   Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$
                     do Bool
f <- a -> Event m Bool
pred a
a
                        if Bool
f
                          then do Ref m (Stream m a) -> Stream m a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Stream m a)
ref Stream m a
xs
                                  Maybe a -> Event m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe a -> Event m (Maybe a)) -> Maybe a -> Event m (Maybe a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe a
forall a. a -> Maybe a
Just a
a
                          else do Ref m (Stream m a) -> Stream m a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Stream m a)
ref (Stream m a -> Event m ()) -> Stream m a -> Event m ()
forall a b. (a -> b) -> a -> b
$ Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons ((a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
xs))
                                  Maybe a -> Event m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
              case Maybe a
a of
                Just a
a  -> a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
                Maybe a
Nothing -> (a -> Event m Bool) -> Process m a
reader a -> Event m Bool
pred
     [Stream m a] -> Simulation m [Stream m a]
forall (m :: * -> *) a. Monad m => a -> m a
return ([Stream m a] -> Simulation m [Stream m a])
-> [Stream m a] -> Simulation m [Stream m a]
forall a b. (a -> b) -> a -> b
$ ((a -> Event m Bool) -> Stream m a)
-> [a -> Event m Bool] -> [Stream m a]
forall a b. (a -> b) -> [a] -> [b]
map (Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess (Process m a -> Stream m a)
-> ((a -> Event m Bool) -> Process m a)
-> (a -> Event m Bool)
-> Stream m a
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (a -> Event m Bool) -> Process m a
reader) [a -> Event m Bool]
preds

-- | Concatenate the input streams applying the 'FCFS' strategy and
-- producing one output stream.
concatStreams :: MonadDES m => [Stream m a] -> Stream m a
{-# INLINABLE concatStreams #-}
concatStreams :: [Stream m a] -> Stream m a
concatStreams = FCFS -> [Stream m a] -> Stream m a
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> [Stream m a] -> Stream m a
concatQueuedStreams FCFS
FCFS

-- | Concatenate the input streams producing one output stream.
--
-- If you don't know what the strategy to apply, then you probably
-- need the 'FCFS' strategy, or function 'concatStreams' that
-- does namely this.
concatQueuedStreams :: (MonadDES m, EnqueueStrategy m s)
                       => s
                       -- ^ the strategy applied for enqueuing the input data
                       -> [Stream m a]
                       -- ^ the input stream
                       -> Stream m a
                       -- ^ the combined output stream
{-# INLINABLE concatQueuedStreams #-}
concatQueuedStreams :: s -> [Stream m a] -> Stream m a
concatQueuedStreams s
s [Stream m a]
streams = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
z where
  z :: Process m (a, Stream m a)
z = do Resource m FCFS
reading <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
         Resource m s
writing <- Simulation m (Resource m s) -> Process m (Resource m s)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m s) -> Process m (Resource m s))
-> Simulation m (Resource m s) -> Process m (Resource m s)
forall a b. (a -> b) -> a -> b
$ s -> Int -> Maybe Int -> Simulation m (Resource m s)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount s
s Int
1 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
         Resource m FCFS
conting <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
         Ref m (Maybe a)
ref <- Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a)))
-> Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> Simulation m (Ref m (Maybe a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Maybe a
forall a. Maybe a
Nothing
         let writer :: Stream m a -> Process m b
writer Stream m a
p =
               do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p
                  Resource m s -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m s
writing
                  Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
                  Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
reading
                  Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
conting
                  Stream m a -> Process m b
writer Stream m a
xs
             reader :: Process m a
reader =
               do Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
reading
                  Just a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Event m (Maybe a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Maybe a)
ref
                  Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
                  Resource m s -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m s
writing
                  a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
         [Stream m a] -> (Stream m a -> Process m ()) -> Process m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Stream m a]
streams ((Stream m a -> Process m ()) -> Process m ())
-> (Stream m a -> Process m ()) -> Process m ()
forall a b. (a -> b) -> a -> b
$ Process m () -> Process m ()
forall (m :: * -> *). MonadDES m => Process m () -> Process m ()
spawnProcess (Process m () -> Process m ())
-> (Stream m a -> Process m ()) -> Stream m a -> Process m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m a -> Process m ()
forall b. Stream m a -> Process m b
writer
         a
a <- Process m a
reader
         let xs :: Stream m a
xs = Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess (Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
conting Process m () -> Process m a -> Process m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process m a
reader)
         (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
xs)

-- | Concatenate the input priority streams producing one output stream.
concatPriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p)
                         => s
                         -- ^ the strategy applied for enqueuing the input data
                         -> [Stream m (p, a)]
                         -- ^ the input stream
                         -> Stream m a
                         -- ^ the combined output stream
{-# INLINABLE concatPriorityStreams #-}
concatPriorityStreams :: s -> [Stream m (p, a)] -> Stream m a
concatPriorityStreams s
s [Stream m (p, a)]
streams = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
z where
  z :: Process m (a, Stream m a)
z = do Resource m FCFS
reading <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
         Resource m s
writing <- Simulation m (Resource m s) -> Process m (Resource m s)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m s) -> Process m (Resource m s))
-> Simulation m (Resource m s) -> Process m (Resource m s)
forall a b. (a -> b) -> a -> b
$ s -> Int -> Maybe Int -> Simulation m (Resource m s)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount s
s Int
1 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
         Resource m FCFS
conting <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
         Ref m (Maybe a)
ref <- Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a)))
-> Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> Simulation m (Ref m (Maybe a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Maybe a
forall a. Maybe a
Nothing
         let writer :: Stream m (p, a) -> Process m b
writer Stream m (p, a)
p =
               do ((p
priority, a
a), Stream m (p, a)
xs) <- Stream m (p, a) -> Process m ((p, a), Stream m (p, a))
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m (p, a)
p
                  Resource m s -> p -> Process m ()
forall (m :: * -> *) s p.
(MonadDES m, PriorityQueueStrategy m s p) =>
Resource m s -> p -> Process m ()
requestResourceWithPriority Resource m s
writing p
priority
                  Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
                  Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
reading
                  Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
conting
                  Stream m (p, a) -> Process m b
writer Stream m (p, a)
xs
             reader :: Process m a
reader =
               do Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
reading
                  Just a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Event m (Maybe a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Maybe a)
ref
                  Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
                  Resource m s -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m s
writing
                  a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
         [Stream m (p, a)]
-> (Stream m (p, a) -> Process m ()) -> Process m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ [Stream m (p, a)]
streams ((Stream m (p, a) -> Process m ()) -> Process m ())
-> (Stream m (p, a) -> Process m ()) -> Process m ()
forall a b. (a -> b) -> a -> b
$ Process m () -> Process m ()
forall (m :: * -> *). MonadDES m => Process m () -> Process m ()
spawnProcess (Process m () -> Process m ())
-> (Stream m (p, a) -> Process m ())
-> Stream m (p, a)
-> Process m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Stream m (p, a) -> Process m ()
forall p b.
PriorityQueueStrategy m s p =>
Stream m (p, a) -> Process m b
writer
         a
a <- Process m a
reader
         let xs :: Stream m a
xs = Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess (Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
conting Process m () -> Process m a -> Process m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Process m a
reader)
         (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
xs)

-- | Merge two streams applying the 'FCFS' strategy for enqueuing the input data.
mergeStreams :: MonadDES m => Stream m a -> Stream m a -> Stream m a
{-# INLINABLE mergeStreams #-}
mergeStreams :: Stream m a -> Stream m a -> Stream m a
mergeStreams = FCFS -> Stream m a -> Stream m a -> Stream m a
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> Stream m a -> Stream m a -> Stream m a
mergeQueuedStreams FCFS
FCFS

-- | Merge two streams.
--
-- If you don't know what the strategy to apply, then you probably
-- need the 'FCFS' strategy, or function 'mergeStreams' that
-- does namely this.
mergeQueuedStreams :: (MonadDES m, EnqueueStrategy m s)
                      => s
                      -- ^ the strategy applied for enqueuing the input data
                      -> Stream m a
                      -- ^ the fist input stream
                      -> Stream m a
                      -- ^ the second input stream
                      -> Stream m a
                      -- ^ the output combined stream
{-# INLINABLE mergeQueuedStreams #-}
mergeQueuedStreams :: s -> Stream m a -> Stream m a -> Stream m a
mergeQueuedStreams s
s Stream m a
x Stream m a
y = s -> [Stream m a] -> Stream m a
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
s -> [Stream m a] -> Stream m a
concatQueuedStreams s
s [Stream m a
x, Stream m a
y]

-- | Merge two priority streams.
mergePriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p)
                        => s
                        -- ^ the strategy applied for enqueuing the input data
                        -> Stream m (p, a)
                        -- ^ the fist input stream
                        -> Stream m (p, a)
                        -- ^ the second input stream
                        -> Stream m a
                        -- ^ the output combined stream
{-# INLINABLE mergePriorityStreams #-}
mergePriorityStreams :: s -> Stream m (p, a) -> Stream m (p, a) -> Stream m a
mergePriorityStreams s
s Stream m (p, a)
x Stream m (p, a)
y = s -> [Stream m (p, a)] -> Stream m a
forall (m :: * -> *) s p a.
(MonadDES m, PriorityQueueStrategy m s p) =>
s -> [Stream m (p, a)] -> Stream m a
concatPriorityStreams s
s [Stream m (p, a)
x, Stream m (p, a)
y]

-- | An empty stream that never returns data.
emptyStream :: MonadDES m => Stream m a
{-# INLINABLE emptyStream #-}
emptyStream :: Stream m a
emptyStream = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
forall (m :: * -> *) a. MonadDES m => Process m a
neverProcess

-- | Consume the stream. It returns a process that infinitely reads data
-- from the stream and then redirects them to the provided function.
-- It is useful for modeling the process of enqueueing data in the queue
-- from the input stream.
consumeStream :: MonadDES m => (a -> Process m ()) -> Stream m a -> Process m ()
{-# INLINABLE consumeStream #-}
consumeStream :: (a -> Process m ()) -> Stream m a -> Process m ()
consumeStream a -> Process m ()
f (Cons Process m (a, Stream m a)
s) =
  do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
     a -> Process m ()
f a
a
     (a -> Process m ()) -> Stream m a -> Process m ()
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m ()) -> Stream m a -> Process m ()
consumeStream a -> Process m ()
f Stream m a
xs

-- | Sink the stream. It returns a process that infinitely reads data
-- from the stream. The resulting computation can be a moving force
-- to simulate the whole system of the interconnected streams and
-- processors.
sinkStream :: MonadDES m => Stream m a -> Process m ()
{-# INLINABLE sinkStream #-}
sinkStream :: Stream m a -> Process m ()
sinkStream (Cons Process m (a, Stream m a)
s) =
  do (a
a, Stream m a
xs) <- Process m (a, Stream m a)
s
     Stream m a -> Process m ()
forall (m :: * -> *) a. MonadDES m => Stream m a -> Process m ()
sinkStream Stream m a
xs
  
-- | Prefetch the input stream requesting for one more data item in advance 
-- while the last received item is not yet fully processed in the chain of 
-- streams, usually by the processors.
--
-- You can think of this as the prefetched stream could place its latest 
-- data item in some temporary space for later use, which is very useful 
-- for modeling a sequence of separate and independent work places.
prefetchStream :: MonadDES m => Stream m a -> Stream m a
{-# INLINABLE prefetchStream #-}
prefetchStream :: Stream m a -> Stream m a
prefetchStream Stream m a
s = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
z where
  z :: Process m (a, Stream m a)
z = do Resource m FCFS
reading <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
         Resource m FCFS
writing <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
1 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
         Ref m (Maybe a)
ref <- Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a)))
-> Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> Simulation m (Ref m (Maybe a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Maybe a
forall a. Maybe a
Nothing
         let writer :: Stream m a -> Process m b
writer Stream m a
p =
               do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p
                  Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
writing
                  Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
                  Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
reading
                  Stream m a -> Process m b
writer Stream m a
xs
             reader :: Process m a
reader =
               do Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
reading
                  Just a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Event m (Maybe a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Maybe a)
ref
                  Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
                  Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
writing
                  a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
         Process m () -> Process m ()
forall (m :: * -> *). MonadDES m => Process m () -> Process m ()
spawnProcess (Process m () -> Process m ()) -> Process m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Stream m a -> Process m ()
forall b. Stream m a -> Process m b
writer Stream m a
s
         Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m a -> Process m (a, Stream m a))
-> Stream m a -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$ Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
reader

-- | Like 'signalStream' but allows specifying an arbitrary queue instead of the unbounded queue.
queuedSignalStream :: MonadDES m
                      => (a -> Event m ())
                      -- ^ enqueue
                      -> Process m a
                      -- ^ dequeue
                      -> Signal m a
                      -- ^ the input signal
                      -> Composite m (Stream m a)
                      -- ^ the output stream
{-# INLINABLE queuedSignalStream #-}
queuedSignalStream :: (a -> Event m ())
-> Process m a -> Signal m a -> Composite m (Stream m a)
queuedSignalStream a -> Event m ()
enqueue Process m a
dequeue Signal m a
s =
  do DisposableEvent m
h <- Event m (DisposableEvent m) -> Composite m (DisposableEvent m)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (DisposableEvent m) -> Composite m (DisposableEvent m))
-> Event m (DisposableEvent m) -> Composite m (DisposableEvent m)
forall a b. (a -> b) -> a -> b
$
          Signal m a -> (a -> Event m ()) -> Event m (DisposableEvent m)
forall (m :: * -> *) a.
Signal m a -> (a -> Event m ()) -> Event m (DisposableEvent m)
handleSignal Signal m a
s a -> Event m ()
enqueue
     DisposableEvent m -> Composite m ()
forall (m :: * -> *).
Monad m =>
DisposableEvent m -> Composite m ()
disposableComposite DisposableEvent m
h
     Stream m a -> Composite m (Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m a -> Composite m (Stream m a))
-> Stream m a -> Composite m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
dequeue

-- | Return a stream of values triggered by the specified signal.
--
-- Since the time at which the values of the stream are requested for may differ from
-- the time at which the signal is triggered, it can be useful to apply the 'arrivalSignal'
-- function to add the information about the time points at which the signal was 
-- actually received.
--
-- The point is that the 'Stream' is requested outside, while the 'Signal' is triggered
-- inside. They are different by nature. The former is passive, while the latter is active.
--
-- The resulting stream may be a root of space leak as it uses an internal unbounded queue to store
-- the values received from the signal. The oldest value is dequeued each time we request
-- the stream and it is returned within the computation. Consider using 'queuedSignalStream' that
-- allows specifying the bounded queue in case of need.
signalStream :: MonadDES m => Signal m a -> Composite m (Stream m a)
{-# INLINABLE signalStream #-}
signalStream :: Signal m a -> Composite m (Stream m a)
signalStream Signal m a
s =
  do FCFSQueue m a
q <- Simulation m (FCFSQueue m a) -> Composite m (FCFSQueue m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation Simulation m (FCFSQueue m a)
forall (m :: * -> *) a. MonadDES m => Simulation m (FCFSQueue m a)
IQ.newFCFSQueue
     (a -> Event m ())
-> Process m a -> Signal m a -> Composite m (Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
(a -> Event m ())
-> Process m a -> Signal m a -> Composite m (Stream m a)
queuedSignalStream (FCFSQueue m a -> a -> Event m ()
forall (m :: * -> *) sm so a.
(MonadDES m, EnqueueStrategy m sm, DequeueStrategy m so) =>
Queue m sm so a -> a -> Event m ()
IQ.enqueue FCFSQueue m a
q) (FCFSQueue m a -> Process m a
forall (m :: * -> *) sm so a.
(MonadDES m, DequeueStrategy m sm, EnqueueStrategy m so) =>
Queue m sm so a -> Process m a
IQ.dequeue FCFSQueue m a
q) Signal m a
s

-- | Return a computation of the signal that triggers values from the specified stream,
-- each time the next value of the stream is received within the underlying 'Process' 
-- computation.
streamSignal :: MonadDES m => Stream m a -> Composite m (Signal m a)
{-# INLINABLE streamSignal #-}
streamSignal :: Stream m a -> Composite m (Signal m a)
streamSignal Stream m a
z =
  do SignalSource m a
s <- Simulation m (SignalSource m a) -> Composite m (SignalSource m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation Simulation m (SignalSource m a)
forall (m :: * -> *) a.
MonadDES m =>
Simulation m (SignalSource m a)
newSignalSource
     ProcessId m
pid <- Simulation m (ProcessId m) -> Composite m (ProcessId m)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation Simulation m (ProcessId m)
forall (m :: * -> *). MonadDES m => Simulation m (ProcessId m)
newProcessId
     Event m () -> Composite m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Composite m ()) -> Event m () -> Composite m ()
forall a b. (a -> b) -> a -> b
$
       ProcessId m -> Process m () -> Event m ()
forall (m :: * -> *).
MonadDES m =>
ProcessId m -> Process m () -> Event m ()
runProcessUsingId ProcessId m
pid (Process m () -> Event m ()) -> Process m () -> Event m ()
forall a b. (a -> b) -> a -> b
$
       (a -> Process m ()) -> Stream m a -> Process m ()
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m ()) -> Stream m a -> Process m ()
consumeStream (Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ())
-> (a -> Event m ()) -> a -> Process m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. SignalSource m a -> a -> Event m ()
forall (m :: * -> *) a. SignalSource m a -> a -> Event m ()
triggerSignal SignalSource m a
s) Stream m a
z
     DisposableEvent m -> Composite m ()
forall (m :: * -> *).
Monad m =>
DisposableEvent m -> Composite m ()
disposableComposite (DisposableEvent m -> Composite m ())
-> DisposableEvent m -> Composite m ()
forall a b. (a -> b) -> a -> b
$
       Event m () -> DisposableEvent m
forall (m :: * -> *). Event m () -> DisposableEvent m
DisposableEvent (Event m () -> DisposableEvent m)
-> Event m () -> DisposableEvent m
forall a b. (a -> b) -> a -> b
$
       ProcessId m -> Event m ()
forall (m :: * -> *). MonadDES m => ProcessId m -> Event m ()
cancelProcessWithId ProcessId m
pid
     Signal m a -> Composite m (Signal m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Signal m a -> Composite m (Signal m a))
-> Signal m a -> Composite m (Signal m a)
forall a b. (a -> b) -> a -> b
$ SignalSource m a -> Signal m a
forall (m :: * -> *) a. SignalSource m a -> Signal m a
publishSignal SignalSource m a
s

-- | Transform a stream so that the resulting stream returns a sequence of arrivals
-- saving the information about the time points at which the original stream items 
-- were received by demand.
arrivalStream :: MonadDES m => Stream m a -> Stream m (Arrival a)
{-# INLINABLE arrivalStream #-}
arrivalStream :: Stream m a -> Stream m (Arrival a)
arrivalStream Stream m a
s = Process m (Arrival a, Stream m (Arrival a)) -> Stream m (Arrival a)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (Arrival a, Stream m (Arrival a))
 -> Stream m (Arrival a))
-> Process m (Arrival a, Stream m (Arrival a))
-> Stream m (Arrival a)
forall a b. (a -> b) -> a -> b
$ Stream m a
-> Maybe Double -> Process m (Arrival a, Stream m (Arrival a))
forall (m :: * -> *) a.
MonadDES m =>
Stream m a
-> Maybe Double -> Process m (Arrival a, Stream m (Arrival a))
loop Stream m a
s Maybe Double
forall a. Maybe a
Nothing where
  loop :: Stream m a
-> Maybe Double -> Process m (Arrival a, Stream m (Arrival a))
loop Stream m a
s Maybe Double
t0 = do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
                 Double
t <- Dynamics m Double -> Process m Double
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
DynamicsLift t m =>
Dynamics m a -> t m a
liftDynamics Dynamics m Double
forall (m :: * -> *). Monad m => Dynamics m Double
time
                 let b :: Arrival a
b = Arrival :: forall a. a -> Double -> Maybe Double -> Arrival a
Arrival { arrivalValue :: a
arrivalValue = a
a,
                                   arrivalTime :: Double
arrivalTime  = Double
t,
                                   arrivalDelay :: Maybe Double
arrivalDelay =
                                     case Maybe Double
t0 of
                                       Maybe Double
Nothing -> Maybe Double
forall a. Maybe a
Nothing
                                       Just t0 -> Double -> Maybe Double
forall a. a -> Maybe a
Just (Double
t Double -> Double -> Double
forall a. Num a => a -> a -> a
- Double
t0) }
                 (Arrival a, Stream m (Arrival a))
-> Process m (Arrival a, Stream m (Arrival a))
forall (m :: * -> *) a. Monad m => a -> m a
return (Arrival a
b, Process m (Arrival a, Stream m (Arrival a)) -> Stream m (Arrival a)
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (Arrival a, Stream m (Arrival a))
 -> Stream m (Arrival a))
-> Process m (Arrival a, Stream m (Arrival a))
-> Stream m (Arrival a)
forall a b. (a -> b) -> a -> b
$ Stream m a
-> Maybe Double -> Process m (Arrival a, Stream m (Arrival a))
loop Stream m a
xs (Double -> Maybe Double
forall a. a -> Maybe a
Just Double
t))

-- | Delay the stream by one step using the specified initial value.
delayStream :: MonadDES m => a -> Stream m a -> Stream m a
{-# INLINABLE delayStream #-}
delayStream :: a -> Stream m a -> Stream m a
delayStream a
a0 Stream m a
s = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a0, Stream m a
s)

-- | Return a stream consisting of exactly one element and inifinite tail.
singletonStream :: MonadDES m => a -> Stream m a
{-# INLINABLE singletonStream #-}
singletonStream :: a -> Stream m a
singletonStream a
a = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
forall (m :: * -> *) a. MonadDES m => Stream m a
emptyStream)

-- | Removes one level of the computation, projecting its bound stream into the outer level.
joinStream :: MonadDES m => Process m (Stream m a) -> Stream m a
{-# INLINABLE joinStream #-}
joinStream :: Process m (Stream m a) -> Stream m a
joinStream Process m (Stream m a)
m = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ Process m (Stream m a)
m Process m (Stream m a)
-> (Stream m a -> Process m (a, Stream m a))
-> Process m (a, Stream m a)
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream

-- | Takes the next stream from the list after the current stream fails because of cancelling the underlying process.
failoverStream :: MonadDES m => [Stream m a] -> Stream m a
{-# INLINABLE failoverStream #-}
failoverStream :: [Stream m a] -> Stream m a
failoverStream [Stream m a]
ps = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons Process m (a, Stream m a)
z where
  z :: Process m (a, Stream m a)
z = do Resource m FCFS
reading <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
         Resource m FCFS
writing <- Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Resource m FCFS) -> Process m (Resource m FCFS))
-> Simulation m (Resource m FCFS) -> Process m (Resource m FCFS)
forall a b. (a -> b) -> a -> b
$ FCFS -> Int -> Maybe Int -> Simulation m (Resource m FCFS)
forall (m :: * -> *) s.
(MonadDES m, QueueStrategy m s) =>
s -> Int -> Maybe Int -> Simulation m (Resource m s)
newResourceWithMaxCount FCFS
FCFS Int
0 (Int -> Maybe Int
forall a. a -> Maybe a
Just Int
1)
         Ref m (Maybe a)
ref <- Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
SimulationLift t m =>
Simulation m a -> t m a
liftSimulation (Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a)))
-> Simulation m (Ref m (Maybe a)) -> Process m (Ref m (Maybe a))
forall a b. (a -> b) -> a -> b
$ Maybe a -> Simulation m (Ref m (Maybe a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Maybe a
forall a. Maybe a
Nothing
         ProcessId m
pid <- Process m (ProcessId m)
forall (m :: * -> *). MonadDES m => Process m (ProcessId m)
processId
         let writer :: Stream m a -> Process m b
writer Stream m a
p =
               do Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
writing
                  ProcessId m
pid' <- Process m (ProcessId m)
forall (m :: * -> *). MonadDES m => Process m (ProcessId m)
processId
                  (a
a, Stream m a
xs) <-
                    Process m (a, Stream m a)
-> Process m () -> Process m (a, Stream m a)
forall (m :: * -> *) a b.
MonadDES m =>
Process m a -> Process m b -> Process m a
finallyProcess (Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
p) (Process m () -> Process m (a, Stream m a))
-> Process m () -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$
                    Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
                    do Bool
cancelled' <- ProcessId m -> Event m Bool
forall (m :: * -> *). MonadDES m => ProcessId m -> Event m Bool
processCancelled ProcessId m
pid'
                       Bool -> Event m () -> Event m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
cancelled' (Event m () -> Event m ()) -> Event m () -> Event m ()
forall a b. (a -> b) -> a -> b
$
                         Resource m FCFS -> Event m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Event m ()
releaseResourceWithinEvent Resource m FCFS
writing
                  Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref (a -> Maybe a
forall a. a -> Maybe a
Just a
a)
                  Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
reading
                  Stream m a -> Process m b
writer Stream m a
xs
             reader :: Process m a
reader =
               do Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, DequeueStrategy m s) =>
Resource m s -> Process m ()
releaseResource Resource m FCFS
writing
                  Resource m FCFS -> Process m ()
forall (m :: * -> *) s.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m ()
requestResource Resource m FCFS
reading
                  Just a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Event m (Maybe a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Maybe a)
ref
                  Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Maybe a) -> Maybe a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing
                  a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
             loop :: [Stream m a] -> Process m ()
loop [] = () -> Process m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
             loop (Stream m a
p: [Stream m a]
ps) =
               do ProcessId m
pid' <- Process m (ProcessId m)
forall (m :: * -> *). MonadDES m => Process m (ProcessId m)
processId
                  DisposableEvent m
h' <- Event m (DisposableEvent m) -> Process m (DisposableEvent m)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (DisposableEvent m) -> Process m (DisposableEvent m))
-> Event m (DisposableEvent m) -> Process m (DisposableEvent m)
forall a b. (a -> b) -> a -> b
$
                        Signal m () -> (() -> Event m ()) -> Event m (DisposableEvent m)
forall (m :: * -> *) a.
Signal m a -> (a -> Event m ()) -> Event m (DisposableEvent m)
handleSignal (ProcessId m -> Signal m ()
forall (m :: * -> *). MonadDES m => ProcessId m -> Signal m ()
processCancelling ProcessId m
pid) ((() -> Event m ()) -> Event m (DisposableEvent m))
-> (() -> Event m ()) -> Event m (DisposableEvent m)
forall a b. (a -> b) -> a -> b
$ \() ->
                        ProcessId m -> Event m ()
forall (m :: * -> *). MonadDES m => ProcessId m -> Event m ()
cancelProcessWithId ProcessId m
pid'
                  Process m () -> Process m () -> Process m ()
forall (m :: * -> *) a b.
MonadDES m =>
Process m a -> Process m b -> Process m a
finallyProcess (Stream m a -> Process m ()
forall b. Stream m a -> Process m b
writer Stream m a
p) (Process m () -> Process m ()) -> Process m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
                    Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
                    do DisposableEvent m -> Event m ()
forall (m :: * -> *). DisposableEvent m -> Event m ()
disposeEvent DisposableEvent m
h'
                       Bool
cancelled <- ProcessId m -> Event m Bool
forall (m :: * -> *). MonadDES m => ProcessId m -> Event m Bool
processCancelled ProcessId m
pid
                       Bool -> Event m () -> Event m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
cancelled (Event m () -> Event m ()) -> Event m () -> Event m ()
forall a b. (a -> b) -> a -> b
$
                         do Bool
cancelled' <- ProcessId m -> Event m Bool
forall (m :: * -> *). MonadDES m => ProcessId m -> Event m Bool
processCancelled ProcessId m
pid'
                            Bool -> Event m () -> Event m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
cancelled' (Event m () -> Event m ()) -> Event m () -> Event m ()
forall a b. (a -> b) -> a -> b
$
                              [Char] -> Event m ()
forall a. HasCallStack => [Char] -> a
error [Char]
"Expected the sub-process to be cancelled: failoverStream"
                            Process m () -> Event m ()
forall (m :: * -> *). MonadDES m => Process m () -> Event m ()
runProcess (Process m () -> Event m ()) -> Process m () -> Event m ()
forall a b. (a -> b) -> a -> b
$ [Stream m a] -> Process m ()
loop [Stream m a]
ps
         Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Process m () -> Event m ()
forall (m :: * -> *). MonadDES m => Process m () -> Event m ()
runProcess (Process m () -> Event m ()) -> Process m () -> Event m ()
forall a b. (a -> b) -> a -> b
$ [Stream m a] -> Process m ()
loop [Stream m a]
ps
         Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m a -> Process m (a, Stream m a))
-> Stream m a -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$ Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess Process m a
reader

-- | Return the prefix of the stream of the specified length.
takeStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE takeStream #-}
takeStream :: Int -> Stream m a -> Stream m a
takeStream Int
n Stream m a
s
  | Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0    = Stream m a
forall (m :: * -> *) a. MonadDES m => Stream m a
emptyStream
  | Bool
otherwise =
    Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
    do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
       (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Int -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
Int -> Stream m a -> Stream m a
takeStream (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) Stream m a
xs)

-- | Return the longest prefix of the stream of elements that satisfy the predicate.
takeStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE takeStreamWhile #-}
takeStreamWhile :: (a -> Bool) -> Stream m a -> Stream m a
takeStreamWhile a -> Bool
p Stream m a
s =
  Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
  do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
     if a -> Bool
p a
a
       then (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Bool) -> Stream m a -> Stream m a
takeStreamWhile a -> Bool
p Stream m a
xs)
       else Process m (a, Stream m a)
forall (m :: * -> *) a. MonadDES m => Process m a
neverProcess

-- | Return the longest prefix of the stream of elements that satisfy the computation.
takeStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE takeStreamWhileM #-}
takeStreamWhileM :: (a -> Process m Bool) -> Stream m a -> Stream m a
takeStreamWhileM a -> Process m Bool
p Stream m a
s =
  Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
  do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
     Bool
f <- a -> Process m Bool
p a
a
     if Bool
f
       then (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, (a -> Process m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m Bool) -> Stream m a -> Stream m a
takeStreamWhileM a -> Process m Bool
p Stream m a
xs)
       else Process m (a, Stream m a)
forall (m :: * -> *) a. MonadDES m => Process m a
neverProcess

-- | Return the suffix of the stream after the specified first elements.
dropStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE dropStream #-}
dropStream :: Int -> Stream m a -> Stream m a
dropStream Int
n Stream m a
s
  | Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
0    = Stream m a
s
  | Bool
otherwise =
    Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
    do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
       Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m a -> Process m (a, Stream m a))
-> Stream m a -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$ Int -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
Int -> Stream m a -> Stream m a
dropStream (Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) Stream m a
xs

-- | Return the suffix of the stream of elements remaining after 'takeStreamWhile'.
dropStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
{-# INLINABLE dropStreamWhile #-}
dropStreamWhile :: (a -> Bool) -> Stream m a -> Stream m a
dropStreamWhile a -> Bool
p Stream m a
s =
  Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
  do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
     if a -> Bool
p a
a
       then Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m a -> Process m (a, Stream m a))
-> Stream m a -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$ (a -> Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Bool) -> Stream m a -> Stream m a
dropStreamWhile a -> Bool
p Stream m a
xs
       else (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
xs)

-- | Return the suffix of the stream of elements remaining after 'takeStreamWhileM'.
dropStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
{-# INLINABLE dropStreamWhileM #-}
dropStreamWhileM :: (a -> Process m Bool) -> Stream m a -> Stream m a
dropStreamWhileM a -> Process m Bool
p Stream m a
s =
  Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$
  do (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
     Bool
f <- a -> Process m Bool
p a
a
     if Bool
f
       then Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream (Stream m a -> Process m (a, Stream m a))
-> Stream m a -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$ (a -> Process m Bool) -> Stream m a -> Stream m a
forall (m :: * -> *) a.
MonadDES m =>
(a -> Process m Bool) -> Stream m a -> Stream m a
dropStreamWhileM a -> Process m Bool
p Stream m a
xs
       else (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Stream m a
xs)

-- | Create the specified number of equivalent clones of the input stream.
cloneStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
{-# INLINABLE cloneStream #-}
cloneStream :: Int -> Stream m a -> Simulation m [Stream m a]
cloneStream Int
n Stream m a
s =
  do [FCFSQueue m a]
qs  <- [Int]
-> (Int -> Simulation m (FCFSQueue m a))
-> Simulation m [FCFSQueue m a]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [Int
1..Int
n] ((Int -> Simulation m (FCFSQueue m a))
 -> Simulation m [FCFSQueue m a])
-> (Int -> Simulation m (FCFSQueue m a))
-> Simulation m [FCFSQueue m a]
forall a b. (a -> b) -> a -> b
$ \Int
i -> Simulation m (FCFSQueue m a)
forall (m :: * -> *) a. MonadDES m => Simulation m (FCFSQueue m a)
IQ.newFCFSQueue
     FCFSResource m
rs  <- Int -> Simulation m (FCFSResource m)
forall (m :: * -> *).
MonadDES m =>
Int -> Simulation m (FCFSResource m)
newFCFSResource Int
1
     Ref m (Stream m a)
ref <- Stream m a -> Simulation m (Ref m (Stream m a))
forall (m :: * -> *) a. MonadRef m => a -> Simulation m (Ref m a)
newRef Stream m a
s
     let reader :: a -> Queue m sm so a -> Process m a
reader a
m Queue m sm so a
q =
           do Maybe a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Queue m sm so a -> Event m (Maybe a)
forall (m :: * -> *) sm so a.
(MonadDES m, DequeueStrategy m sm) =>
Queue m sm so a -> Event m (Maybe a)
IQ.tryDequeue Queue m sm so a
q
              case Maybe a
a of
                Just a
a  -> a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
                Maybe a
Nothing ->
                  FCFSResource m -> Process m a -> Process m a
forall (m :: * -> *) s a.
(MonadDES m, EnqueueStrategy m s) =>
Resource m s -> Process m a -> Process m a
usingResource FCFSResource m
rs (Process m a -> Process m a) -> Process m a -> Process m a
forall a b. (a -> b) -> a -> b
$
                  do Maybe a
a <- Event m (Maybe a) -> Process m (Maybe a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Maybe a) -> Process m (Maybe a))
-> Event m (Maybe a) -> Process m (Maybe a)
forall a b. (a -> b) -> a -> b
$ Queue m sm so a -> Event m (Maybe a)
forall (m :: * -> *) sm so a.
(MonadDES m, DequeueStrategy m sm) =>
Queue m sm so a -> Event m (Maybe a)
IQ.tryDequeue Queue m sm so a
q
                     case Maybe a
a of
                       Just a
a  -> a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
                       Maybe a
Nothing ->
                         do Stream m a
s <- Event m (Stream m a) -> Process m (Stream m a)
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m (Stream m a) -> Process m (Stream m a))
-> Event m (Stream m a) -> Process m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Event m (Stream m a)
forall (m :: * -> *) a. MonadRef m => Ref m a -> Event m a
readRef Ref m (Stream m a)
ref
                            (a
a, Stream m a
xs) <- Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
                            Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ Ref m (Stream m a) -> Stream m a -> Event m ()
forall (m :: * -> *) a. MonadRef m => Ref m a -> a -> Event m ()
writeRef Ref m (Stream m a)
ref Stream m a
xs
                            [(a, FCFSQueue m a)]
-> ((a, FCFSQueue m a) -> Process m ()) -> Process m ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([a] -> [FCFSQueue m a] -> [(a, FCFSQueue m a)]
forall a b. [a] -> [b] -> [(a, b)]
zip [a
1..] [FCFSQueue m a]
qs) (((a, FCFSQueue m a) -> Process m ()) -> Process m ())
-> ((a, FCFSQueue m a) -> Process m ()) -> Process m ()
forall a b. (a -> b) -> a -> b
$ \(a
i, FCFSQueue m a
q) ->
                              Bool -> Process m () -> Process m ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (a
i a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
m) (Process m () -> Process m ()) -> Process m () -> Process m ()
forall a b. (a -> b) -> a -> b
$
                              Event m () -> Process m ()
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
EventLift t m =>
Event m a -> t m a
liftEvent (Event m () -> Process m ()) -> Event m () -> Process m ()
forall a b. (a -> b) -> a -> b
$ FCFSQueue m a -> a -> Event m ()
forall (m :: * -> *) sm so a.
(MonadDES m, EnqueueStrategy m sm, DequeueStrategy m so) =>
Queue m sm so a -> a -> Event m ()
IQ.enqueue FCFSQueue m a
q a
a
                            a -> Process m a
forall (m :: * -> *) a. Monad m => a -> m a
return a
a
     [(Integer, FCFSQueue m a)]
-> ((Integer, FCFSQueue m a) -> Simulation m (Stream m a))
-> Simulation m [Stream m a]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM ([Integer] -> [FCFSQueue m a] -> [(Integer, FCFSQueue m a)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Integer
1..] [FCFSQueue m a]
qs) (((Integer, FCFSQueue m a) -> Simulation m (Stream m a))
 -> Simulation m [Stream m a])
-> ((Integer, FCFSQueue m a) -> Simulation m (Stream m a))
-> Simulation m [Stream m a]
forall a b. (a -> b) -> a -> b
$ \(Integer
i, FCFSQueue m a
q) ->
       Stream m a -> Simulation m (Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Stream m a -> Simulation m (Stream m a))
-> Stream m a -> Simulation m (Stream m a)
forall a b. (a -> b) -> a -> b
$ Process m a -> Stream m a
forall (m :: * -> *) a. MonadDES m => Process m a -> Stream m a
repeatProcess (Process m a -> Stream m a) -> Process m a -> Stream m a
forall a b. (a -> b) -> a -> b
$ Integer -> FCFSQueue m a -> Process m a
forall a sm so.
(Num a, Enum a, Eq a, DequeueStrategy m sm) =>
a -> Queue m sm so a -> Process m a
reader Integer
i FCFSQueue m a
q

-- | Return a stream of first arrivals after assembling the specified number of elements.
firstArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE firstArrivalStream #-}
firstArrivalStream :: Int -> Stream m a -> Stream m a
firstArrivalStream Int
n Stream m a
s = ((Int, Maybe a) -> a -> Process m ((Int, Maybe a), Maybe a))
-> (Int, Maybe a) -> Stream m a -> Stream m a
forall (m :: * -> *) acc a b.
MonadDES m =>
(acc -> a -> Process m (acc, Maybe b))
-> acc -> Stream m a -> Stream m b
assembleAccumStream (Int, Maybe a) -> a -> Process m ((Int, Maybe a), Maybe a)
forall (m :: * -> *) a.
Monad m =>
(Int, Maybe a) -> a -> m ((Int, Maybe a), Maybe a)
f (Int
1, Maybe a
forall a. Maybe a
Nothing) Stream m a
s
  where f :: (Int, Maybe a) -> a -> m ((Int, Maybe a), Maybe a)
f (Int
i, Maybe a
a0) a
a =
          let a0' :: Maybe a
a0' = a -> Maybe a
forall a. a -> Maybe a
Just (a -> Maybe a) -> a -> Maybe a
forall a b. (a -> b) -> a -> b
$ a -> Maybe a -> a
forall a. a -> Maybe a -> a
fromMaybe a
a Maybe a
a0
          in if Int
i Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
             then ((Int, Maybe a), Maybe a) -> m ((Int, Maybe a), Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Int
1, Maybe a
forall a. Maybe a
Nothing), Maybe a
a0')
             else ((Int, Maybe a), Maybe a) -> m ((Int, Maybe a), Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return ((Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1, Maybe a
a0'), Maybe a
forall a. Maybe a
Nothing)

-- | Return a stream of last arrivals after assembling the specified number of elements.
lastArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
{-# INLINABLE lastArrivalStream #-}
lastArrivalStream :: Int -> Stream m a -> Stream m a
lastArrivalStream Int
n Stream m a
s = (Int -> a -> Process m (Int, Maybe a))
-> Int -> Stream m a -> Stream m a
forall (m :: * -> *) acc a b.
MonadDES m =>
(acc -> a -> Process m (acc, Maybe b))
-> acc -> Stream m a -> Stream m b
assembleAccumStream Int -> a -> Process m (Int, Maybe a)
forall (m :: * -> *) a. Monad m => Int -> a -> m (Int, Maybe a)
f Int
1 Stream m a
s
  where f :: Int -> a -> m (Int, Maybe a)
f Int
i a
a =
          if Int
i Int -> Int -> Int
forall a. Integral a => a -> a -> a
`mod` Int
n Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
0
          then (Int, Maybe a) -> m (Int, Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
1, a -> Maybe a
forall a. a -> Maybe a
Just a
a)
          else (Int, Maybe a) -> m (Int, Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return (Int
i Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1, Maybe a
forall a. Maybe a
Nothing)

-- | Assemble an accumulated stream using the supplied function.
assembleAccumStream :: MonadDES m => (acc -> a -> Process m (acc, Maybe b)) -> acc -> Stream m a -> Stream m b
{-# INLINABLE assembleAccumStream #-}
assembleAccumStream :: (acc -> a -> Process m (acc, Maybe b))
-> acc -> Stream m a -> Stream m b
assembleAccumStream acc -> a -> Process m (acc, Maybe b)
f acc
acc Stream m a
s =
  (Maybe b -> b) -> Stream m (Maybe b) -> Stream m b
forall (m :: * -> *) a b.
MonadDES m =>
(a -> b) -> Stream m a -> Stream m b
mapStream Maybe b -> b
forall a. HasCallStack => Maybe a -> a
fromJust (Stream m (Maybe b) -> Stream m b)
-> Stream m (Maybe b) -> Stream m b
forall a b. (a -> b) -> a -> b
$
  (Maybe b -> Bool) -> Stream m (Maybe b) -> Stream m (Maybe b)
forall (m :: * -> *) a.
MonadDES m =>
(a -> Bool) -> Stream m a -> Stream m a
filterStream Maybe b -> Bool
forall a. Maybe a -> Bool
isJust (Stream m (Maybe b) -> Stream m (Maybe b))
-> Stream m (Maybe b) -> Stream m (Maybe b)
forall a b. (a -> b) -> a -> b
$
  (acc -> a -> Process m (acc, Maybe b))
-> acc -> Stream m a -> Stream m (Maybe b)
forall (m :: * -> *) acc a b.
MonadDES m =>
(acc -> a -> Process m (acc, b)) -> acc -> Stream m a -> Stream m b
accumStream acc -> a -> Process m (acc, Maybe b)
f acc
acc Stream m a
s

-- | Show the debug messages with the current simulation time.
traceStream :: MonadDES m
               => Maybe String
               -- ^ the request message
               -> Maybe String
               -- ^ the response message
               -> Stream m a
               -- ^ a stream
               -> Stream m a
{-# INLINABLE traceStream #-}
traceStream :: Maybe [Char] -> Maybe [Char] -> Stream m a -> Stream m a
traceStream Maybe [Char]
request Maybe [Char]
response Stream m a
s = Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
Stream m a -> Process m (a, Stream m a)
loop Stream m a
s where
  loop :: Stream m a -> Process m (a, Stream m a)
loop Stream m a
s = do (a
a, Stream m a
xs) <-
                case Maybe [Char]
request of
                  Maybe [Char]
Nothing -> Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
                  Just [Char]
message ->
                    [Char] -> Process m (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
[Char] -> Process m a -> Process m a
traceProcess [Char]
message (Process m (a, Stream m a) -> Process m (a, Stream m a))
-> Process m (a, Stream m a) -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$
                    Stream m a -> Process m (a, Stream m a)
forall (m :: * -> *) a. Stream m a -> Process m (a, Stream m a)
runStream Stream m a
s
              case Maybe [Char]
response of
                Maybe [Char]
Nothing -> (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Process m (a, Stream m a)
loop Stream m a
xs)
                Just [Char]
message ->
                  [Char] -> Process m (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a.
MonadDES m =>
[Char] -> Process m a -> Process m a
traceProcess [Char]
message (Process m (a, Stream m a) -> Process m (a, Stream m a))
-> Process m (a, Stream m a) -> Process m (a, Stream m a)
forall a b. (a -> b) -> a -> b
$
                  (a, Stream m a) -> Process m (a, Stream m a)
forall (m :: * -> *) a. Monad m => a -> m a
return (a
a, Process m (a, Stream m a) -> Stream m a
forall (m :: * -> *) a. Process m (a, Stream m a) -> Stream m a
Cons (Process m (a, Stream m a) -> Stream m a)
-> Process m (a, Stream m a) -> Stream m a
forall a b. (a -> b) -> a -> b
$ Stream m a -> Process m (a, Stream m a)
loop Stream m a
xs)