unliftio-messagebox-2.0.0: Fast and robust message queues for concurrent processes
Safe HaskellNone
LanguageHaskell2010

UnliftIO.MessageBox

Description

Fast and robust message queues for concurrent processes.

Processes of an application can exchange message using Message Boxes.

This library is meant to be a wrapper around a well tested and benchmarked subset of unagi-chan for applications using unliftio.

In addition to the basic functionality, i.e. _Message Boxes_, there is a very little bit of type level magic dust in UnliftIO.MessageBox.Command that helps to write code that sends a message and expects the receiving process to send a reply.

This module re-exports most of the library.

Synopsis

Documentation

class IsInput input where Source #

A type class for input types. A common interface for delivering messages.

Minimal complete definition

deliver

Methods

deliver :: MonadUnliftIO m => input a -> a -> m Bool Source #

Send a message. Take whatever time it takes. Depending on the implementation, this might be a non-blocking operation. Return if the operation was successful.

NOTE: False may sporadically be returned, especially when there is a lot of load, so please make sure to build your application in such a way, that it anticipates failure.

deliver_ :: MonadUnliftIO m => input a -> a -> m () Source #

See deliver but with () as return value. If deliver fails, it fails silently.

Instances

Instances details
IsInput UnlimitedBoxInput Source #

A blocking instance that invokes deliver.

Instance details

Defined in UnliftIO.MessageBox.Unlimited

IsInput WaitingInput Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

Methods

deliver :: MonadUnliftIO m => WaitingInput a -> a -> m Bool Source #

deliver_ :: MonadUnliftIO m => WaitingInput a -> a -> m () Source #

IsInput NonBlockingInput Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

IsInput BlockingInput Source #

A blocking instance that invokes deliver.

Instance details

Defined in UnliftIO.MessageBox.Limited

Methods

deliver :: MonadUnliftIO m => BlockingInput a -> a -> m Bool Source #

deliver_ :: MonadUnliftIO m => BlockingInput a -> a -> m () Source #

IsInput i => IsInput (CatchAllInput i) Source # 
Instance details

Defined in UnliftIO.MessageBox.CatchAll

Methods

deliver :: MonadUnliftIO m => CatchAllInput i a -> a -> m Bool Source #

deliver_ :: MonadUnliftIO m => CatchAllInput i a -> a -> m () Source #

class IsInput (Input box) => IsMessageBox box where Source #

A type class for msgBox types. A common interface for receiving messages.

Minimal complete definition

receive, tryReceive, newInput

Associated Types

type Input box :: Type -> Type Source #

Type of the corresponding input

Methods

receive :: MonadUnliftIO m => box a -> m (Maybe a) Source #

Receive a message. Take whatever time it takes. Return Just the value or Nothing when an error occurred.

NOTE: Nothing may sporadically be returned, especially when there is a lot of load, so please make sure to build your application in such a way, that it anticipates failure.

tryReceive :: MonadUnliftIO m => box a -> m (Future a) Source #

Return a Future that can be used to wait for the arrival of the next message. NOTE: Each future value represents the next slot in the queue so one future corresponds to exactly that message (should it arrive) and if that future value is dropped, that message will be lost!

receiveAfter Source #

Arguments

:: MonadUnliftIO m 
=> box a

Message box

-> Int

Time in micro seconds to wait until the action is invoked.

-> m (Maybe a) 

Wait for an incoming message or return Nothing.

The default implementation uses tryReceive to get a Future on which awaitFuture inside a timeout is called.

Instances might override this with more performant implementations especially non-blocking Unagi channel based implementation.

NOTE: Nothing may sporadically be returned, especially when there is a lot of load, so please make sure to build your application in such a way, that it anticipates failure.

newInput :: MonadUnliftIO m => box a -> m (Input box a) Source #

Create a new input that enqueus messages, which are received by the box

Instances

Instances details
IsMessageBox UnlimitedBox Source #

A blocking instance that invokes receive.

Instance details

Defined in UnliftIO.MessageBox.Unlimited

Associated Types

type Input UnlimitedBox :: Type -> Type Source #

IsMessageBox WaitingBox Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

Associated Types

type Input WaitingBox :: Type -> Type Source #

IsMessageBox NonBlockingBox Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

Associated Types

type Input NonBlockingBox :: Type -> Type Source #

IsMessageBox BlockingBox Source #

A blocking instance that invokes receive.

Instance details

Defined in UnliftIO.MessageBox.Limited

Associated Types

type Input BlockingBox :: Type -> Type Source #

IsMessageBox box => IsMessageBox (CatchAllBox box) Source # 
Instance details

Defined in UnliftIO.MessageBox.CatchAll

Associated Types

type Input (CatchAllBox box) :: Type -> Type Source #

class (IsMessageBox (MessageBox argument), IsInput (Input (MessageBox argument))) => IsMessageBoxArg argument where Source #

Types that configure and allow the creation of a MessageBox.

Create IsMessageBox instances from a parameter. Types that determine MessageBox values.

For a limited message box this might be the limit of the message queue.

Associated Types

type MessageBox argument :: Type -> Type Source #

The message box that can be created from the message box argument

Methods

getConfiguredMessageLimit :: argument -> Maybe Int Source #

Return a message limit.

NOTE: This method was added for unit tests. Although the method is totally valid, it might not be super useful in production code. Also note that the naming follows the rule: Reserve short names for entities that are used often.

newMessageBox :: MonadUnliftIO m => argument -> m (MessageBox argument a) Source #

Create a new msgBox according to the argument. This is required to receive a message. NOTE: Only one process may receive on an msgBox.

Instances

Instances details
IsMessageBoxArg BlockingUnlimited Source # 
Instance details

Defined in UnliftIO.MessageBox.Unlimited

Associated Types

type MessageBox BlockingUnlimited :: Type -> Type Source #

IsMessageBoxArg WaitingBoxLimit Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

Associated Types

type MessageBox WaitingBoxLimit :: Type -> Type Source #

IsMessageBoxArg NonBlockingBoxLimit Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

Associated Types

type MessageBox NonBlockingBoxLimit :: Type -> Type Source #

IsMessageBoxArg BlockingBoxLimit Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

Associated Types

type MessageBox BlockingBoxLimit :: Type -> Type Source #

IsMessageBoxArg cfg => IsMessageBoxArg (CatchAllArg cfg) Source # 
Instance details

Defined in UnliftIO.MessageBox.CatchAll

Associated Types

type MessageBox (CatchAllArg cfg) :: Type -> Type Source #

handleMessage :: (MonadUnliftIO m, IsMessageBox box) => box message -> (message -> m b) -> m (Maybe b) Source #

Receive a message and apply a function to it.

data WaitingInput a Source #

An input for a BlockingBox that will block for not much more than the given timeout when the message box is full.

Constructors

WaitingInput !Int !(BlockingInput a) 

Instances

Instances details
IsInput WaitingInput Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

Methods

deliver :: MonadUnliftIO m => WaitingInput a -> a -> m Bool Source #

deliver_ :: MonadUnliftIO m => WaitingInput a -> a -> m () Source #

data WaitingBox a Source #

A BlockingBox an a WaitingBoxLimit for the IsMessageBox instance.

Instances

Instances details
IsMessageBox WaitingBox Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

Associated Types

type Input WaitingBox :: Type -> Type Source #

type Input WaitingBox Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

data WaitingBoxLimit Source #

A IsMessageBoxArg instance wrapping the BlockingBox with independently configurable timeouts for receive and deliver.

newtype NonBlockingInput a Source #

A wrapper around BlockingInput with a non-blocking IsInput instance.

deliver will enqueue the message or return False immediately, if the message box already contains more messages than it's limit allows.

Instances

Instances details
IsInput NonBlockingInput Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

data NonBlockingBox a Source #

A BlockingBox wrapper for non-blocking IsMessageBox instances.

The difference to the BlockingBox instance is that deliver immediately returns if the message box limit is surpassed.

newtype NonBlockingBoxLimit Source #

A BlockingBoxLimit wrapper for non-blocking IsMessageBoxArg instances.

Instances

Instances details
Eq NonBlockingBoxLimit Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

Ord NonBlockingBoxLimit Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

Show NonBlockingBoxLimit Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

IsMessageBoxArg NonBlockingBoxLimit Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

Associated Types

type MessageBox NonBlockingBoxLimit :: Type -> Type Source #

type MessageBox NonBlockingBoxLimit Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

data BlockingInput a Source #

A message queue into which messages can be enqued by, e.g. tryToDeliver. Messages can be received from an BlockingBox.

The Input is the counter part of a BlockingBox.

Instances

Instances details
IsInput BlockingInput Source #

A blocking instance that invokes deliver.

Instance details

Defined in UnliftIO.MessageBox.Limited

Methods

deliver :: MonadUnliftIO m => BlockingInput a -> a -> m Bool Source #

deliver_ :: MonadUnliftIO m => BlockingInput a -> a -> m () Source #

data BlockingBox a Source #

A message queue out of which messages can by received.

This is the counter part of Input. Can be used for reading messages.

Messages can be received by receive or tryReceive.

Instances

Instances details
IsMessageBox BlockingBox Source #

A blocking instance that invokes receive.

Instance details

Defined in UnliftIO.MessageBox.Limited

Associated Types

type Input BlockingBox :: Type -> Type Source #

type Input BlockingBox Source # 
Instance details

Defined in UnliftIO.MessageBox.Limited

newtype BlockingBoxLimit Source #

Contains the (vague) limit of messages that a BlockingBox can buffer, i.e. that deliver can put into a BlockingInput of a BlockingBox.

data MessageLimit Source #

Message Limit

The message limit must be a reasonable small positive integer that is also a power of two. This stems from the fact that Unagi is used under the hood.

The limit is a lower bound.

messageLimitToInt :: MessageLimit -> Int Source #

Convert a MessageLimit to the Int representation.

data UnlimitedBoxInput a Source #

A message queue into which messages can be enqued by, e.g. deliver. Messages can be received from an UnlimitedBox.

The UnlimitedBoxInput is the counter part of a UnlimitedBox.

Instances

Instances details
IsInput UnlimitedBoxInput Source #

A blocking instance that invokes deliver.

Instance details

Defined in UnliftIO.MessageBox.Unlimited

data UnlimitedBox a Source #

A message queue out of which messages can by received.

This is the counter part of Input. Can be used for reading messages.

Messages can be received by receive or tryReceive.

Instances

Instances details
IsMessageBox UnlimitedBox Source #

A blocking instance that invokes receive.

Instance details

Defined in UnliftIO.MessageBox.Unlimited

Associated Types

type Input UnlimitedBox :: Type -> Type Source #

type Input UnlimitedBox Source # 
Instance details

Defined in UnliftIO.MessageBox.Unlimited

newtype CatchAllInput i a Source #

A wrapper around values that are instances of IsInput.

Constructors

CatchAllInput (i a) 

Instances

Instances details
IsInput i => IsInput (CatchAllInput i) Source # 
Instance details

Defined in UnliftIO.MessageBox.CatchAll

Methods

deliver :: MonadUnliftIO m => CatchAllInput i a -> a -> m Bool Source #

deliver_ :: MonadUnliftIO m => CatchAllInput i a -> a -> m () Source #

newtype CatchAllBox box a Source #

A wrapper around values that are instances of IsMessageBox.

The Input type will be wrapped using CatchAllInput.

Constructors

CatchAllBox (box a) 

Instances

Instances details
IsMessageBox box => IsMessageBox (CatchAllBox box) Source # 
Instance details

Defined in UnliftIO.MessageBox.CatchAll

Associated Types

type Input (CatchAllBox box) :: Type -> Type Source #

type Input (CatchAllBox box) Source # 
Instance details

Defined in UnliftIO.MessageBox.CatchAll

newtype CatchAllArg cfg Source #

A wrapper around values that are instances of IsMessageBoxArg. The factory wraps the result of the delegated newMessageBox invocation into a CatchAllBox.

Constructors

CatchAllArg cfg 

Instances

Instances details
Eq cfg => Eq (CatchAllArg cfg) Source # 
Instance details

Defined in UnliftIO.MessageBox.CatchAll

Methods

(==) :: CatchAllArg cfg -> CatchAllArg cfg -> Bool #

(/=) :: CatchAllArg cfg -> CatchAllArg cfg -> Bool #

Ord cfg => Ord (CatchAllArg cfg) Source # 
Instance details

Defined in UnliftIO.MessageBox.CatchAll

Methods

compare :: CatchAllArg cfg -> CatchAllArg cfg -> Ordering #

(<) :: CatchAllArg cfg -> CatchAllArg cfg -> Bool #

(<=) :: CatchAllArg cfg -> CatchAllArg cfg -> Bool #

(>) :: CatchAllArg cfg -> CatchAllArg cfg -> Bool #

(>=) :: CatchAllArg cfg -> CatchAllArg cfg -> Bool #

max :: CatchAllArg cfg -> CatchAllArg cfg -> CatchAllArg cfg #

min :: CatchAllArg cfg -> CatchAllArg cfg -> CatchAllArg cfg #

Show cfg => Show (CatchAllArg cfg) Source # 
Instance details

Defined in UnliftIO.MessageBox.CatchAll

Methods

showsPrec :: Int -> CatchAllArg cfg -> ShowS #

show :: CatchAllArg cfg -> String #

showList :: [CatchAllArg cfg] -> ShowS #

IsMessageBoxArg cfg => IsMessageBoxArg (CatchAllArg cfg) Source # 
Instance details

Defined in UnliftIO.MessageBox.CatchAll

Associated Types

type MessageBox (CatchAllArg cfg) :: Type -> Type Source #

type MessageBox (CatchAllArg cfg) Source # 
Instance details

Defined in UnliftIO.MessageBox.CatchAll

data AsyncReply r Source #

The result of callAsync. Use waitForReply or tryTakeReply.

Instances

Instances details
Typeable r => Show (AsyncReply r) Source # 
Instance details

Defined in UnliftIO.MessageBox.Command

data CommandError where Source #

The failures that the receiver of a Return Command, i.e. a Blocking, can communicate to the caller, in order to indicate that processing a request did not or will not lead to the result the caller is blocked waiting for.

Constructors

CouldNotEnqueueCommand :: !CallId -> CommandError

Failed to enqueue a Blocking Command Message into the corresponding Input

BlockingCommandFailure :: !CallId -> CommandError

The request has failed for reasons.

BlockingCommandTimedOut :: !CallId -> CommandError

Timeout waiting for the result.

Instances

Instances details
Eq CommandError Source # 
Instance details

Defined in UnliftIO.MessageBox.Command

Show CommandError Source # 
Instance details

Defined in UnliftIO.MessageBox.Command

data ReplyBox a Source #

This is like Input, it can be used by the receiver of a Blocking to either send a reply using reply or to fail/abort the request using sendRequestError

data Message apiTag where Source #

A message valid for some user defined apiTag.

The apiTag tag (phantom-) type defines the messages allowed here, declared by the instance of Command for apiTag.

Constructors

Blocking :: Show (Command apiTag ('Return result)) => Command apiTag ('Return result) -> ReplyBox result -> Message apiTag

Wraps a Command with a ReturnType of Return result.

Such a message can formed by using call.

A Blocking contains a ReplyBox that can be used to send the reply to the other process blocking on call

NonBlocking :: Show (Command apiTag 'FireAndForget) => Command apiTag 'FireAndForget -> Message apiTag

If the Command has a ReturnType of FireAndForget it has fire-and-forget semantics.

The smart constructor cast can be used to this message.

Instances

Instances details
Show (Message apiTag) Source # 
Instance details

Defined in UnliftIO.MessageBox.Command

Methods

showsPrec :: Int -> Message apiTag -> ShowS #

show :: Message apiTag -> String #

showList :: [Message apiTag] -> ShowS #

data ReturnType where Source #

Indicates if a Command requires the receiver to send a reply or not.

Constructors

FireAndForget :: ReturnType

Indicates that a Command value is sent _one-way_.

Values of a Command instance with FireAndForget as second parameter indicate that the sender should not expect any direct answer from the recepient.

Return :: Type -> ReturnType

Indicates that a Command value requires the receiver to send a reply of the given type.

Values of a Command instance with Return as second parameter are received wrapped into a Blocking.

data family Command apiTag :: ReturnType -> Type Source #

This family allows to encode imperative commands.

The clauses of a Command define the commands that a process should execute.

Every clause may specify an individual ReturnType that declares if and what response is valid for a message.

For example:

type LampId = Int

data instance Command LightControl r where
  GetLamps :: Command LigthControl (Return [LampId])
  SwitchOn :: LampId -> Command LigthControl FireAndForget

data LightControl -- the phantom type

The type index of the Command family is the uninhabited LightControl type. .

The second type parameter indicates if a message requires the receiver to send a reply back to the blocked and waiting sender, or if no reply is necessary.

cast :: (MonadUnliftIO m, IsInput o, Show (Command apiTag 'FireAndForget)) => o (Message apiTag) -> Command apiTag 'FireAndForget -> m Bool Source #

Enqueue a NonBlocking Message into an Input. This is just for symetry to call, this is equivalent to: input -> MessageBox.tryToDeliver input . NonBlocking

The

call :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput input, Show (Command apiTag ('Return result))) => input (Message apiTag) -> Command apiTag ('Return result) -> Int -> m (Either CommandError result) Source #

Enqueue a Blocking Message into an IsInput and wait for the response.

If message delivery failed, return Left CouldNotEnqueueCommand.

If no reply was given by the receiving process (using replyTo) within a given duration, return Left BlockingCommandTimedOut.

Important: The given timeout starts after deliver has returned, if deliver blocks and delays, call might take longer than the specified timeout.

The receiving process can either delegate the call using delegateCall or reply to the call by using: replyTo.

replyTo :: MonadUnliftIO m => ReplyBox a -> a -> m () Source #

This is called from the callback contained in the Blocking Message.

When handling a Blocking Message the ReplyBox contained in the message contains the TMVar for the result, and this function puts the result into it.

delegateCall :: (MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return r))) => o (Message apiTag) -> Command apiTag ('Return r) -> ReplyBox r -> m Bool Source #

Pass on the call to another process.

Used to implement dispatcher processes.

Returns True if the deliver operation was successful.

callAsync :: (HasCallIdCounter env, MonadReader env m, MonadUnliftIO m, IsInput o, Show (Command apiTag ('Return result))) => o (Message apiTag) -> Command apiTag ('Return result) -> m (Maybe (AsyncReply result)) Source #

Enqueue a Blocking Message into an IsInput.

If the call to deliver fails, return Nothing otherwise Just the AsyncReply.

The receiving process must use replyTo with the ReplyBox received along side the Command in the Blocking.

waitForReply Source #

Arguments

:: MonadUnliftIO m 
=> Int

The time in micro seconds to wait before returning Left BlockingCommandTimedOut

-> AsyncReply result 
-> m (Either CommandError result) 

Wait for the reply of a Blocking Message sent by callAsync.

tryTakeReply :: MonadUnliftIO m => AsyncReply result -> m (Maybe (Either CommandError result)) Source #

If a reply for an callAsync operation is available return it, otherwise return Nothing.

class HasCallIdCounter env where Source #

Class of environment records containing a CounterVar for CallIds.

newtype CallId Source #

An identifier value every command send by calls.

Constructors

MkCallId Int 

Instances

Instances details
Eq CallId Source # 
Instance details

Defined in UnliftIO.MessageBox.Util.CallId

Methods

(==) :: CallId -> CallId -> Bool #

(/=) :: CallId -> CallId -> Bool #

Ord CallId Source # 
Instance details

Defined in UnliftIO.MessageBox.Util.CallId

Show CallId Source # 
Instance details

Defined in UnliftIO.MessageBox.Util.CallId

HasCallIdCounter (CounterVar CallId) Source # 
Instance details

Defined in UnliftIO.MessageBox.Util.CallId

takeNext :: (MonadReader env m, HasCallIdCounter env, MonadUnliftIO m) => m CallId Source #

Increment and get a new CallId.

class HasCounterVar a env | env -> a where Source #

A type class for MonadReader based applications.

Methods

getCounterVar :: env -> CounterVar a Source #

Instances

Instances details
HasCounterVar (t :: k) (CounterVar t) Source # 
Instance details

Defined in UnliftIO.MessageBox.Util.Fresh

fresh :: forall a env m. (MonadReader env m, MonadIO m, HasCounterVar a env, Coercible a Int) => m a Source #

A threadsafe atomic a

Atomically increment and get the value of the Counter for type a that must be present in the env.

incrementAndGet :: forall a m. (MonadIO m, Coercible a Int) => CounterVar a -> m a Source #

Atomically increment and get the value of the Counter for type a that must be present in the env.

newCounterVar :: forall a m. MonadIO m => m (CounterVar a) Source #

Create a new CounterVar starting at 0.

newtype Future a Source #

A wrapper around an IO action that returns value in the future.

Constructors

Future (IO (Maybe a)) 

tryNow :: MonadUnliftIO m => Future a -> m (Maybe a) Source #

Return Just the value or Nothing, when the value is not available yet.

Once the value is available, that value will be returned everytime this function is invoked.

awaitFuture :: MonadUnliftIO m => Future b -> m b Source #

Poll a Future until the value is present.