aivika-2.1: A multi-paradigm simulation library

Stabilityexperimental
MaintainerDavid Sorokin <david.sorokin@gmail.com>
Safe HaskellSafe-Inferred

Simulation.Aivika.Processor

Contents

Description

Tested with: GHC 7.8.3

The processor of simulation data.

Synopsis

Processor Type

newtype Processor a b Source

Represents a processor of simulation data.

Constructors

Processor 

Fields

runProcessor :: Stream a -> Stream b

Run the processor.

Instances

Category Processor 
Arrow Processor 
ArrowZero Processor 
ArrowPlus Processor 
ArrowChoice Processor 

Processor Primitives

emptyProcessor :: Processor a bSource

A processor that never finishes its work producing an emptyStream.

arrProcessor :: (a -> Process b) -> Processor a bSource

Create a simple processor by the specified handling function that runs the discontinuous process for each input value to get the output.

accumProcessor :: (acc -> a -> Process (acc, b)) -> acc -> Processor a bSource

Accumulator that outputs a value determined by the supplied function.

Specifying Identifier

processorUsingId :: ProcessId -> Processor a b -> Processor a bSource

Create a processor that will use the specified process identifier. It can be useful to refer to the underlying Process computation which can be passivated, interrupted, canceled and so on. See also the processUsingId function for more details.

Prefetch and Delay Processors

prefetchProcessor :: Processor a aSource

This is a prefetch processor that requests for one more data item from the input in advance while the latest item is not yet fully processed in the chain of streams, usually by other processors.

You can think of this as the prefetched processor could place its latest data item in some temporary space for later use, which is very useful for modeling a sequence of separate and independent work places.

delayProcessor :: a -> Processor a aSource

A processor that delays the input stream by one step using the specified initial value.

Buffer Processor

bufferProcessorSource

Arguments

:: (Stream a -> Process ())

a separate process to consume the input

-> Stream b

the resulting stream of data

-> Processor a b 

Create a buffer processor, where the process from the first argument consumes the input stream but the stream passed in as the second argument and produced usually by some other process is returned as an output. This kind of processor is very useful for modeling the queues.

bufferProcessorLoopSource

Arguments

:: (Stream a -> Stream c -> Process ())

consume two streams: the input values of type a and the values of type c returned by the loop

-> Stream d

the stream of data that may become results

-> Processor d (Either e b)

process and then decide what values of type e should be processed in the loop (this is a condition)

-> Processor e c

process in the loop and then return a value of type c to the input again (this is a loop body)

-> Processor a b 

Like bufferProcessor but allows creating a loop when some items can be processed repeatedly. It is very useful for modeling the processors with queues and loop-backs.

Processing Queues

queueProcessorSource

Arguments

:: (a -> Process ())

enqueue the input item and wait while the queue is full if required so that there were no hanging items

-> Process b

dequeue an output item

-> Processor a b

the buffering processor

Return a processor with help of which we can model the queue.

Although the function doesn't refer to the queue directly, its main use case is namely a processing of the queue. The first argument should be the enqueueing operation, while the second argument should be the opposite dequeueing operation.

The reason is as follows. There are many possible combinations how the queues can be modeled. There is no sense to enumerate all them creating a separate function for each case. We can just use combinators to define exactly what we need.

So, the queue can lose the input items if the queue is full, or the input process can suspend while the queue is full, or we can use priorities for enqueueing, storing and dequeueing the items in different combinations. There are so many use cases!

There is a hope that this function along with other similar functions from this module is sufficient to cover the most important cases. Even if it is not sufficient then you can use a more generic function bufferProcessor which this function is based on. In case of need, you can even write your own function from scratch. It is quite easy actually.

queueProcessorLoopMergingSource

Arguments

:: (Stream a -> Stream d -> Stream e)

merge two streams: the input values of type a and the values of type d returned by the loop

-> (e -> Process ())

enqueue the input item and wait while the queue is full if required so that there were no hanging items

-> Process c

dequeue an item for the further processing

-> Processor c (Either f b)

process and then decide what values of type f should be processed in the loop (this is a condition)

-> Processor f d

process in the loop and then return a value of type d to the queue again (this is a loop body)

-> Processor a b

the buffering processor

Like queueProcessor creates a queue processor but with a loop when some items can be processed and then added to the queue again. Also it allows specifying how two input streams of data can be merged.

queueProcessorLoopSeqSource

Arguments

:: (a -> Process ())

enqueue the input item and wait while the queue is full if required so that there were no hanging items

-> Process c

dequeue an item for the further processing

-> Processor c (Either e b)

process and then decide what values of type e should be processed in the loop (this is a condition)

-> Processor e a

process in the loop and then return a value of type a to the queue again (this is a loop body)

-> Processor a b

the buffering processor

Like queueProcessorLoopMerging creates a queue processor with a loop when some items can be processed and then added to the queue again. Only it sequentially merges two input streams of data: one stream that come from the external source and another stream of data returned by the loop. The first stream has a priority over the second one.

queueProcessorLoopParallelSource

Arguments

:: (a -> Process ())

enqueue the input item and wait while the queue is full if required so that there were no hanging items

-> Process c

dequeue an item for the further processing

-> Processor c (Either e b)

process and then decide what values of type e should be processed in the loop (this is a condition)

-> Processor e a

process in the loop and then return a value of type a to the queue again (this is a loop body)

-> Processor a b

the buffering processor

Like queueProcessorLoopMerging creates a queue processor with a loop when some items can be processed and then added to the queue again. Only it runs two simultaneous processes to enqueue the input streams of data: one stream that come from the external source and another stream of data returned by the loop.

Sequencing Processors

processorSeq :: [Processor a a] -> Processor a aSource

Launches the processors sequentially using the prefetchProcessor between them to model an autonomous work of each of the processors specified.

Parallelizing Processors

processorParallel :: [Processor a b] -> Processor a bSource

Launches the processors in parallel consuming the same input stream and producing a combined output stream. This version applies the FCFS strategy both for input and output, which suits the most part of uses cases.

processorQueuedParallelSource

Arguments

:: (EnqueueStrategy si, EnqueueStrategy so) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [Processor a b]

the processors to parallelize

-> Processor a b

the parallelized processor

Launches the specified processors in parallel consuming the same input stream and producing a combined output stream.

If you don't know what the enqueue strategies to apply, then you will probably need FCFS for the both parameters, or function processorParallel that does namely this.

processorPrioritisingOutputParallelSource

Arguments

:: (EnqueueStrategy si, PriorityQueueStrategy so po) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [Processor a (po, b)]

the processors to parallelize

-> Processor a b

the parallelized processor

Launches the specified processors in parallel using priorities for combining the output.

processorPrioritisingInputParallelSource

Arguments

:: (PriorityQueueStrategy si pi, EnqueueStrategy so) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [(Stream pi, Processor a b)]

the streams of input priorities and the processors to parallelize

-> Processor a b

the parallelized processor

Launches the specified processors in parallel using priorities for consuming the intput.

processorPrioritisingInputOutputParallelSource

Arguments

:: (PriorityQueueStrategy si pi, PriorityQueueStrategy so po) 
=> si

the strategy applied for enqueuing the input data

-> so

the strategy applied for enqueuing the output data

-> [(Stream pi, Processor a (po, b))]

the streams of input priorities and the processors to parallelize

-> Processor a b

the parallelized processor

Launches the specified processors in parallel using priorities for consuming the input and combining the output.

Arrival Processor

arrivalProcessor :: Processor a (Arrival a)Source

A processor that adds the information about the time points at which the original stream items were received by demand.

Integrating with Signals

signalProcessor :: (Signal a -> Signal b) -> Processor a bSource

Convert the specified signal transform to a processor.

The processor may return data with delay as the values are requested by demand. Consider using the arrivalSignal function to provide with the information about the time points at which the signal was actually triggered.

The point is that the Stream used in the Processor is requested outside, while the Signal is triggered inside. They are different by nature. The former is passive, while the latter is active.

Cancel the processor's process to unsubscribe from the signals provided.

processorSignaling :: Processor a b -> Signal a -> Process (Signal b)Source

Convert the specified processor to a signal transform.

The processor may return data with delay as the values are requested by demand. Consider using the arrivalSignal function to provide with the information about the time points at which the signal was actually triggered.

The point is that the Stream used in the Processor is requested outside, while the Signal is triggered inside. They are different by nature. The former is passive, while the latter is active.

Cancel the returned process to unsubscribe from the signal specified.