streaming-concurrency-0.3.1.0: Concurrency support for the streaming ecosystem

Copyright: Ivan Lazar Miljenovic
License: MIT
Maintainer: Ivan.Miljenovic@gmail.com
Safe Haskell: None
Language: Haskell2010

Streaming.Concurrent

Description

Consider a physical desk for someone who has to deal with correspondence.

A typical system is to have two baskets/trays: one for incoming papers that still need to be processed, and another for outgoing papers that have already been processed.

We use this metaphor for dealing with Buffers: data is fed into one using the InBasket (until the buffer indicates that it has had enough) and taken out from the OutBasket.

Buffers

data Buffer a

Buffer specifies how to buffer messages between our InBasket and our OutBasket.

unbounded :: Buffer a

Store an unbounded number of messages in a FIFO queue.

bounded :: Int -> Buffer a

Store a bounded number of messages, specified by the Int argument.

A buffer size <= 0 will result in a permanently empty buffer, which could result in a system that hangs.

latest :: a -> Buffer a

Only store the "latest" message, beginning with an initial value.

This buffer is never empty or full, so it is up to the caller to take only as many values as they need. For example, using print . readStreamBasket as the final parameter to withBuffer will, after all other values are processed, keep printing the last value over and over again.
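The warning above can be illustrated with a minimal sketch that reads a fixed number of values from a latest buffer. The name sampleLatest is hypothetical, and S.each, S.take and S.toList_ are assumed to come from Streaming.Prelude in the streaming package. Which values are observed depends on scheduling; only the number of values taken is fixed.

```haskell
import qualified Streaming.Prelude as S
import Streaming.Concurrent
  (latest, withBuffer, withStreamBasket, writeStreamBasket)

-- Take exactly three readings from a 'latest' buffer that starts at 0.
-- Without 'S.take' the reader would never see the end of the stream,
-- because a 'latest' buffer always has a value to hand out.
sampleLatest :: IO [Int]
sampleLatest =
  withBuffer (latest 0)
    (writeStreamBasket (S.each [1 .. 100]))
    (\out -> withStreamBasket out (S.toList_ . S.take 3))

main :: IO ()
main = sampleLatest >>= print
```

The printed list may repeat a value (including the initial 0), since reading from this kind of buffer does not remove the stored message.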

newest :: Int -> Buffer a

Like bounded, but sendMsg never fails (the buffer is never full). Instead, old elements are discarded to make room for new elements.

As with bounded, providing a size <= 0 will result in no values being provided to the buffer, thus no values being read and hence the system will most likely hang.

Using a buffer

withBuffer :: (MonadMask m, MonadBaseControl IO m) => Buffer a -> (InBasket a -> m i) -> (OutBasket a -> m r) -> m r

Use a buffer to communicate asynchronously.

Two functions are taken as parameters:

  • How to provide input to the buffer (the result of this is discarded)
  • How to take values from the buffer

As soon as one function indicates that it is complete, the other is terminated. This is safe: trying to write data to a closed buffer will not achieve anything.

However, reading a buffer that has not indicated that it is closed (e.g. waiting on an action to complete to be able to provide the next value) but contains no values will block.
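As a minimal sketch of the two-function shape described above, the following feeds a list into a bounded buffer on one side and collects it on the other. The name roundTrip is hypothetical, and S.each and S.toList_ are assumed to come from Streaming.Prelude.

```haskell
import qualified Streaming.Prelude as S
import Streaming.Concurrent
  (bounded, withBuffer, withStreamBasket, writeStreamBasket)

-- Writer side: push every element of the list into the InBasket.
-- Reader side: view the OutBasket as a stream and collect it.
roundTrip :: [Int] -> IO [Int]
roundTrip xs =
  withBuffer (bounded 4)
    (writeStreamBasket (S.each xs))
    (\out -> withStreamBasket out S.toList_)

main :: IO ()
main = roundTrip [1 .. 10] >>= print
```

With a single writer and a single reader on a FIFO buffer, the collected list is expected to match the input. The small bound simply means the writer blocks whenever the reader falls four elements behind.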

withBufferedTransform

Arguments

:: (MonadMask m, MonadBaseControl IO m) 
=> Int

How many concurrent computations to run.

-> (OutBasket a -> InBasket b -> m ab)

What to do with each individual concurrent computation; result is ignored.

-> (InBasket a -> m i)

Provide initial data; result is ignored.

-> (OutBasket b -> m r) 
-> m r 

Use buffers to concurrently transform the provided data.

In essence, this is a demultiplexer -> multiplexer transformation: the incoming data is split into n individual segments, the results of which are then merged back together again.

Note: ordering of elements in the output is nondeterministic.

Since: 0.2.0.0
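A minimal sketch of the split-then-merge shape, using joinBuffers (listed under Primitives) as the per-worker computation. The name doubled is hypothetical, and S.each and S.toList_ are assumed to come from Streaming.Prelude.

```haskell
import Data.List (sort)
import qualified Streaming.Prelude as S
import Streaming.Concurrent
  (joinBuffers, withBufferedTransform, withStreamBasket, writeStreamBasket)

-- Three workers each pull values from the shared input buffer,
-- double them, and push the results into the shared output buffer.
doubled :: [Int] -> IO [Int]
doubled xs =
  withBufferedTransform 3
    (joinBuffers (* 2))
    (writeStreamBasket (S.each xs))
    (\out -> withStreamBasket out S.toList_)

main :: IO ()
main = do
  ys <- doubled [1 .. 10]
  -- Output order is not guaranteed, so sort before displaying.
  print (sort ys)
```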

newtype InBasket a

An exhaustible sink of values.

sendMsg returns False if the sink is exhausted.

Constructors

InBasket 

Fields

newtype OutBasket a

An exhaustible source of values.

receiveMsg returns Nothing if the source is exhausted.

Constructors

OutBasket 

Fields
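The baskets can also be driven by hand. The sketch below assumes that sendMsg and receiveMsg are record fields of type a -> STM Bool and STM (Maybe a) respectively; the field signatures are not shown on this page, so treat those types as an assumption. The names produce, consumeSum and sumViaBuffer are hypothetical.

```haskell
import Control.Concurrent.STM (atomically)
import Streaming.Concurrent
  (InBasket (..), OutBasket (..), unbounded, withBuffer)

-- Send values one at a time, stopping early if the sink is exhausted.
-- (Assumes sendMsg :: InBasket a -> a -> STM Bool.)
produce :: [Int] -> InBasket Int -> IO ()
produce xs ib = go xs
  where
    go []       = pure ()
    go (y : ys) = do
      accepted <- atomically (sendMsg ib y)
      if accepted then go ys else pure ()

-- Keep receiving until the source reports exhaustion with Nothing.
-- (Assumes receiveMsg :: OutBasket a -> STM (Maybe a).)
consumeSum :: OutBasket Int -> IO Int
consumeSum ob = go 0
  where
    go acc = do
      next <- atomically (receiveMsg ob)
      case next of
        Nothing -> pure acc
        Just x  -> go (acc + x)

sumViaBuffer :: [Int] -> IO Int
sumViaBuffer xs = withBuffer unbounded (produce xs) consumeSum

main :: IO ()
main = sumViaBuffer [1 .. 5] >>= print
```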

Stream support

writeStreamBasket :: MonadBase IO m => Stream (Of a) m r -> InBasket a -> m ()

Write a single stream to a buffer.

The type is written this way to make it easier to use when this is the only stream being written to the buffer.

withStreamBasket :: MonadBase IO m => OutBasket a -> (Stream (Of a) m () -> r) -> r

Read the output of a buffer into a stream.

Since: 0.2.0.0

withMergedStreams :: (MonadMask m, MonadBaseControl IO m, MonadBase IO n, Foldable t) => Buffer a -> t (Stream (Of a) m v) -> (Stream (Of a) n () -> m r) -> m r

Concurrently merge multiple streams together.

The resulting order is unspecified.

Note that the monad of the resultant Stream can be different from that of the final result.

Since: 0.2.0.0
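A minimal sketch of merging two streams through one buffer. The name merged is hypothetical, and S.each and S.toList_ are assumed to come from Streaming.Prelude.

```haskell
import Data.List (sort)
import qualified Streaming.Prelude as S
import Streaming.Concurrent (unbounded, withMergedStreams)

-- Both input streams are written to the same buffer concurrently,
-- and the continuation sees a single combined stream.
merged :: IO [Int]
merged =
  withMergedStreams unbounded
    [S.each [1, 2, 3], S.each [10, 20, 30]]
    S.toList_

main :: IO ()
main = do
  xs <- merged
  -- The interleaving is unspecified, so sort before displaying.
  print (sort xs)
```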

Mapping

These functions provide (concurrency-based rather than parallelism-based) pseudo-equivalents to parMap.

Note, however, that in practice these seem to be no better than, and indeed often worse than, using map and mapM. A benchmarking suite is available with this library that tries to compare different scenarios.

These implementations try to be relatively conservative in terms of memory usage; it is possible to get better performance by using an unbounded Buffer but if you feed elements into a Buffer much faster than you can consume them then memory usage will increase.

The "Primitives" available below can assist you with defining your own custom mapping function in conjunction with withBufferedTransform.

withStreamMap

Arguments

:: (MonadMask m, MonadBaseControl IO m, MonadBase IO n) 
=> Int

How many concurrent computations to run.

-> (a -> b) 
-> Stream (Of a) m i 
-> (Stream (Of b) n () -> m r) 
-> m r 

Concurrently map a function over all elements of a Stream.

Note: ordering of elements in the output is nondeterministic.

Since: 0.2.0.0
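A minimal sketch of a pure concurrent map. The name squares is hypothetical, and S.each and S.toList_ are assumed to come from Streaming.Prelude.

```haskell
import Data.List (sort)
import qualified Streaming.Prelude as S
import Streaming.Concurrent (withStreamMap)

-- Square every element using four concurrent workers.
squares :: [Int] -> IO [Int]
squares xs = withStreamMap 4 (\x -> x * x) (S.each xs) S.toList_

main :: IO ()
main = do
  ys <- squares [1 .. 10]
  -- Results may arrive in any order, so sort before displaying.
  print (sort ys)
```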

withStreamMapM

Arguments

:: (MonadMask m, MonadBaseControl IO m, MonadBase IO n) 
=> Int

How many concurrent computations to run.

-> (a -> m b) 
-> Stream (Of a) m i 
-> (Stream (Of b) n () -> m r) 
-> m r 

Concurrently map a monadic function over all elements of a Stream.

Note: ordering of elements in the output is nondeterministic.

Since: 0.2.0.0
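A minimal sketch of the monadic variant, where each element triggers an IO action. The names wordLengths and measure are hypothetical, the short threadDelay merely stands in for real work, and S.each and S.toList_ are assumed to come from Streaming.Prelude.

```haskell
import Control.Concurrent (threadDelay)
import Data.List (sort)
import qualified Streaming.Prelude as S
import Streaming.Concurrent (withStreamMapM)

-- Run an effectful function over the stream with two workers.
wordLengths :: [String] -> IO [Int]
wordLengths ws = withStreamMapM 2 measure (S.each ws) S.toList_
  where
    -- Simulate a slow effectful computation for each word.
    measure w = do
      threadDelay 1000
      pure (length w)

main :: IO ()
main = do
  ns <- wordLengths ["in", "out", "basket"]
  -- Results may arrive in any order, so sort before displaying.
  print (sort ns)
```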

withStreamTransform

Arguments

:: (MonadMask m, MonadBaseControl IO m, MonadBase IO n) 
=> Int

How many concurrent computations to run.

-> (Stream (Of a) m () -> Stream (Of b) m t) 
-> Stream (Of a) m i 
-> (Stream (Of b) n () -> m r) 
-> m r 

Concurrently split the provided stream into n streams and transform them all using the provided function.

Note: ordering of elements in the output is nondeterministic.

Since: 0.2.0.0
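Unlike the element-wise maps, the transformation here works on whole sub-streams, so it can drop or add elements. A minimal sketch follows; the name evensDoubled is hypothetical, and S.each, S.filter, S.map and S.toList_ are assumed to come from Streaming.Prelude.

```haskell
import Data.List (sort)
import qualified Streaming.Prelude as S
import Streaming.Concurrent (withStreamTransform)

-- Each of the two sub-streams keeps only its even elements and doubles them.
evensDoubled :: [Int] -> IO [Int]
evensDoubled xs =
  withStreamTransform 2
    (S.map (* 2) . S.filter even)
    (S.each xs)
    S.toList_

main :: IO ()
main = do
  ys <- evensDoubled [1 .. 10]
  -- Results may arrive in any order, so sort before displaying.
  print (sort ys)
```

Because the input is split between workers in an unspecified way, this style suits transformations that treat each element independently; a stateful transformation such as a running total would see only part of the data in each sub-stream.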

Primitives

joinBuffers :: MonadBase IO m => (a -> b) -> OutBasket a -> InBasket b -> m ()

Take an item out of one Buffer, apply a function to it and then place it into another Buffer.

Since: 0.3.1.0

joinBuffersM :: MonadBase IO m => (a -> m b) -> OutBasket a -> InBasket b -> m ()

As with joinBuffers, but applies a monadic function.

Since: 0.3.1.0

joinBuffersStream :: MonadBase IO m => (Stream (Of a) m () -> Stream (Of b) m t) -> OutBasket a -> InBasket b -> m ()

As with joinBuffers, but reads and writes the values as Streams.

Since: 0.3.1.0
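As a sketch of how these primitives might combine with withBufferedTransform into a custom mapping function, the following defines an IO-specialised concurrent map. The name mapConcurrentlyIO is hypothetical and is not part of this library; Stream and Of are assumed to come from the Streaming module, and S.each and S.toList_ from Streaming.Prelude.

```haskell
import Data.List (sort)
import Streaming (Of, Stream)
import qualified Streaming.Prelude as S
import Streaming.Concurrent
  (joinBuffersM, withBufferedTransform, withStreamBasket, writeStreamBasket)

-- A hand-rolled concurrent map: feed the input stream into one buffer,
-- let n workers apply f between the buffers, and expose the output
-- buffer to the continuation as a stream.
mapConcurrentlyIO
  :: Int
  -> (a -> IO b)
  -> Stream (Of a) IO ()
  -> (Stream (Of b) IO () -> IO r)
  -> IO r
mapConcurrentlyIO n f input cont =
  withBufferedTransform n
    (joinBuffersM f)
    (writeStreamBasket input)
    (\out -> withStreamBasket out cont)

main :: IO ()
main = do
  ys <- mapConcurrentlyIO 3 (pure . (+ 1)) (S.each [1 .. 5 :: Int]) S.toList_
  -- Results may arrive in any order, so sort before displaying.
  print (sort ys)
```

Swapping joinBuffersM for joinBuffers or joinBuffersStream gives the pure and stream-based variants of the same pattern.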