streaming-concurrency-0.1.0.0: Concurrency support for the streaming ecosystem

Copyright: Ivan Lazar Miljenovic
License: MIT
Maintainer: Ivan.Miljenovic@gmail.com
Safe Haskell: None
Language: Haskell2010

Streaming.Concurrent.Lifted

Description

This module defines variants of the functions in Streaming.Concurrent for use with the Withable class, found in the streaming-with package.

Buffers

data Buffer a

Buffer specifies how to buffer messages between our InBasket and our OutBasket.

unbounded :: Buffer a

Store an unbounded number of messages in a FIFO queue.

bounded :: Int -> Buffer a

Store a bounded number of messages, specified by the Int argument.

A buffer size <= 0 will result in a permanently empty buffer, which could result in a system that hangs.

latest :: a -> Buffer a

Only store the "latest" message, beginning with an initial value.

This buffer is never empty and never full, so it is up to the caller to take only as many values as needed. For example, using print . readStreamBasket as the final parameter to withBuffer will, after all other values are processed, keep printing the last value over and over again.

newest :: Int -> Buffer a

Like bounded, but sendMsg never fails (the buffer is never full). Instead, old elements are discarded to make room for new elements.

As with bounded, a size <= 0 means the buffer never holds any values, so nothing can be read from it and the system will most likely hang.
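The difference between the newest and latest policies can be illustrated with a small model built only on the stm package. This is a sketch of the documented behaviour, not the library's implementation: sendNewest and the list-in-a-TVar representation are hypothetical names and choices made for this example.

```haskell
import Control.Concurrent.STM
  (STM, TVar, atomically, modifyTVar', newTVarIO, readTVarIO, writeTVar)

-- Hypothetical model of the 'newest' policy: a send never fails; once the
-- capacity is exceeded the oldest elements are dropped. The list is kept
-- oldest-first. With a capacity <= 0 the list always ends up empty.
sendNewest :: Int -> TVar [a] -> a -> STM ()
sendNewest cap var x = modifyTVar' var $ \xs ->
  let xs' = xs ++ [x]
  in drop (length xs' - cap) xs'

main :: IO ()
main = do
  -- 'newest 3' model: after sending 1..5 only the last three remain.
  q <- newTVarIO []
  atomically (mapM_ (sendNewest 3 q) [1 .. 5 :: Int])
  readTVarIO q >>= print          -- [3,4,5]

  -- 'latest' model: a single cell that is overwritten on send and never
  -- consumed on read, so repeated reads keep returning the same value.
  cell <- newTVarIO 'a'
  atomically (writeTVar cell 'b')
  x <- readTVarIO cell
  y <- readTVarIO cell
  print (x, y)                    -- ('b','b')
```

The second half shows why a consumer of a latest buffer must decide for itself when to stop reading: the cell never reports that it is empty.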

Using a buffer

withBuffer :: (Withable w, MonadBaseControl IO (WithMonad w)) => Buffer a -> (InBasket a -> WithMonad w i) -> w (OutBasket a)

Use a buffer to asynchronously communicate.

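Two functions are involved (in this lifted variant only the first is an explicit parameter; the second is the continuation that consumes the resulting OutBasket):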

  • How to provide input to the buffer (the result of this is discarded)
  • How to take values from the buffer

As soon as one function indicates that it is complete, the other is terminated. This is safe: trying to write data to a closed buffer will not achieve anything.

However, reading from a buffer that contains no values but has not been closed (e.g. because the writer is waiting on an action to complete before it can provide the next value) will block.
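The closing and blocking behaviour described above can be modelled with a bounded STM queue plus a "closed" flag. This is a minimal sketch using only the stm package; send and receive are hypothetical stand-ins for the documented sendMsg and receiveMsg, and the sketch does not reproduce the termination of the other side that withBuffer performs.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM

-- Hypothetical stand-in for sendMsg: returns False once the buffer is closed.
-- While the buffer is open but full, the transaction blocks.
send :: TBQueue a -> TVar Bool -> a -> STM Bool
send q closed x = do
  done <- readTVar closed
  if done then pure False else writeTBQueue q x >> pure True

-- Hypothetical stand-in for receiveMsg: returns Nothing once the buffer is
-- closed and drained, and blocks (retries) while it is open but empty.
receive :: TBQueue a -> TVar Bool -> STM (Maybe a)
receive q closed = do
  mx <- tryReadTBQueue q
  case mx of
    Just x  -> pure (Just x)
    Nothing -> do
      done <- readTVar closed
      if done then pure Nothing else retry

main :: IO ()
main = do
  q      <- newTBQueueIO 2
  closed <- newTVarIO False
  -- Writer: send five values, then signal completion by closing the buffer.
  _ <- forkIO $ do
    mapM_ (atomically . send q closed) [1 .. 5 :: Int]
    atomically (writeTVar closed True)
  -- Reader: keep receiving until the buffer reports exhaustion.
  let loop acc = do
        mx <- atomically (receive q closed)
        case mx of
          Just x  -> loop (x : acc)
          Nothing -> pure (reverse acc)
  loop [] >>= print               -- [1,2,3,4,5]
```

If the writer never set the closed flag, the reader's final receive would block forever, which is the situation the paragraph above warns about.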

newtype InBasket a

An exhaustible sink of values.

sendMsg returns False if the sink is exhausted.

Constructors

InBasket 

Fields

newtype OutBasket a

An exhaustible source of values.

receiveMsg returns Nothing if the source is exhausted.

Constructors

OutBasket 

Fields

Stream support

writeStreamBasket :: (Withable w, MonadBase IO (WithMonad w)) => Stream (Of a) (WithMonad w) r -> InBasket a -> w ()

Write a single stream to a buffer.

The type is written so that this function is convenient when it is the only stream being written to the buffer.

readStreamBasket :: (Withable w, MonadBase IO m) => OutBasket a -> w (Stream (Of a) m ()) Source

Read the output of a buffer into a stream.

Note that there is no requirement that m ~ WithMonad w.

mergeStreams :: (Withable w, MonadBaseControl IO (WithMonad w), MonadBase IO m, Foldable t) => Buffer a -> t (Stream (Of a) (WithMonad w) v) -> w (Stream (Of a) m ())

Concurrently merge multiple streams together.

The resulting order is unspecified.
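To see why the merged order is unspecified, consider a rough model in which each source is written to a shared queue by its own thread. The sketch below uses plain lists and the stm package rather than the library's streams and buffers; mergeLists is a hypothetical name, the queue is unbounded, and no exception handling is attempted.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_)
import Data.List (sort)

-- Hypothetical model of a concurrent merge: one writer thread per source,
-- all feeding a single shared queue. The reader stops once every writer has
-- finished and the queue is drained.
mergeLists :: [[a]] -> IO [a]
mergeLists sources = do
  q         <- newTQueueIO
  remaining <- newTVarIO (length sources)
  forM_ sources $ \src -> forkIO $ do
    mapM_ (atomically . writeTQueue q) src
    atomically (modifyTVar' remaining (subtract 1))
  let loop acc = do
        next <- atomically $ do
          mx <- tryReadTQueue q
          case mx of
            Just x  -> pure (Just x)
            Nothing -> do
              n <- readTVar remaining
              if n == 0 then pure Nothing else retry
        maybe (pure (reverse acc)) (\x -> loop (x : acc)) next
  loop []

main :: IO ()
main = do
  merged <- mergeLists [[1, 2, 3], [10, 20], [100 :: Int]]
  -- The interleaving depends on thread scheduling, so only the sorted
  -- result is stable from run to run.
  print (sort merged)             -- [1,2,3,10,20,100]
```

Elements from a single source keep their relative order, but how the sources interleave is up to the scheduler, so callers should not rely on any particular ordering of the merged output.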

ByteString support