Safe Haskell | Safe-Infered |
---|
- withDevice :: Context -> String -> Parameter -> Timeout -> [PollEntry] -> InBound o -> OutBound o -> OnError_ -> (Parameter -> OnTimeout) -> (Parameter -> Transformer o) -> (Service -> IO a) -> IO a
- withQueue :: Context -> String -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO a
- withForwarder :: Context -> String -> [Topic] -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO a
- withPipeline :: Context -> String -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO a
- data PollEntry
- pollEntry :: Identifier -> AccessType -> AccessPoint -> LinkType -> [Topic] -> PollEntry
- data AccessType
- addDevice :: Service -> PollEntry -> IO ()
- remDevice :: Service -> Identifier -> IO ()
- changeTimeout :: Service -> Timeout -> IO ()
- data Streamer o
- getStreamSource :: Streamer o -> Identifier
- filterTargets :: Streamer o -> (Identifier -> Bool) -> [Identifier]
- type Transformer o = Streamer o -> Seq o -> Iteratee o IO ()
- putThrough :: Transformer a
- ignoreStream :: Transformer a
- continueHere :: Transformer a
- emit :: Streamer o -> [Identifier] -> Seq o -> Transformer o -> Iteratee o IO ()
- emitPart :: Streamer o -> [Identifier] -> Seq o -> Transformer o -> Iteratee o IO ()
- pass :: Streamer o -> [Identifier] -> o -> Bool -> Transformer o -> Iteratee o IO ()
- passBy :: Streamer o -> [Identifier] -> o -> Seq o -> Transformer o -> Iteratee o IO ()
- end :: Streamer o -> [Identifier] -> o -> Iteratee o IO ()
- absorb :: Streamer o -> o -> Seq o -> Transformer o -> Iteratee o IO ()
- merge :: Monoid o => Streamer o -> o -> Seq o -> Transformer o -> Iteratee o IO ()
- type Identifier = String
- type OnTimeout = IO ()
Device Services
withDevice :: Context -> String -> Parameter -> Timeout -> [PollEntry] -> InBound o -> OutBound o -> OnError_ -> (Parameter -> OnTimeout) -> (Parameter -> Transformer o) -> (Service -> IO a) -> IO aSource
Starts a device and executes an action that receives a Service
to control the device
Parameters:
-
Context
- The ZMQ context -
String
- The device name -
Parameter
- The initial value of the control parameter -
Timeout
- The polling timeout: < 0 - listens eternally, 0 - returns immediately, > 0 - timeout in microseconds; when the timeout expires, theOnTimeout
action is invoked. -
PollEntry
- List ofPollEntry
; the device will polll over all list members and direct streams to a subset of this list determined by the stream transformer. -
InBound
- in-bound converter; the stream is presented to the transformer as chunks of type o. -
OutBound
- out-bound converter -
OnError_
- Error handler -
Parameter
->OnTimeout
- Action to perform on timeout -
Parameter
->Transformer
- The stream transformer -
Service
-> IO () - The action to invoke, when the device has been started; TheService
is used to control the device.
withQueue :: Context -> String -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO aSource
Starts a queue;
a queue connects clients with a dealer (XDealer
),
i.e. a load balancer for requests,
and servers with a router (XRouter
) that routes responses
back to the client.
Parameters:
-
Context
: the ZMQ Context -
String
: the queue name - (
AccessPoint
,LinkType
): the access point of the dealer (XDealer
) and its link type; you usually want to bind the dealer so that many clients can connect to it. - (
AccessPoint
,LinkType
): the access point of the router (XRouter
); and its link type; you usually want to bind the router so that many servers can connect to it. -
OnError_
: the error handler -
Service
-> IO (): the action to run
withQueue
is implemented by means of withDevice
as:
withQueue ctx name (dealer, ld) (router, lr) onerr act = withDevice ctx name noparam (-1) [pollEntry "clients" XDealer dealer ld [], pollEntry "server" XRouter router lr []] return return onerr (_ -> return ()) (_ -> putThrough) act
withForwarder :: Context -> String -> [Topic] -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO aSource
Starts a Forwarder;
a forwarder connects a publisher and its subscribers.
Note that the forwarder uses a subscriber (XSub
)
to conntect to the publisher and
a publisher (XPub
) to bind the subscribers.
Parameters:
-
Context
: the ZMQ Context -
String
: the forwarder name -
Topic
: the subscription topic - (
AccessPoint
,AccessPoint
): the access points; the first is the subscriber (XSub
), the second is the publisher (XPub
); this rule is not enforced by the type system; you have to take care of it on your own! -
OnError_
: the error handler -
Service
-> IO (): the action to run
withForwarder
is implemented by means of withDevice
as:
withForwarder ctx name topics (sub, pub) onerr act = withDevice ctx name noparam (-1) [pollEntry "subscriber" XSub router Connect topics, pollEntry "publisher" XPub dealer Bind []] return return onerr (_ -> return ()) (_ -> putThrough) act
withPipeline :: Context -> String -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO aSource
Starts a pipeline;
a pipeline connects a pipe
and its workers.
Note that the pipeline uses a puller (XPull
)
to conntect to the pipe and
a pipe (XPipe
) to bind the pullers.
Parameters:
-
Context
: the ZMQ Context -
String
: the pipeline name - (
AccessPoint
,LinkType
): the access point of the puller (XPull
) and its link type; you usually want to connect the puller to one pipe so that it appears as one puller among others, to which the pipe may send jobs. - (
AccessPoint
,LinkType
): the access point of the pipe (XPipe
); and its link type; you usually want to bind the pipe so that many pullers can connect to it. -
OnError_
: the error handler -
Service
-> IO (): the action to run
withPipeline
is implemented by means of withDevice
as:
withPipeline ctx name topics (puller, l1) (pusher, l2) onerr act = withDevice ctx name noparam (-1) [pollEntry "pull" XPull puller l1 [], pollEntry "push" XPush pusher l2 []] return return onerr (_ -> return ()) (_ -> putThrough) act
Polling
A poll entry describes how to handle an AccessPoint
pollEntry :: Identifier -> AccessType -> AccessPoint -> LinkType -> [Topic] -> PollEntrySource
Creates a PollEntry
;
Parameters:
-
Identifier
: identifies anAccessPoint
; the identifier shall be unique within the device. -
AccessType
: theAccessType
of thisAccessPoint
-
AccessPoint
: theAccessPoint
-
LinkType
: how to link to thisAccessPoint
- [
Topic
]: The subscription topics - ignored for all poll entries, but those withAccessType
XSub
Access Types
data AccessType Source
Defines the type of a PollEntry
;
the names of the constructors are similar to ZMQ socket types
but with some differences to keep the terminology in line
with basic patterns.
The leading "X" stands for "Access"
(not for "eXtended" as in XRep and XReq).
XServer | Represents a server and expects connections from clients;
should be used with |
XClient | Represents a client and connects to a server;
should be used with |
XDealer | Represents a load balancer,
expecting connections from clients;
should be used with |
XRouter | Represents a router
expecting connections from servers;
should be used with |
XPub | Represents a publisher;
should be used with |
XSub | Represents a subscriber;
should be used with |
XPipe | Represents a Pipe;
should be used with |
XPull | Represents a Puller;
should be used with |
XPeer | Represents a Peer;
corresponding peers must use complementing |
Device Service Commands
changeTimeout :: Service -> Timeout -> IO ()Source
Changes the timeout of a device;
the Service
, of course, must be a device,
the command is otherwise ignored.
Streamer
Holds information on streams and the current state of the device; streamers are passed to transformers.
getStreamSource :: Streamer o -> IdentifierSource
Retrieves the identifier of the source of the current stream
filterTargets :: Streamer o -> (Identifier -> Bool) -> [Identifier]Source
Filters target streams;
the function resembles filter of List
:
it receives the property of an Identifier
;
if a PollEntry
has this property, it is added to the result set.
The function is intended to select targets for an out-going stream, typically based on the identifier of the source stream. The following example selects all poll entries, but the source:
broadcast :: Streamer o -> [Identifier] broadcast s = filterTargets s notSource where notSource = (/=) (getStreamSource s)
Transformer
type Transformer o = Streamer o -> Seq o -> Iteratee o IO ()Source
A transformer is an Iteratee
to transform streams.
It receives two arguments:
- a
Streamer
which provides information on access points; - a
Sequence
which may be used to store chunks of an incoming stream before they are sent to the target.
Streamer and sequence keep track of the current transformation. The streamer knows where the stream comes from and may be queried about other streams in the device.
putThrough :: Transformer aSource
Transformer that passes messages one-to-one to all poll entries but the current source
ignoreStream :: Transformer aSource
Transformer that ignores the remainder of the current stream; it is usually used to terminate a transformer.
continueHere :: Transformer aSource
Transformer that
does nothing but continuing the transformer, from which it is called
and, hence, is identical to return ();
it is usually passed to a transformer combinator,
like emit
, to continue processing right here
instead of recursing into another transformer.
Transformer Combinators
The following functions are building blocks for defining transformers. The building blocks operate on sequences, stream targets and transformers. They manipulate streams, send them to targets and enter a transformer.
emit :: Streamer o -> [Identifier] -> Seq o -> Transformer o -> Iteratee o IO ()Source
Sends all sequence elements to the targets identified
by the list of Identifier
and terminates the outgoing stream.
The transformation continues with the transformer
passed in and an empty sequence.
emitPart :: Streamer o -> [Identifier] -> Seq o -> Transformer o -> Iteratee o IO ()Source
Sends all sequence elements to the targets identified
by the list of Identifier
, but unlike emit
,
does not terminate the outgoing stream.
The transformation continues with the transformer
passed in and an empty sequence.
Note that all outgoing streams, once started, have to be terminated before the transformer ends. Otherwise, a protocol error will occur.
pass :: Streamer o -> [Identifier] -> o -> Bool -> Transformer o -> Iteratee o IO ()Source
Sends one element (o) to the targets and continues with an empty sequence; the Boolean parameter determines whether this is the last message to send.
Note that all outgoing streams, once started, have to be terminated before the transformer ends. Otherwise, a protocol error will occur.
passBy :: Streamer o -> [Identifier] -> o -> Seq o -> Transformer o -> Iteratee o IO ()Source
end :: Streamer o -> [Identifier] -> o -> Iteratee o IO ()Source
Terminates the outgoing stream by sending the new element as last segment to all targets and ends the transformer by ignoring the rest of the incoming stream.
absorb :: Streamer o -> o -> Seq o -> Transformer o -> Iteratee o IO ()Source
Adds a new element to the sequence and calls the transformer without sending anything
merge :: Monoid o => Streamer o -> o -> Seq o -> Transformer o -> Iteratee o IO ()Source
Merges the new element with the last element of the sequence;
if the sequence is currently empty, the new element
will be its only member.
Merged elements appear as one element of the sequence
in the continuation of the transformation.
The type o must be a Monoid
, i.e.,
it must implement mappend and mempty.
The function does not send anything.
Helpers
type Identifier = StringSource
A device identifier is just a plain String