patterns-0.0.3: Common patterns in message-oriented applications

Safe Haskell: Safe-Inferred

Network.Mom.Patterns.Device

Device Services

withDevice :: Context -> String -> Parameter -> Timeout -> [PollEntry] -> InBound o -> OutBound o -> OnError_ -> (Parameter -> OnTimeout) -> (Parameter -> Transformer o) -> (Service -> IO a) -> IO a

Starts a device and executes an action that receives a Service to control the device.

Parameters:

  • Context - The ZMQ context
  • String - The device name
  • Parameter - The initial value of the control parameter
  • Timeout - The polling timeout: < 0 - listens eternally, 0 - returns immediately, > 0 - timeout in microseconds; when the timeout expires, the OnTimeout action is invoked.
  • PollEntry - List of PollEntry; the device will poll over all list members and direct streams to a subset of this list determined by the stream transformer.
  • InBound - in-bound converter; the stream is presented to the transformer as chunks of type o.
  • OutBound - out-bound converter
  • OnError_ - Error handler
  • Parameter -> OnTimeout - Action to perform on timeout
  • Parameter -> Transformer - The stream transformer
  • Service -> IO a - The action to invoke when the device has been started; the Service is used to control the device.
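
The timeout convention described above can be sketched as a small pure function. This is only an illustration: PollMode and pollMode are hypothetical names that are not part of this module, and the timeout is assumed to be a plain Int here.

```haskell
-- Hypothetical model of the timeout convention; not part of the patterns API.
-- The timeout is assumed to be an integral number of microseconds.
data PollMode
  = Forever    -- timeout < 0: listen until a message arrives
  | Immediate  -- timeout == 0: return immediately
  | Wait Int   -- timeout > 0: wait this many microseconds, then run OnTimeout
  deriving (Eq, Show)

-- Classify a raw timeout value according to the three documented cases.
pollMode :: Int -> PollMode
pollMode t
  | t < 0     = Forever
  | t == 0    = Immediate
  | otherwise = Wait t
```

In this reading, the (-1) passed by withQueue, withForwarder and withPipeline corresponds to Forever, so their timeout action is never expected to run.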

withQueue :: Context -> String -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO a

Starts a queue; a queue connects clients with a dealer (XDealer), i.e. a load balancer for requests, and servers with a router (XRouter) that routes responses back to the client.

Parameters:

  • Context: the ZMQ Context
  • String: the queue name
  • (AccessPoint, LinkType): the access point of the dealer (XDealer) and its link type; you usually want to bind the dealer so that many clients can connect to it.
  • (AccessPoint, LinkType): the access point of the router (XRouter) and its link type; you usually want to bind the router so that many servers can connect to it.
  • OnError_: the error handler
  • Service -> IO a: the action to run

withQueue is implemented by means of withDevice as:

  
      withQueue ctx name (dealer, ld) (router, lr) onerr act =
        withDevice ctx name noparam (-1)
              [pollEntry "clients" XDealer dealer ld [],
               pollEntry "server"  XRouter router lr []]
              return return onerr (\_ -> return ()) (\_ -> putThrough) act

withForwarder :: Context -> String -> [Topic] -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO a

Starts a forwarder; a forwarder connects a publisher and its subscribers. Note that the forwarder uses a subscriber (XSub) to connect to the publisher and a publisher (XPub) to bind the subscribers.

Parameters:

  • Context: the ZMQ Context
  • String: the forwarder name
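  • [Topic]: the subscription topics
  • (AccessPoint, LinkType): the access point of the subscriber (XSub) and its link type; you usually want to connect the subscriber to the publisher.
  • (AccessPoint, LinkType): the access point of the publisher (XPub) and its link type; you usually want to bind the publisher so that many subscribers can connect to it. The first pair is always the subscriber and the second the publisher; this order is not enforced by the type system, so you have to take care of it on your own!
  • OnError_: the error handler
  • Service -> IO a: the action to run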

withForwarder is implemented by means of withDevice as:

  
      withForwarder ctx name topics (sub, l1) (pub, l2) onerr act =
        withDevice ctx name noparam (-1)
              [pollEntry "subscriber" XSub sub l1 topics,
               pollEntry "publisher"  XPub pub l2 []]
              return return onerr (\_ -> return ()) (\_ -> putThrough) act

withPipeline :: Context -> String -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO a

Starts a pipeline; a pipeline connects a pipe and its workers. Note that the pipeline uses a puller (XPull) to connect to the pipe and a pipe (XPipe) to bind the pullers.

Parameters:

  • Context: the ZMQ Context
  • String: the pipeline name
  • (AccessPoint, LinkType): the access point of the puller (XPull) and its link type; you usually want to connect the puller to one pipe so that it appears as one puller among others, to which the pipe may send jobs.
  • (AccessPoint, LinkType): the access point of the pipe (XPipe) and its link type; you usually want to bind the pipe so that many pullers can connect to it.
  • OnError_: the error handler
  • Service -> IO a: the action to run

withPipeline is implemented by means of withDevice as:

  
      withPipeline ctx name (puller, l1) (pusher, l2) onerr act =
        withDevice ctx name noparam (-1)
              [pollEntry "pull"  XPull puller l1 [],
               pollEntry "push"  XPipe pusher l2 []]
              return return onerr (\_ -> return ()) (\_ -> putThrough) act

Polling

data PollEntry

A poll entry describes how to handle an AccessPoint.

pollEntry :: Identifier -> AccessType -> AccessPoint -> LinkType -> [Topic] -> PollEntry

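Creates a PollEntry.

Parameters:

  • Identifier: the name of the poll entry; it is the name by which the entry appears as stream source or target and by which remDevice removes it.
  • AccessType: the type of the poll entry (see Access Types below)
  • AccessPoint: the access point to poll
  • LinkType: how to link to the access point (Bind or Connect)
  • [Topic]: the subscription topics; in the devices above, only the XSub entry receives a non-empty list.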

Access Types

data AccessType

Defines the type of a PollEntry; the names of the constructors are similar to ZMQ socket types but with some differences to keep the terminology in line with basic patterns. The leading "X" stands for "Access" (not for "eXtended" as in XRep and XReq).

Constructors

XServer

Represents a server and expects connections from clients; should be used with Bind; corresponds to ZMQ Socket Type Rep

XClient

Represents a client and connects to a server; should be used with Connect; corresponds to ZMQ Socket Type Req

XDealer

Represents a load balancer, expecting connections from clients; should be used with Bind; corresponds to ZMQ Socket Type XRep

XRouter

Represents a router expecting connections from servers; should be used with Bind; corresponds to ZMQ Socket Type XReq

XPub

Represents a publisher; should be used with Bind; corresponds to ZMQ Socket Type Pub

XSub

Represents a subscriber; should be used with Connect; corresponds to ZMQ Socket Type Sub

XPipe

Represents a Pipe; should be used with Bind; corresponds to ZMQ Socket Type Push

XPull

Represents a Puller; should be used with Connect; corresponds to ZMQ Socket Type Pull

XPeer

Represents a Peer; corresponding peers must use complementing LinkType; corresponds to ZMQ Socket Type Pair
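
The constructor descriptions above can be summarised as two lookup functions. The sketch below redeclares AccessType and LinkType locally so that it stands alone; zmqSocketType and recommendedLink are hypothetical helpers that are not exported by this module, and they only restate what the descriptions say.

```haskell
-- Local redeclarations for illustration; the real types live in the library.
data AccessType
  = XServer | XClient | XDealer | XRouter
  | XPub | XSub | XPipe | XPull | XPeer
  deriving (Eq, Show, Enum, Bounded)

data LinkType = Bind | Connect
  deriving (Eq, Show)

-- The ZMQ socket type each constructor corresponds to, as documented above.
zmqSocketType :: AccessType -> String
zmqSocketType XServer = "Rep"
zmqSocketType XClient = "Req"
zmqSocketType XDealer = "XRep"
zmqSocketType XRouter = "XReq"
zmqSocketType XPub    = "Pub"
zmqSocketType XSub    = "Sub"
zmqSocketType XPipe   = "Push"
zmqSocketType XPull   = "Pull"
zmqSocketType XPeer   = "Pair"

-- The link type each constructor "should be used with".
-- XPeer has no fixed recommendation: peers must use complementing link types.
recommendedLink :: AccessType -> Maybe LinkType
recommendedLink XPeer = Nothing
recommendedLink t
  | t `elem` [XClient, XSub, XPull] = Just Connect
  | otherwise                       = Just Bind
```

Note that the names XDealer and XRouter follow the roles described in this module: the entry facing clients is XDealer and corresponds to XRep, the entry facing servers is XRouter and corresponds to XReq.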

Device Service Commands

addDevice :: Service -> PollEntry -> IO ()

Adds a PollEntry to a device. The Service must be a device; otherwise, the command is ignored.

remDevice :: Service -> Identifier -> IO ()

Removes a PollEntry from a device. The Service must be a device; otherwise, the command is ignored.

changeTimeout :: Service -> Timeout -> IO ()

Changes the timeout of a device. The Service must be a device; otherwise, the command is ignored.

Streamer

data Streamer o

Holds information on streams and the current state of the device; streamers are passed to transformers.

getStreamSource :: Streamer o -> Identifier

Retrieves the identifier of the source of the current stream.

filterTargets :: Streamer o -> (Identifier -> Bool) -> [Identifier]

Filters target streams; the function resembles filter from Data.List: it receives a predicate on Identifier, and the identifier of every PollEntry that satisfies the predicate is added to the result.

The function is intended to select targets for an outgoing stream, typically based on the identifier of the source stream. The following example selects all poll entries except the source:

     broadcast :: Streamer o -> [Identifier]
     broadcast s = filterTargets s notSource
       where notSource = (/=) (getStreamSource s)
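
To see what such a selection yields, the behaviour can be modelled without a running device. Streamer is opaque, so the sketch below uses a hypothetical stand-in (StreamerModel) that only records the current source and the identifiers of all poll entries; the functions with the M suffix are illustrative models, not library functions.

```haskell
type Identifier = String

-- Hypothetical stand-in for the library's opaque Streamer.
data StreamerModel = StreamerModel
  { smSource  :: Identifier    -- identifier of the current stream's source
  , smEntries :: [Identifier]  -- identifiers of all poll entries of the device
  }

-- Model of getStreamSource.
getStreamSourceM :: StreamerModel -> Identifier
getStreamSourceM = smSource

-- Model of filterTargets: keep the identifiers that satisfy the predicate.
filterTargetsM :: StreamerModel -> (Identifier -> Bool) -> [Identifier]
filterTargetsM s p = filter p (smEntries s)

-- The broadcast example, expressed against the model.
broadcastM :: StreamerModel -> [Identifier]
broadcastM s = filterTargetsM s (/= getStreamSourceM s)
```

For a queue with the entries "clients" and "server", a stream arriving from "clients" is directed to "server" only, and vice versa.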

Transformer

type Transformer o = Streamer o -> Seq o -> Iteratee o IO ()

A transformer is an Iteratee to transform streams. It receives two arguments:

  • a Streamer which provides information on access points;
  • a Sequence which may be used to store chunks of an incoming stream before they are sent to the target.

Streamer and sequence keep track of the current transformation. The streamer knows where the stream comes from and may be queried about other streams in the device.

putThrough :: Transformer a

Transformer that passes messages one-to-one to all poll entries except the current source.

ignoreStream :: Transformer a

Transformer that ignores the remainder of the current stream; it is usually used to terminate a transformer.

continueHere :: Transformer a

Transformer that does nothing but continue the transformer from which it is called; it is therefore identical to return (). It is usually passed to a transformer combinator, like emit, to continue processing right here instead of recursing into another transformer.

Transformer Combinators

The following functions are building blocks for defining transformers. The building blocks operate on sequences, stream targets and transformers. They manipulate streams, send them to targets and enter a transformer.

emit :: Streamer o -> [Identifier] -> Seq o -> Transformer o -> Iteratee o IO ()

Sends all sequence elements to the targets identified by the list of Identifier and terminates the outgoing stream. The transformation continues with the transformer passed in and an empty sequence.

emitPart :: Streamer o -> [Identifier] -> Seq o -> Transformer o -> Iteratee o IO ()

Sends all sequence elements to the targets identified by the list of Identifier, but unlike emit, does not terminate the outgoing stream. The transformation continues with the transformer passed in and an empty sequence.

Note that all outgoing streams, once started, have to be terminated before the transformer ends. Otherwise, a protocol error will occur.
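
The difference between emit and emitPart, and the termination rule, can be illustrated with a pure model. The sketch assumes that an outgoing stream is a list of frames, each carrying a "more follows" flag in the style of ZeroMQ multipart messages; Frame, emitFrames, emitPartFrames and terminated are hypothetical names, and the model says nothing about how the library implements sending.

```haskell
-- A frame paired with a flag: True means that more frames follow.
type Frame o = (o, Bool)

-- Model of emit: every element is sent, and the last one closes the stream.
emitFrames :: [o] -> [Frame o]
emitFrames xs = zip xs (replicate (length xs - 1) True ++ [False])

-- Model of emitPart: every element is sent, and the stream stays open.
emitPartFrames :: [o] -> [Frame o]
emitPartFrames xs = [(x, True) | x <- xs]

-- An outgoing stream is terminated if nothing was sent
-- or if the last frame sent does not announce further frames.
terminated :: [Frame o] -> Bool
terminated [] = True
terminated fs = not (snd (last fs))
```

In this model, a transformer that only ever calls emitPart leaves the outgoing stream open, which is the situation the note above warns about; a final emit (or end) closes it.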

pass :: Streamer o -> [Identifier] -> o -> Bool -> Transformer o -> Iteratee o IO ()

Sends one element (o) to the targets and continues with an empty sequence; the Boolean parameter determines whether this is the last message to send.

Note that all outgoing streams, once started, have to be terminated before the transformer ends. Otherwise, a protocol error will occur.

passBy :: Streamer o -> [Identifier] -> o -> Seq o -> Transformer o -> Iteratee o IO ()

Sends one element (o) to the targets, but, unlike pass, passes the sequence to the transformer. passBy does not terminate the outgoing stream.

end :: Streamer o -> [Identifier] -> o -> Iteratee o IO ()

Terminates the outgoing stream by sending the new element as the last segment to all targets, and ends the transformer by ignoring the rest of the incoming stream.

absorb :: Streamer o -> o -> Seq o -> Transformer o -> Iteratee o IO ()

Adds a new element to the sequence and calls the transformer without sending anything.

merge :: Monoid o => Streamer o -> o -> Seq o -> Transformer o -> Iteratee o IO ()

Merges the new element with the last element of the sequence; if the sequence is currently empty, the new element will be its only member. Merged elements appear as one element of the sequence in the continuation of the transformation. The type o must be a Monoid, i.e., it must implement mappend and mempty. The function does not send anything.
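
The effect of absorb and merge on the sequence can be sketched with Data.Sequence alone. The functions absorbSeq and mergeSeq are hypothetical models of the sequence update only; they neither send anything nor call a transformer. It is an assumption of this sketch that absorb appends at the end and that merge combines the elements as last `mappend` new.

```haskell
import Data.Sequence (Seq, ViewR (..), viewr, (|>))
import qualified Data.Sequence as Seq

-- Model of absorb: the new element becomes the last element of the sequence.
absorbSeq :: o -> Seq o -> Seq o
absorbSeq x s = s |> x

-- Model of merge: combine the new element with the last element;
-- on an empty sequence, the new element becomes the only member.
mergeSeq :: Monoid o => o -> Seq o -> Seq o
mergeSeq x s = case viewr s of
  EmptyR    -> Seq.singleton x
  rest :> l -> rest |> (l `mappend` x)
```

With String chunks, absorbing "c" into the sequence "a", "b" yields three elements, whereas merging "c" yields the two elements "a" and "bc".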

Helpers

type Identifier = String

A device identifier is just a plain String.

type OnTimeout = IO ()

A timeout action is just an IO action without arguments.