| Copyright | Copyright (c) 2009-2017 David Sorokin <david.sorokin@gmail.com> | 
|---|---|
| License | BSD3 | 
| Maintainer | David Sorokin <david.sorokin@gmail.com> | 
| Stability | experimental | 
| Safe Haskell | None | 
| Language | Haskell2010 | 
Simulation.Aivika.Trans.Stream
Contents
Description
Tested with: GHC 8.0.1
The infinite stream of data in time.
Synopsis
- newtype Stream m a = Cons {}
- emptyStream :: MonadDES m => Stream m a
- mergeStreams :: MonadDES m => Stream m a -> Stream m a -> Stream m a
- mergeQueuedStreams :: (MonadDES m, EnqueueStrategy m s) => s -> Stream m a -> Stream m a -> Stream m a
- mergePriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p) => s -> Stream m (p, a) -> Stream m (p, a) -> Stream m a
- concatStreams :: MonadDES m => [Stream m a] -> Stream m a
- concatQueuedStreams :: (MonadDES m, EnqueueStrategy m s) => s -> [Stream m a] -> Stream m a
- concatPriorityStreams :: (MonadDES m, PriorityQueueStrategy m s p) => s -> [Stream m (p, a)] -> Stream m a
- splitStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
- splitStreamQueueing :: (MonadDES m, EnqueueStrategy m s) => s -> Int -> Stream m a -> Simulation m [Stream m a]
- splitStreamPrioritising :: (MonadDES m, PriorityQueueStrategy m s p) => s -> [Stream m p] -> Stream m a -> Simulation m [Stream m a]
- splitStreamFiltering :: MonadDES m => [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
- splitStreamFilteringQueueing :: (MonadDES m, EnqueueStrategy m s) => s -> [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a]
- streamUsingId :: MonadDES m => ProcessId m -> Stream m a -> Stream m a
- prefetchStream :: MonadDES m => Stream m a -> Stream m a
- delayStream :: MonadDES m => a -> Stream m a -> Stream m a
- arrivalStream :: MonadDES m => Stream m a -> Stream m (Arrival a)
- memoStream :: MonadDES m => Stream m a -> Simulation m (Stream m a)
- zipStreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
- zipStreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b)
- zip3StreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
- zip3StreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c)
- unzipStream :: MonadDES m => Stream m (a, b) -> Simulation m (Stream m a, Stream m b)
- streamSeq :: MonadDES m => [Stream m a] -> Stream m [a]
- streamParallel :: MonadDES m => [Stream m a] -> Stream m [a]
- consumeStream :: MonadDES m => (a -> Process m ()) -> Stream m a -> Process m ()
- sinkStream :: MonadDES m => Stream m a -> Process m ()
- repeatProcess :: MonadDES m => Process m a -> Stream m a
- mapStream :: MonadDES m => (a -> b) -> Stream m a -> Stream m b
- mapStreamM :: MonadDES m => (a -> Process m b) -> Stream m a -> Stream m b
- accumStream :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Stream m a -> Stream m b
- apStream :: MonadDES m => Stream m (a -> b) -> Stream m a -> Stream m b
- apStreamM :: MonadDES m => Stream m (a -> Process m b) -> Stream m a -> Stream m b
- filterStream :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
- filterStreamM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
- takeStream :: MonadDES m => Int -> Stream m a -> Stream m a
- takeStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
- takeStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
- dropStream :: MonadDES m => Int -> Stream m a -> Stream m a
- dropStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a
- dropStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a
- singletonStream :: MonadDES m => a -> Stream m a
- joinStream :: MonadDES m => Process m (Stream m a) -> Stream m a
- failoverStream :: MonadDES m => [Stream m a] -> Stream m a
- signalStream :: MonadDES m => Signal m a -> Composite m (Stream m a)
- streamSignal :: MonadDES m => Stream m a -> Composite m (Signal m a)
- queuedSignalStream :: MonadDES m => (a -> Event m ()) -> Process m a -> Signal m a -> Composite m (Stream m a)
- leftStream :: MonadDES m => Stream m (Either a b) -> Stream m a
- rightStream :: MonadDES m => Stream m (Either a b) -> Stream m b
- replaceLeftStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either c b)
- replaceRightStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either a c)
- partitionEitherStream :: MonadDES m => Stream m (Either a b) -> Simulation m (Stream m a, Stream m b)
- cloneStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a]
- firstArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
- lastArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a
- assembleAccumStream :: MonadDES m => (acc -> a -> Process m (acc, Maybe b)) -> acc -> Stream m a -> Stream m b
- traceStream :: MonadDES m => Maybe String -> Maybe String -> Stream m a -> Stream m a
Stream Type
Represents an infinite stream of data in time, some kind of never-ending cons cell.
Merging and Splitting Stream
emptyStream :: MonadDES m => Stream m a Source #
An empty stream that never returns data.
mergeStreams :: MonadDES m => Stream m a -> Stream m a -> Stream m a Source #
Merge two streams applying the FCFS strategy for enqueuing the input data.
Arguments
| :: (MonadDES m, EnqueueStrategy m s) | |
| => s | the strategy applied for enqueuing the input data | 
| -> Stream m a | the fist input stream | 
| -> Stream m a | the second input stream | 
| -> Stream m a | the output combined stream | 
Merge two streams.
If you don't know what the strategy to apply, then you probably
 need the FCFS strategy, or function mergeStreams that
 does namely this.
Arguments
| :: (MonadDES m, PriorityQueueStrategy m s p) | |
| => s | the strategy applied for enqueuing the input data | 
| -> Stream m (p, a) | the fist input stream | 
| -> Stream m (p, a) | the second input stream | 
| -> Stream m a | the output combined stream | 
Merge two priority streams.
concatStreams :: MonadDES m => [Stream m a] -> Stream m a Source #
Concatenate the input streams applying the FCFS strategy and
 producing one output stream.
Arguments
| :: (MonadDES m, EnqueueStrategy m s) | |
| => s | the strategy applied for enqueuing the input data | 
| -> [Stream m a] | the input stream | 
| -> Stream m a | the combined output stream | 
Concatenate the input streams producing one output stream.
If you don't know what the strategy to apply, then you probably
 need the FCFS strategy, or function concatStreams that
 does namely this.
concatPriorityStreams Source #
Arguments
| :: (MonadDES m, PriorityQueueStrategy m s p) | |
| => s | the strategy applied for enqueuing the input data | 
| -> [Stream m (p, a)] | the input stream | 
| -> Stream m a | the combined output stream | 
Concatenate the input priority streams producing one output stream.
splitStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a] Source #
Split the input stream into the specified number of output streams
 after applying the FCFS strategy for enqueuing the output requests.
Arguments
| :: (MonadDES m, EnqueueStrategy m s) | |
| => s | the strategy applied for enqueuing the output requests | 
| -> Int | the number of output streams | 
| -> Stream m a | the input stream | 
| -> Simulation m [Stream m a] | the splitted output streams | 
Split the input stream into the specified number of output streams.
If you don't know what the strategy to apply, then you probably
 need the FCFS strategy, or function splitStream that
 does namely this.
splitStreamPrioritising Source #
Arguments
| :: (MonadDES m, PriorityQueueStrategy m s p) | |
| => s | the strategy applied for enqueuing the output requests | 
| -> [Stream m p] | the streams of priorities | 
| -> Stream m a | the input stream | 
| -> Simulation m [Stream m a] | the splitted output streams | 
Split the input stream into a list of output streams using the specified priorities.
splitStreamFiltering :: MonadDES m => [a -> Event m Bool] -> Stream m a -> Simulation m [Stream m a] Source #
Split the input stream into the specified number of output streams
 after filtering and applying the FCFS strategy for enqueuing the output requests.
splitStreamFilteringQueueing Source #
Arguments
| :: (MonadDES m, EnqueueStrategy m s) | |
| => s | the strategy applied for enqueuing the output requests | 
| -> [a -> Event m Bool] | the filters for output streams | 
| -> Stream m a | the input stream | 
| -> Simulation m [Stream m a] | the splitted output streams | 
Split the input stream into the specified number of output streams after filtering.
If you don't know what the strategy to apply, then you probably
 need the FCFS strategy, or function splitStreamFiltering that
 does namely this.
Specifying Identifier
streamUsingId :: MonadDES m => ProcessId m -> Stream m a -> Stream m a Source #
Create a stream that will use the specified process identifier.
 It can be useful to refer to the underlying Process computation which
 can be passivated, interrupted, canceled and so on. See also the
 processUsingId function for more details.
Prefetching and Delaying Stream
prefetchStream :: MonadDES m => Stream m a -> Stream m a Source #
Prefetch the input stream requesting for one more data item in advance while the last received item is not yet fully processed in the chain of streams, usually by the processors.
You can think of this as the prefetched stream could place its latest data item in some temporary space for later use, which is very useful for modeling a sequence of separate and independent work places.
delayStream :: MonadDES m => a -> Stream m a -> Stream m a Source #
Delay the stream by one step using the specified initial value.
Stream Arriving
arrivalStream :: MonadDES m => Stream m a -> Stream m (Arrival a) Source #
Transform a stream so that the resulting stream returns a sequence of arrivals saving the information about the time points at which the original stream items were received by demand.
Memoizing, Zipping and Uzipping Stream
memoStream :: MonadDES m => Stream m a -> Simulation m (Stream m a) Source #
Memoize the stream so that it would always return the same data within the simulation run.
zipStreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b) Source #
Zip two streams trying to get data sequentially.
zipStreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m (a, b) Source #
Zip two streams trying to get data as soon as possible, launching the sub-processes in parallel.
zip3StreamSeq :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c) Source #
Zip three streams trying to get data sequentially.
zip3StreamParallel :: MonadDES m => Stream m a -> Stream m b -> Stream m c -> Stream m (a, b, c) Source #
Zip three streams trying to get data as soon as possible, launching the sub-processes in parallel.
unzipStream :: MonadDES m => Stream m (a, b) -> Simulation m (Stream m a, Stream m b) Source #
Unzip the stream.
streamSeq :: MonadDES m => [Stream m a] -> Stream m [a] Source #
To form each new portion of data for the output stream, read data sequentially from the input streams.
This is a generalization of zipStreamSeq.
streamParallel :: MonadDES m => [Stream m a] -> Stream m [a] Source #
To form each new portion of data for the output stream, read data from the input streams in parallel.
This is a generalization of zipStreamParallel.
Consuming and Sinking Stream
consumeStream :: MonadDES m => (a -> Process m ()) -> Stream m a -> Process m () Source #
Consume the stream. It returns a process that infinitely reads data from the stream and then redirects them to the provided function. It is useful for modeling the process of enqueueing data in the queue from the input stream.
sinkStream :: MonadDES m => Stream m a -> Process m () Source #
Sink the stream. It returns a process that infinitely reads data from the stream. The resulting computation can be a moving force to simulate the whole system of the interconnected streams and processors.
Useful Combinators
repeatProcess :: MonadDES m => Process m a -> Stream m a Source #
Return a stream of values generated by the specified process.
mapStream :: MonadDES m => (a -> b) -> Stream m a -> Stream m b Source #
Map the stream according the specified function.
mapStreamM :: MonadDES m => (a -> Process m b) -> Stream m a -> Stream m b Source #
Compose the stream.
accumStream :: MonadDES m => (acc -> a -> Process m (acc, b)) -> acc -> Stream m a -> Stream m b Source #
Accumulator that outputs a value determined by the supplied function.
apStream :: MonadDES m => Stream m (a -> b) -> Stream m a -> Stream m b Source #
Sequential application.
apStreamM :: MonadDES m => Stream m (a -> Process m b) -> Stream m a -> Stream m b Source #
Sequential application.
filterStream :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a Source #
Filter only those data values that satisfy to the specified predicate.
filterStreamM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a Source #
Filter only those data values that satisfy to the specified predicate.
takeStream :: MonadDES m => Int -> Stream m a -> Stream m a Source #
Return the prefix of the stream of the specified length.
takeStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a Source #
Return the longest prefix of the stream of elements that satisfy the predicate.
takeStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a Source #
Return the longest prefix of the stream of elements that satisfy the computation.
dropStream :: MonadDES m => Int -> Stream m a -> Stream m a Source #
Return the suffix of the stream after the specified first elements.
dropStreamWhile :: MonadDES m => (a -> Bool) -> Stream m a -> Stream m a Source #
Return the suffix of the stream of elements remaining after takeStreamWhile.
dropStreamWhileM :: MonadDES m => (a -> Process m Bool) -> Stream m a -> Stream m a Source #
Return the suffix of the stream of elements remaining after takeStreamWhileM.
singletonStream :: MonadDES m => a -> Stream m a Source #
Return a stream consisting of exactly one element and inifinite tail.
joinStream :: MonadDES m => Process m (Stream m a) -> Stream m a Source #
Removes one level of the computation, projecting its bound stream into the outer level.
Failover
failoverStream :: MonadDES m => [Stream m a] -> Stream m a Source #
Takes the next stream from the list after the current stream fails because of cancelling the underlying process.
Integrating with Signals
signalStream :: MonadDES m => Signal m a -> Composite m (Stream m a) Source #
Return a stream of values triggered by the specified signal.
Since the time at which the values of the stream are requested for may differ from
 the time at which the signal is triggered, it can be useful to apply the arrivalSignal
 function to add the information about the time points at which the signal was 
 actually received.
The point is that the Stream is requested outside, while the Signal is triggered
 inside. They are different by nature. The former is passive, while the latter is active.
The resulting stream may be a root of space leak as it uses an internal unbounded queue to store
 the values received from the signal. The oldest value is dequeued each time we request
 the stream and it is returned within the computation. Consider using queuedSignalStream that
 allows specifying the bounded queue in case of need.
streamSignal :: MonadDES m => Stream m a -> Composite m (Signal m a) Source #
Return a computation of the signal that triggers values from the specified stream,
 each time the next value of the stream is received within the underlying Process 
 computation.
Arguments
| :: MonadDES m | |
| => (a -> Event m ()) | enqueue | 
| -> Process m a | dequeue | 
| -> Signal m a | the input signal | 
| -> Composite m (Stream m a) | the output stream | 
Like signalStream but allows specifying an arbitrary queue instead of the unbounded queue.
Utilities
rightStream :: MonadDES m => Stream m (Either a b) -> Stream m b Source #
The stream of Right values.
replaceLeftStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either c b) Source #
Replace the Left values.
replaceRightStream :: MonadDES m => Stream m (Either a b) -> Stream m c -> Stream m (Either a c) Source #
Replace the Right values.
partitionEitherStream :: MonadDES m => Stream m (Either a b) -> Simulation m (Stream m a, Stream m b) Source #
Partition the stream of Either values into two streams.
Assemblying Streams
cloneStream :: MonadDES m => Int -> Stream m a -> Simulation m [Stream m a] Source #
Create the specified number of equivalent clones of the input stream.
firstArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a Source #
Return a stream of first arrivals after assembling the specified number of elements.
lastArrivalStream :: MonadDES m => Int -> Stream m a -> Stream m a Source #
Return a stream of last arrivals after assembling the specified number of elements.
assembleAccumStream :: MonadDES m => (acc -> a -> Process m (acc, Maybe b)) -> acc -> Stream m a -> Stream m b Source #
Assemble an accumulated stream using the supplied function.