Copyright | Copyright (c) 2009-2016 David Sorokin <david.sorokin@gmail.com> |
---|---|
License | BSD3 |
Maintainer | David Sorokin <david.sorokin@gmail.com> |
Stability | experimental |
Safe Haskell | None |
Language | Haskell2010 |
Tested with: GHC 8.0.1
The processor of simulation data.
- newtype Processor a b = Processor {
- runProcessor :: Stream a -> Stream b
- emptyProcessor :: Processor a b
- arrProcessor :: (a -> Process b) -> Processor a b
- accumProcessor :: (acc -> a -> Process (acc, b)) -> acc -> Processor a b
- withinProcessor :: Process () -> Processor a a
- processorUsingId :: ProcessId -> Processor a b -> Processor a b
- prefetchProcessor :: Processor a a
- delayProcessor :: a -> Processor a a
- bufferProcessor :: (Stream a -> Process ()) -> Stream b -> Processor a b
- bufferProcessorLoop :: (Stream a -> Stream c -> Process ()) -> Stream d -> Processor d (Either e b) -> Processor e c -> Processor a b
- queueProcessor :: (a -> Process ()) -> Process b -> Processor a b
- queueProcessorLoopMerging :: (Stream a -> Stream d -> Stream e) -> (e -> Process ()) -> Process c -> Processor c (Either f b) -> Processor f d -> Processor a b
- queueProcessorLoopSeq :: (a -> Process ()) -> Process c -> Processor c (Either e b) -> Processor e a -> Processor a b
- queueProcessorLoopParallel :: (a -> Process ()) -> Process c -> Processor c (Either e b) -> Processor e a -> Processor a b
- processorSeq :: [Processor a a] -> Processor a a
- processorParallel :: [Processor a b] -> Processor a b
- processorQueuedParallel :: (EnqueueStrategy si, EnqueueStrategy so) => si -> so -> [Processor a b] -> Processor a b
- processorPrioritisingOutputParallel :: (EnqueueStrategy si, PriorityQueueStrategy so po) => si -> so -> [Processor a (po, b)] -> Processor a b
- processorPrioritisingInputParallel :: (PriorityQueueStrategy si pi, EnqueueStrategy so) => si -> so -> [(Stream pi, Processor a b)] -> Processor a b
- processorPrioritisingInputOutputParallel :: (PriorityQueueStrategy si pi, PriorityQueueStrategy so po) => si -> so -> [(Stream pi, Processor a (po, b))] -> Processor a b
- arrivalProcessor :: Processor a (Arrival a)
- joinProcessor :: Process (Processor a b) -> Processor a b
- failoverProcessor :: [Processor a b] -> Processor a b
- channelProcessor :: Channel a b -> Processor a b
- processorChannel :: Processor a b -> Channel a b
- queuedChannelProcessor :: (b -> Event ()) -> Process b -> Channel a b -> Processor a b
- queuedProcessorChannel :: (a -> Event ()) -> Process a -> Processor a b -> Channel a b
- traceProcessor :: Maybe String -> Maybe String -> Processor a b -> Processor a b
Processor Type
newtype Processor a b Source #
Represents a processor of simulation data.
Processor | |
|
Processor Primitives
emptyProcessor :: Processor a b Source #
A processor that never finishes its work producing an emptyStream
.
arrProcessor :: (a -> Process b) -> Processor a b Source #
Create a simple processor by the specified handling function that runs the discontinuous process for each input value to get the output.
accumProcessor :: (acc -> a -> Process (acc, b)) -> acc -> Processor a b Source #
Accumulator that outputs a value determined by the supplied function.
withinProcessor :: Process () -> Processor a a Source #
Involve the computation with side effect when processing a stream of data.
Specifying Identifier
processorUsingId :: ProcessId -> Processor a b -> Processor a b Source #
Create a processor that will use the specified process identifier.
It can be useful to refer to the underlying Process
computation which
can be passivated, interrupted, canceled and so on. See also the
processUsingId
function for more details.
Prefetch and Delay Processors
prefetchProcessor :: Processor a a Source #
This is a prefetch processor that requests for one more data item from the input in advance while the latest item is not yet fully processed in the chain of streams, usually by other processors.
You can think of this as the prefetched processor could place its latest data item in some temporary space for later use, which is very useful for modeling a sequence of separate and independent work places.
delayProcessor :: a -> Processor a a Source #
A processor that delays the input stream by one step using the specified initial value.
Buffer Processor
:: (Stream a -> Process ()) | a separate process to consume the input |
-> Stream b | the resulting stream of data |
-> Processor a b |
Create a buffer processor, where the process from the first argument consumes the input stream but the stream passed in as the second argument and produced usually by some other process is returned as an output. This kind of processor is very useful for modeling the queues.
:: (Stream a -> Stream c -> Process ()) | consume two streams: the input values of type |
-> Stream d | the stream of data that may become results |
-> Processor d (Either e b) | process and then decide what values of type |
-> Processor e c | process in the loop and then return a value
of type |
-> Processor a b |
Like bufferProcessor
but allows creating a loop when some items
can be processed repeatedly. It is very useful for modeling the processors
with queues and loop-backs.
Processing Queues
:: (a -> Process ()) | enqueue the input item and wait while the queue is full if required so that there were no hanging items |
-> Process b | dequeue an output item |
-> Processor a b | the buffering processor |
Return a processor with help of which we can model the queue.
Although the function doesn't refer to the queue directly, its main use case is namely a processing of the queue. The first argument should be the enqueueing operation, while the second argument should be the opposite dequeueing operation.
The reason is as follows. There are many possible combinations how the queues can be modeled. There is no sense to enumerate all them creating a separate function for each case. We can just use combinators to define exactly what we need.
So, the queue can lose the input items if the queue is full, or the input process can suspend while the queue is full, or we can use priorities for enqueueing, storing and dequeueing the items in different combinations. There are so many use cases!
There is a hope that this function along with other similar functions from this
module is sufficient to cover the most important cases. Even if it is not sufficient
then you can use a more generic function bufferProcessor
which this function is
based on. In case of need, you can even write your own function from scratch. It is
quite easy actually.
queueProcessorLoopMerging Source #
:: (Stream a -> Stream d -> Stream e) | merge two streams: the input values of type |
-> (e -> Process ()) | enqueue the input item and wait while the queue is full if required so that there were no hanging items |
-> Process c | dequeue an item for the further processing |
-> Processor c (Either f b) | process and then decide what values of type |
-> Processor f d | process in the loop and then return a value
of type |
-> Processor a b | the buffering processor |
Like queueProcessor
creates a queue processor but with a loop when some items
can be processed and then added to the queue again. Also it allows specifying
how two input streams of data can be merged.
queueProcessorLoopSeq Source #
:: (a -> Process ()) | enqueue the input item and wait while the queue is full if required so that there were no hanging items |
-> Process c | dequeue an item for the further processing |
-> Processor c (Either e b) | process and then decide what values of type |
-> Processor e a | process in the loop and then return a value
of type |
-> Processor a b | the buffering processor |
Like queueProcessorLoopMerging
creates a queue processor with a loop when
some items can be processed and then added to the queue again. Only it sequentially
merges two input streams of data: one stream that come from the external source and
another stream of data returned by the loop. The first stream has a priority over
the second one.
queueProcessorLoopParallel Source #
:: (a -> Process ()) | enqueue the input item and wait while the queue is full if required so that there were no hanging items |
-> Process c | dequeue an item for the further processing |
-> Processor c (Either e b) | process and then decide what values of type |
-> Processor e a | process in the loop and then return a value
of type |
-> Processor a b | the buffering processor |
Like queueProcessorLoopMerging
creates a queue processor with a loop when
some items can be processed and then added to the queue again. Only it runs two
simultaneous processes to enqueue the input streams of data: one stream that come
from the external source and another stream of data returned by the loop.
Sequencing Processors
processorSeq :: [Processor a a] -> Processor a a Source #
Launches the processors sequentially using the prefetchProcessor
between them
to model an autonomous work of each of the processors specified.
Parallelizing Processors
processorParallel :: [Processor a b] -> Processor a b Source #
Launches the processors in parallel consuming the same input stream and producing
a combined output stream. This version applies the FCFS
strategy both for input
and output, which suits the most part of uses cases.
processorQueuedParallel Source #
:: (EnqueueStrategy si, EnqueueStrategy so) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [Processor a b] | the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel consuming the same input stream and producing a combined output stream.
If you don't know what the enqueue strategies to apply, then
you will probably need FCFS
for the both parameters, or
function processorParallel
that does namely this.
processorPrioritisingOutputParallel Source #
:: (EnqueueStrategy si, PriorityQueueStrategy so po) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [Processor a (po, b)] | the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel using priorities for combining the output.
processorPrioritisingInputParallel Source #
:: (PriorityQueueStrategy si pi, EnqueueStrategy so) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [(Stream pi, Processor a b)] | the streams of input priorities and the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel using priorities for consuming the intput.
processorPrioritisingInputOutputParallel Source #
:: (PriorityQueueStrategy si pi, PriorityQueueStrategy so po) | |
=> si | the strategy applied for enqueuing the input data |
-> so | the strategy applied for enqueuing the output data |
-> [(Stream pi, Processor a (po, b))] | the streams of input priorities and the processors to parallelize |
-> Processor a b | the parallelized processor |
Launches the specified processors in parallel using priorities for consuming the input and combining the output.
Arrival Processor
arrivalProcessor :: Processor a (Arrival a) Source #
A processor that adds the information about the time points at which the original stream items were received by demand.
Utilities
joinProcessor :: Process (Processor a b) -> Processor a b Source #
Removes one level of the computation, projecting its bound processor into the outer level.
Failover
failoverProcessor :: [Processor a b] -> Processor a b Source #
Takes the next processor from the list after the current processor fails because of cancelling the underlying process.
Integrating with Signals and Channels
channelProcessor :: Channel a b -> Processor a b Source #
Convert the specified signal transform, i.e. the channel, to a processor.
The processor may return data with delay as the values are requested by demand.
Consider using the arrivalSignal
function to provide with the information
about the time points at which the signal was actually triggered.
The point is that the Stream
used in the Processor
is requested outside,
while the Signal
used in the Channel
is triggered inside. They are different by nature.
The former is passive, while the latter is active.
The resulting processor may be a root of space leak as it uses an internal queue to store
the values received from the input signal. Consider using queuedChannelProcessor
that
allows specifying the bounded queue in case of need.
processorChannel :: Processor a b -> Channel a b Source #
Convert the specified processor to a signal transform, i.e. the channel.
The processor may return data with delay as the values are requested by demand.
Consider using the arrivalSignal
function to provide with the information
about the time points at which the signal was actually triggered.
The point is that the Stream
used in the Processor
is requested outside,
while the Signal
used in the Channel
is triggered inside. They are different by nature.
The former is passive, while the latter is active.
The resulting channel may be a root of space leak as it uses an internal queue to store
the values received from the input stream. Consider using queuedProcessorChannel
that
allows specifying the bounded queue in case of need.
queuedChannelProcessor Source #
Like channelProcessor
but allows specifying an arbitrary queue for storing the signal values,
for example, the bounded queue.
queuedProcessorChannel Source #
Like processorChannel
but allows specifying an arbitrary queue for storing the signal values,
for example, the bounded queue.