Copyright | (c) Tobias Schoofs |
---|---|
License | LGPL |
Stability | experimental |
Portability | portable |
Safe Haskell | None |
Language | Haskell98 |
This module provides the basic patterns client/server, publish and subscribe and pusher/worker (a.k.a. pipeline) as well as a registry, a means to support patterns where one application uses or serves a set of other applications.
Basic patterns can be distinguished by the data exchange defined by their protocol:
- client/server: The client, first, sends data to the server (the request) and, then, the server sends data to this client (the reply);
- publish and subscribe: The publisher sends data to all subscribers (the topic);
- pusher/worker: The pusher sends data to the worker (the request) without receiving a reply.
We call the processing performed by an application on behalf of another application a job. There are three different job types:
- service: must be requested explicitly and includes a message sent from the application that performs the service (server) to the application that requests the service (client).
- task: must be requested explicitly, but does not include a reply.
- topic: is sent to registered subscribers without being requested explicitly.
Applications providing a job are generically called providers; applications requesting a job are called consumers. Providers, hence, are servers, workers and publishers, and consumers are clients, pushers and subscribers. Note that this is somewhat different from the data-centric terminology "producer" and "consumer". It is not very useful to distinguish servers from clients or pushers from workers by referring to the distinction of producing or not producing data. It is in fact the pusher that produces data, not the worker. The pusher, however, is the one that requests something from the worker. The task, in this case, is the "good" that is provided by one side and consumed by the other.
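The terminology above can be summarised in a small model. This is only an illustrative sketch: the types `Role`, `Side` and `JobKind` and the functions `side` and `jobOf` are hypothetical names introduced here and are not part of the library.

```haskell
-- Hypothetical model of the terminology used in this description.
-- None of these types belong to the library.

data Role = Client | Server | Pusher | Worker | Publisher | Subscriber
  deriving (Show, Eq)

data Side = Provider | Consumer
  deriving (Show, Eq)

data JobKind = Service | Task | Topic
  deriving (Show, Eq)

-- | Which side of the provider/consumer relation a role is on.
side :: Role -> Side
side Server     = Provider
side Worker     = Provider
side Publisher  = Provider
side Client     = Consumer
side Pusher     = Consumer
side Subscriber = Consumer

-- | The kind of job a role provides or consumes.
jobOf :: Role -> JobKind
jobOf Client     = Service
jobOf Server     = Service
jobOf Pusher     = Task
jobOf Worker     = Task
jobOf Publisher  = Topic
jobOf Subscriber = Topic
```

Note that `side Pusher` is `Consumer` although the pusher is the one sending data: the classification follows who relies on whom, not who produces the payload.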
This distinction is relevant when we start to think about reliability. Reliability is a relation between a provider and a consumer: The consumer relies on the provider, not the other way round, e.g. a pusher relies on a worker and a client on a server to get the job done.
The interfaces in this library give some guarantees related to reliability, but there are also some pitfalls:
- A client using timeouts can be sure that the requested service has been performed when the reply arrives before the timeout expires. If no reply arrives before timeout expiration, no such claim can be made (in particular not that the service has not been performed). The service may have been performed, but it may have taken more time than expected or it may have been performed, but the reply message has failed to arrive. If the service is idempotent - i.e. calling the service twice has the same effect as calling it once - the client, when the timeout has expired, can just send the request once again; otherwise, it has to use other means to recover from this situation.
- A pusher will never know if the task has been performed correctly, since there is no response from the worker. This is one of the reasons that the pipeline pattern should usually not be used alone, but in the context of a balancer. (You usually want to push a request to a worker through a balancer -- one of the ideas behind pusher/worker is work balancing.) A balancer may request providers to send heartbeats and, this way, minimise the risk of failure. The worker still may fail between a heartbeat and a request and even the fact that it does send heartbeats does not necessarily mean that it is operating correctly. If it is essential for the client to know that all tasks have been performed correctly, other verification means are required.
- Finally, a subscriber will never know whether a publisher is still working correctly or not, if the publisher does not send data periodically. A reliable design would use periodic publishers, i.e. publishers that send data at a constant rate, even if no new data are available. The data update, in this case, would have the effect of a heartbeat.
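The retry strategy for idempotent services mentioned in the first item can be sketched as follows. This is a minimal sketch under assumptions: `retryIdempotent` and `flakyRequest` are hypothetical helpers, and the request is modelled as any action that returns `Nothing` on timeout, which is how `request` is described in this module.

```haskell
import Data.IORef (IORef, atomicModifyIORef')

-- | Run an idempotent request at most @n@ times.
--   The action is expected to return 'Nothing' when its timeout expires.
--   Only safe if performing the service twice has the same effect as once.
retryIdempotent :: Int -> IO (Maybe a) -> IO (Maybe a)
retryIdempotent n act
  | n <= 0    = return Nothing
  | otherwise = do
      r <- act
      case r of
        Just x  -> return (Just x)
        Nothing -> retryIdempotent (n - 1) act

-- | Stand-in for a request that times out twice before a reply arrives
--   (hypothetical, for demonstration only).
flakyRequest :: IORef Int -> IO (Maybe String)
flakyRequest ref = do
  k <- atomicModifyIORef' ref (\i -> (i + 1, i))
  return (if k < 2 then Nothing else Just "done")
```

With a real client, the action passed in would be a call such as `request cl tmo nullType [] payload`. For services that are not idempotent, this loop is not appropriate.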
The library uses a set of headers that must not be used by applications. All internal header keys start and end with two underscores. By avoiding this naming of header keys, application code easily avoids naming conflicts. The headers used in basic patterns are:
- channel: Reply queue (client/server)
- job: The requested job (client/server, pusher/worker and registry)
- type: Request type (registry)
- job-type: Type of job (registry)
- queue: Queue to register (registry)
- hb: Heartbeat specification (registry)
- sc: Status Code (registry)
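An application that wants to guard against accidental collisions could check its own header keys against the naming rule stated above. The following is only a sketch: `isReservedKey` and `checkHeaders` are hypothetical helpers, headers are assumed to be plain key/value pairs of strings, and concrete keys such as `__job__` are inferred from the naming rule rather than confirmed by this page.

```haskell
import Data.List (isPrefixOf, isSuffixOf)

-- | True for keys that start and end with two underscores.
isReservedKey :: String -> Bool
isReservedKey k =
  length k >= 4 && "__" `isPrefixOf` k && "__" `isSuffixOf` k

-- | Accept a list of application headers only if none of the keys
--   follows the reserved naming scheme.
checkHeaders :: [(String, String)] -> Either String [(String, String)]
checkHeaders hs =
  case filter (isReservedKey . fst) hs of
    []           -> Right hs
    ((k, _) : _) -> Left ("reserved header key: " ++ k)
```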
Synopsis
- data ClientA i o
- clName :: ClientA i o -> String
- withClient :: Con -> String -> JobName -> ReaderDesc i -> WriterDesc o -> (ClientA i o -> IO r) -> IO r
- request :: ClientA i o -> Int -> Type -> [Header] -> o -> IO (Maybe (Message i))
- checkRequest :: ClientA i o -> Int -> IO (Maybe (Message i))
- data ServerA i o
- srvName :: ServerA i o -> String
- withServer :: Con -> String -> ReaderDesc i -> WriterDesc o -> (ServerA i o -> IO r) -> IO r
- reply :: ServerA i o -> Int -> Type -> [Header] -> (Message i -> IO o) -> IO ()
- register :: Con -> JobName -> JobType -> QName -> QName -> Int -> Int -> IO (StatusCode, Int)
- unRegister :: Con -> JobName -> QName -> QName -> Int -> IO StatusCode
- heartbeat :: MVar HB -> Writer () -> JobName -> QName -> IO ()
- data HB
- mkHB :: Int -> IO HB
- withServerThread :: Con -> String -> JobName -> Type -> [Header] -> (Message i -> IO o) -> ReaderDesc i -> WriterDesc o -> RegistryDesc -> OnError -> IO r -> IO r
- type RegistryDesc = (QName, Int, (Int, Int, Int))
- data PusherA o
- pushName :: PusherA o -> String
- withPusher :: Con -> String -> JobName -> WriterDesc o -> (PusherA o -> IO r) -> IO r
- push :: PusherA o -> Type -> [Header] -> o -> IO ()
- withTaskThread :: Con -> String -> JobName -> (Message i -> IO ()) -> ReaderDesc i -> RegistryDesc -> OnError -> IO r -> IO r
- data Registry
- withRegistry :: Con -> String -> QName -> (Int, Int) -> OnError -> (Registry -> IO r) -> IO r
- mapR :: Registry -> JobName -> (Provider -> IO ()) -> IO Bool
- getProvider :: Registry -> JobName -> Int -> IO [Provider]
- showRegistry :: Registry -> IO ()
- data Provider
- prvQ :: Provider -> QName
- data JobType
- data PubA o
- pubName :: PubA o -> String
- withPub :: Con -> String -> JobName -> QName -> OnError -> WriterDesc o -> (PubA o -> IO r) -> IO r
- publish :: PubA o -> Type -> [Header] -> o -> IO ()
- withPubThread :: Con -> String -> JobName -> QName -> Type -> [Header] -> IO o -> WriterDesc o -> Int -> OnError -> IO r -> IO r
- data SubA i
- subName :: SubA i -> String
- withSub :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> (SubA i -> IO r) -> IO r
- checkIssue :: SubA i -> Int -> IO (Maybe (Message i))
- withSubThread :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> (Message i -> IO ()) -> OnError -> IO r -> IO r
- withSubMVar :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> MVar i -> OnError -> IO r -> IO r
- withPubProxy :: Con -> String -> JobName -> QName -> ReaderDesc i -> RegistryDesc -> OnError -> IO r -> IO r
- data PatternsException
- type OnError = SomeException -> String -> IO ()
- data StatusCode
- = OK
- | BadRequest
- | Forbidden
- | NotFound
- | Timeout
- readStatusCode :: String -> Either String StatusCode
- type JobName = String
- type QName = String
- nobody :: OutBound ()
- ignorebody :: InBound ()
- bytesOut :: OutBound ByteString
- bytesIn :: InBound ByteString
- getJobName :: Message m -> IO String
- getJobType :: Message m -> IO JobType
- getQueue :: Message m -> IO String
- getChannel :: Message m -> IO String
- getHB :: Message m -> IO Int
- getSC :: Message m -> IO (Either String StatusCode)
- getHeader :: String -> String -> Message m -> IO String
Client
The client data type, which implements the client side of the client/server protocol.
withClient :: Con -> String -> JobName -> ReaderDesc i -> WriterDesc o -> (ClientA i o -> IO r) -> IO r
The function creates a client that lives within its scope.
Parameters:
- Con: Connection to a Stomp broker;
- String: Name of the client, which can be used for error reporting;
- JobName: Name of the Service the client will request;
- ReaderDesc i: Description of a reader queue; this is the queue through which the server will send its response;
- WriterDesc o: Description of a writer queue; this is the queue through which the server is expecting requests;
- ClientA i o -> IO r: An application-defined action whose scope defines the client's lifetime.
request :: ClientA i o -> Int -> Type -> [Header] -> o -> IO (Maybe (Message i))
The client will send the request of type o and wait for the reply until the timeout expires. The reply is of type i and is returned as Message i. If the timeout expires before the reply has been received, the function returns Nothing.
Since servers do not know the clients they are serving, request sends the name of its reader queue (the reply queue) as message header to the server.
Parameters:
checkRequest :: ClientA i o -> Int -> IO (Maybe (Message i))
This function serves as a "delayed" receiver for the case that the timeout of a request has expired. When using this function, it is assumed that a request has been made, but no response has been received. It can be used in time-critical applications, where the client may use the time between request and reply productively, instead of passively blocking on the reply queue.
Use this function with care! It can be easily abused to break the client/server pattern, when it is called without a request having been made before. If, in this case, the timeout is -1, the application will block forever.
The function receives those parameters from request that are related to receiving the reply, i.e. Type, [Header] and o are not passed to checkRequest.
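The idea of a delayed receiver can be illustrated without a broker. The sketch below is a model under assumptions, not the library's implementation: the reply queue is replaced by an `MVar`, the server by a thread that answers after 200 ms, and `awaitReply` and `demoDelayed` are hypothetical names.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
import System.Timeout (timeout)

-- | Wait for a reply for at most @tmo@ microseconds.
--   'timeout' treats a negative interval as "wait indefinitely",
--   which mirrors the -1 convention described above.
awaitReply :: MVar a -> Int -> IO (Maybe a)
awaitReply box tmo = timeout tmo (takeMVar box)

-- | First wait is too short and yields Nothing; the second,
--   "delayed" wait picks up the reply that arrives later.
demoDelayed :: IO (Maybe String, Maybe String)
demoDelayed = do
  box <- newEmptyMVar
  -- stand-in for a slow server: replies after 200 ms
  _ <- forkIO (threadDelay 200000 >> putMVar box "reply")
  first  <- awaitReply box 20000     -- 20 ms, expires
  -- ... the client could do useful work here ...
  second <- awaitReply box 2000000   -- up to 2 s
  return (first, second)
```

In this model, calling `awaitReply box (-1)` without any pending reply would block forever, which is the pitfall the warning above refers to.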
Server
The server data type, which implements the server side of the client/server protocol.
withServer :: Con -> String -> ReaderDesc i -> WriterDesc o -> (ServerA i o -> IO r) -> IO r
The function creates a server that lives within the scope of the application-defined action passed into it.
Parameters:
- Con: Connection to a Stomp broker;
- String: Name of the server, which can be used for error reporting;
- ReaderDesc i: Description of a reader queue; this is the queue through which clients are expected to send requests;
- WriterDesc o: Description of a writer queue; this is the queue through which a specific client will expect the reply. Note that the server will overwrite the destination of this queue using writeAdHoc; the destination of this queue, hence, is irrelevant;
- ServerA i o -> IO r: An application-defined action whose scope defines the server's lifetime.
reply :: ServerA i o -> Int -> Type -> [Header] -> (Message i -> IO o) -> IO ()
Waits for a client request, calls the application-defined transformer to generate a reply and sends this reply through the reply queue whose name is indicated by a header in the request. The time a server waits for a request may be restricted by the timeout. Typically, you would call reply with timeout set to -1 (wait eternally). There may be situations, however, where it actually makes sense to restrict the waiting time, e.g. to perform some housekeeping in between.
Typically, you call reply in a loop like
    forever $ reply srv (-1) nullType [] f
where f is a function of type Message i -> IO o.
Parameters:
- ServerA i o: The server; note that i is the request queue and o the reply queue;
- Int: The timeout in microseconds;
- Type: The MIME type of the reply;
- [Header]: Additional headers to be sent with the reply;
- Message i -> IO o: Transforms the request into a reply - this defines the service provided by this application.
Registry
Before we continue the survey of basic patterns, we have to introduce registries. Registries are used by some advanced patterns, but also by publishers, either to use a set of providers that have registered beforehand or, in the case of publishers, to serve a set of consumers that have registered with the publisher.
A typical example is the balancer (majordomo pattern): servers or workers register with a balancer, which, on receiving a request from a client for a certain job, forwards the request to one of the registered providers of this job. Internally, the registry balances the requests, such that, with more than one provider currently registered, two consecutive requests will not be served by the same provider.
Note that registries provide different modes of using providers: a load balancer will send a request to only one of its providers, whereas publishers will send the data they produce to all currently registered consumers. The difference is defined by the JobType of a given Job. Of course, only providers of the same type may register for the same job.
Registries provide a queue through which services can register; patterns using registries would provide another queue through which they receive requests. Registries allow for some level of reliability, i.e. registries can ensure with a certain probability that providers are available at the time when a request is made. To this end, registries may request heartbeats from providers. Heartbeats are negotiated on registration. Note that registries do not send heartbeats back to providers. Providers have to use other strategies to make sure that the registry to which they have registered is actually available.
register :: Con -> JobName -> JobType -> QName -> QName -> Int -> Int -> IO (StatusCode, Int)
Connect to a registry: The caller registers itself at the registry. The owner of the registry will then use the caller depending on its purpose.
- Con: Connection to a Stomp broker;
- JobName: The name of the job provided by the caller;
- JobType: The type of the job provided by the caller;
- QName: The registry's registration queue;
- QName: The queue to register; this is the queue the registry will actually use (for forwarding requests or whatever it does in this specific case). The registry, internally, uses JobName together with this queue as a key to identify the provider;
- Int: Timeout in microseconds;
- Int: Preferred heartbeat in milliseconds (0 for no heartbeats).
The function returns a tuple of StatusCode and the heartbeat proposed by the registry (which may differ from the preferred heartbeat of the caller). Whenever the StatusCode is not OK, the heartbeat is 0. If the JobName is null, the StatusCode will be BadRequest. If the timeout expires, register throws TimeoutX.
unRegister :: Con -> JobName -> QName -> QName -> Int -> IO StatusCode
Disconnect from a registry: The caller disconnects from a registry to which it has registered before. For the case that the registry is not receiving heartbeats from the caller, it is essential to unregister when the service is no longer provided. Otherwise, the registry has no way to know that it should not send requests to this provider anymore.
- Con: Connection to a Stomp broker;
- JobName: The JobName to unregister;
- QName: The registry's registration queue;
- QName: The queue to unregister;
- Int: The timeout in microseconds.
The function returns a StatusCode. If JobName is null, the StatusCode will be BadRequest. If the timeout expires, the function will throw TimeoutX.
heartbeat :: MVar HB -> Writer () -> JobName -> QName -> IO ()
Send heartbeats:
- MVar HB: An MVar of type HB; this MVar will be used to keep track of when the heartbeat actually has to be sent;
- Writer (): The writer through which to send the heartbeat; the queue name of the writer is the registration queue of the registry; note that its type is (): heartbeats are empty messages;
- JobName: The JobName for which to send heartbeats;
- QName: The queue for which to send heartbeats.
The following example shows how to use the registration functions and heartbeats together with a server:
    -- The definition of the variables
    --     reg, jn, rn, tmo
    -- is out of the scope of this listing;
    -- their data type and meaning
    -- can be inferred from the context.
    withConnection "127.0.0.1" 61613 [] [] $ \c ->
      withServer c "Test" (q,         [], [], iconv)
                          ("unknown", [], [], oconv) $ \s -> do
        (sc,me) <- if null reg -- if parameter reg is null
                     then return (OK, 0)
                     else register c jn Service reg rn tmo 500
        case sc of
          -- ok ------------------------------
          OK ->
            if me < 0 || me > 5000 -- accept heartbeat from
                                   -- 0 (no heartbeats) to
                                   -- 5 seconds
              then do void $ unRegister c jn reg rn tmo
                      throwIO $ UnacceptableHbX me
              else do hb <- mkHB me
                      m  <- newMVar hb
                      -- heartbeat is in milliseconds, reply timeout in microseconds
                      let p = if me <= 0 then (-1) else 1000 * me
                      withWriter c "HB" reg [] [] nobody $ \w ->
                        finally (forever $
                                   reply s p t hs transform >> heartbeat m w jn rn)
                                (do -- "don't forget to unregister!" (Frank Zappa)
                                    sc <- unRegister c jn reg rn tmo
                                    unless (sc == OK) $
                                      throwIO $ NotOKX sc "on unregister")
          -- not ok ---------------------------
          e -> throwIO $ NotOKX e "on register"
There is, however, a function that does all of this internally: withServerThread.
withServerThread
withServerThread :: Con -> String -> JobName -> Type -> [Header] -> (Message i -> IO o) -> ReaderDesc i -> WriterDesc o -> RegistryDesc -> OnError -> IO r -> IO r
Create a server that works in a background thread: The background thread (and with it the server) is running until the action passed in to the function (IO r) terminates; when it terminates, the background thread is terminated as well. withServerThread may connect to a registry (to serve as a provider of a balancer, for instance), which is automatically handled internally when a RegistryDesc is passed in with a QName that is not null.
- Con: Connection to a Stomp broker;
- String: The name of the server, used for error reporting;
- JobName: The job provided by this server;
- Type: The MIME Type (passed to reply);
- [Header]: Additional headers (passed to reply);
- Message i -> IO o: The core of the reply function: transforming a request of type i into a reply of type o;
- ReaderDesc i: The reader through which requests are expected;
- WriterDesc o: The writer through which replies are sent;
- RegistryDesc: Describes whether and how to connect to a registry: if the queue name of the registry description is null, the function will not connect to a registry; otherwise it will connect to the registry proposing the best value of the RegistryDesc as its preferred heartbeat rate; should the heartbeat rate returned by the registry be outside the scope of min and max, withServerThread will terminate with UnacceptableHbX;
- OnError: Error handler;
- IO r: The function starts a new thread on which the server is working; the thread from which the function was called continues in this action. Its return value is also the result of withServerThread. When the action terminates, the new thread is terminated internally.
type RegistryDesc = (QName, Int, (Int, Int, Int))
A helper that shall ease the use of registries. A registry to which a caller wants to connect is described as
- The QName through which the registry receives requests;
- The Timeout in microseconds, i.e. the time the caller will wait before the request fails;
- A triple of heartbeat specifications: the best value, i.e. the rate at which the caller prefers to send heartbeats, the minimum rate at which the caller can accept to send heartbeats, the maximum rate at which the caller can accept to send heartbeats. Note that all these values are in milliseconds!
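Because the timeout is given in microseconds and the heartbeat values in milliseconds, it is easy to mix up the units. The following sketch shows one way to build a description and to check a heartbeat returned by a registry against it. It rests on assumptions: the type aliases are local copies of the ones shown on this page, `hbAcceptable` and `hbToTimeout` are hypothetical helpers, the queue name is made up, and the range check is a plain reading of the "outside the scope of min and max" rule rather than the library's actual code.

```haskell
-- Local copies of the aliases documented on this page.
type QName        = String
type RegistryDesc = (QName, Int, (Int, Int, Int))

-- | A description with a hypothetical queue name:
--   500 ms registration timeout (given in microseconds),
--   preferred heartbeat 500 ms, accepted range 0 to 5000 ms.
exampleDesc :: RegistryDesc
exampleDesc = ("/q/registry", 500000, (500, 0, 5000))

-- | Is the heartbeat proposed by the registry (milliseconds)
--   within the caller's minimum and maximum?
hbAcceptable :: RegistryDesc -> Int -> Bool
hbAcceptable (_, _, (_, mn, mx)) hb = hb >= mn && hb <= mx

-- | Convert a heartbeat in milliseconds into a timeout in microseconds;
--   a non-positive heartbeat means "no heartbeats", so wait forever (-1).
--   This mirrors the conversion used in the server example above.
hbToTimeout :: Int -> Int
hbToTimeout hb
  | hb <= 0   = -1
  | otherwise = 1000 * hb
```

A caller following this sketch would treat a proposed heartbeat for which `hbAcceptable` is False as the `UnacceptableHbX` case.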
Pusher
The Pusher data type, which implements the consumer side of the pipeline protocol. Note that, when we say "consumer" here, the pusher is actually a data producer, but consumes the effect of having a task done. The pusher can be seen as a client that does not expect a reply.
withPusher :: Con -> String -> JobName -> WriterDesc o -> (PusherA o -> IO r) -> IO r
Create a Pusher with the lifetime of the action passed in:
- Con: Connection to a Stomp broker;
- String: Name of the pusher, which may be used for error reporting;
- JobName: Name of the job requested by this pusher;
- WriterDesc o: Writer queue through which the job request is pushed;
- (PusherA o -> IO r): Action that defines the lifetime of the pusher; the result r is also the result of withPusher.
Worker
withTaskThread :: Con -> String -> JobName -> (Message i -> IO ()) -> ReaderDesc i -> RegistryDesc -> OnError -> IO r -> IO r
On the other side of the pipeline, there sits a worker waiting for requests. Note that no Worker data type is defined. Instead, there is only a withTaskThread function that, internally, creates a worker acting in a background thread. The rationale is that it does not make too much sense to have a pipeline with only one worker. It is in fact part of the idea of the pipeline pattern that several workers are used through a balancer. withTaskThread implements the interaction with the registry internally and frees the programmer from concerns related to registration. If you really need a single worker, you can call the function with an empty RegistryDesc, i.e. with an empty queue name.
- Con: Connection to a Stomp broker;
- String: Name of the worker used for error reporting;
- JobName: Name of the job the worker provides;
- (Message i -> IO ()): The job provided by the worker. Note that the function does not return a value: Since workers do not produce a reply, no result is necessary;
- ReaderDesc i: Queue through which the worker receives requests;
- RegistryDesc: The registry to which the worker connects;
- OnError: Error handler;
- IO r: Action that defines the worker's lifetime.
More about Registries
Until now, we have only looked at how to connect to a registry, not at how to use it and what a registry actually is in terms of data types. Well, answering the second question is simple: a registry, from the perspective of the user application, is an opaque data type with a set of functions:
withRegistry :: Con -> String -> QName -> (Int, Int) -> OnError -> (Registry -> IO r) -> IO r
A registry is used through a function that, internally, creates a registry and defines its lifetime in terms of the scope of an action passed in to the function:
- Con: Connection to a Stomp broker;
- String: Name of the registry used for error handling;
- QName: Name of the registration queue. It is this queue to which register sends a registration request;
- (Int, Int): Minimal and maximal accepted heartbeat interval;
- OnError: Error handler;
- (Registry -> IO r): The action that defines the registry's lifetime; the result of this action, r, is also the result of withRegistry.
mapR :: Registry -> JobName -> (Provider -> IO ()) -> IO Bool
Map action to Providers of job JobName; mapping means different things for:
- Service, Task: action is applied to the first active provider of a list of providers and this provider is then sent to the back of the list, hence, implementing a balancer.
- Topic: action is applied to all providers, hence, implementing a publisher.
Parameters:
- Registry: The registry to use;
- JobName: The job to which to apply the action;
- (Provider -> IO ()): The action to apply.
The function returns False iff the requested job is not available and True otherwise. (Note that a job without providers is removed; when the function returns True, the job, thus, was applied at least once.)
getProvider :: Registry -> JobName -> Int -> IO [Provider]
Retrieves n Providers of a certain job; getProvider works, for all JobTypes, according to the work balancer logic, i.e.: it returns the first n providers of the list for this job and moves them to the end of the list. getProvider is used, for instance, in the Desk pattern.
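The list handling described for mapR and getProvider can be modelled with plain lists. This is a simplified sketch, not the registry's implementation: `Mode`, `dispatch` and `takeRotate` are hypothetical names, providers are arbitrary list elements, and the notion of an "active" provider (heartbeat liveness) is ignored.

```haskell
-- | How a job's providers are used:
--   Balance stands for Service and Task, Broadcast for Topic.
data Mode = Balance | Broadcast

-- | One dispatch step: the providers the action is applied to,
--   paired with the provider list as it looks afterwards.
dispatch :: Mode -> [p] -> ([p], [p])
dispatch _         []       = ([], [])
dispatch Balance   (p : ps) = ([p], ps ++ [p])  -- first provider goes to the back
dispatch Broadcast ps       = (ps, ps)          -- all providers, order unchanged

-- | getProvider-style selection: the first n providers,
--   which are then moved to the end of the list.
takeRotate :: Int -> [p] -> ([p], [p])
takeRotate n ps =
  let (xs, ys) = splitAt n ps
  in  (xs, ys ++ xs)
```

Applying `dispatch Balance` twice in a row to a list with more than one provider selects two different providers, which is the balancing property described in the registry introduction.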
showRegistry :: Registry -> IO ()
This function shows all jobs with all their providers in a registry; the function is intended for debugging only.
A provider is an opaque data type; most of its attributes are used only internally by the registry. Interesting for user applications, however, is the queue that identifies the provider.
JobType: Service, Task or Topic
A typical example of how to use a registry in practice is the balancer pattern, which is shown (without error handling) below:
    -- The definition of the variables
    --     c, n, qn, mn, mx, onErr, rq
    -- is out of the scope of this listing;
    -- their data type and meaning
    -- can be inferred from the context.
    withRegistry c n qn (mn, mx) onErr $ \reg ->
      withPair c n (rq,        [], [], bytesIn)
                   ("unknown", [], [], bytesOut) $ \(r,w) ->
        forever $ do
          m  <- readQ r                        -- receive a request
          jn <- getJobName m                   -- get the job name from the request
          t  <- mapR reg jn (send2Prov w m)    -- apply job
          unless t $ throwIO $ NoProviderX jn  -- throw exception
                                               -- when job is not provided
      where send2Prov w m p = writeAdHoc w (prvQ p) nullType (msgHdrs m) $ msgContent m
User applications, usually, do not need to use registries directly. Registries are used in patterns, namely in Desks, Balancers and in Pubs.
Publisher
withPub :: Con -> String -> JobName -> QName -> OnError -> WriterDesc o -> (PubA o -> IO r) -> IO r
Create a publisher with the lifetime of the scope of the user action passed in. The publisher, internally, creates a registry to which subscribers will connect to obtain the topic data. The registry will not expect heartbeats from subscribers, since the dependability relation is the other way round: the publisher does not depend on subscribers, but subscribers depend on a publisher. The publisher, usually, does not send heartbeats either. For exceptions to this rule, see withPubProxy.
- Con: Connection to a Stomp broker;
- String: Name of the publisher used for error reporting;
- JobName: The name of the topic;
- QName: Name of the registration queue (see withRegistry);
- OnError: Error handler passed to the registry;
- WriterDesc o: Queue through which data are published; note that the queue name is irrelevant. The publisher will send data to the queues of registered subscribers (see mapR);
- PubA o -> IO r: Action that defines the lifetime of the publisher; the result (r) is also the result of withPub.
withPubThread :: Con -> String -> JobName -> QName -> Type -> [Header] -> IO o -> WriterDesc o -> Int -> OnError -> IO r -> IO r
Create a publisher that works in a background thread publishing periodically at a monotonic rate, i.e. it creates data and publishes them, computes the difference of the publication rate minus the time needed to create and publish the data and will then suspend the thread for this period. For a publication rate of p microseconds, the thread will be delayed for p - x microseconds, if x corresponds to the time that was spent on creating and publishing the data.
The precision depends of course on your system and its current workload. For most cases, this will be equal to just suspending the thread for the publication rate.
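The delay computation described above (suspend for p - x microseconds) can be sketched as follows. This is an illustration, not the library's code: `remaining` and `periodically` are hypothetical names, and clamping the delay at zero when the work takes longer than the period is an assumption made here.

```haskell
import Control.Concurrent (threadDelay)
import GHC.Clock (getMonotonicTimeNSec)

-- | Delay in microseconds for a period of p microseconds
--   after x microseconds have already been spent.
--   Clamped at 0 (an assumption) so the result is never negative.
remaining :: Int -> Int -> Int
remaining p x = max 0 (p - x)

-- | Run an action n times, aiming at one run every p microseconds.
periodically :: Int -> Int -> IO () -> IO ()
periodically p n act
  | n <= 0    = return ()
  | otherwise = do
      t0 <- getMonotonicTimeNSec
      act                                  -- create and publish the data
      t1 <- getMonotonicTimeNSec
      let spent = fromIntegral ((t1 - t0) `div` 1000)  -- nanoseconds to microseconds
      threadDelay (remaining p spent)
      periodically p (n - 1) act
```

For an action that is cheap compared to the period, `remaining p spent` is close to `p`, which matches the remark that the result usually equals suspending the thread for the publication rate.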
Parameters:
- Con: Connection to a Stomp broker;
- String: Name of the publisher used for error reporting;
- JobName: Name of the topic;
- QName: Registration queue;
- Type: MIME Type of the published message;
- [Header]: Additional headers to be sent with the message;
- IO o: Action to create the message content;
- WriterDesc o: Queue through which the message will be published (remember, however, that the queue name is irrelevant);
- Int: Publication rate in microseconds;
- OnError: Error handler for the registry and the publisher;
- IO r: Action that defines the lifetime of the publisher; the result r is also the result of withPubThread.
Subscriber
withSub :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> (SubA i -> IO r) -> IO r
Create a subscriber with the lifetime of the user action passed in. The subscriber will internally connect to a publisher's registry and receive data as long as it stays connected.
- Con: Connection to a Stomp broker;
- String: Subscriber name useful for error reporting;
- JobName: Subscribed topic;
- QName: Queue of a registry to connect to (the Pub's registration queue!);
- Int: Registration timeout in microseconds;
- ReaderDesc i: This is the queue through which the subscriber will receive data;
- SubA i -> IO r: Action that defines the lifetime of the subscriber. Its result r is also the result of withSub.
withSubThread :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> (Message i -> IO ()) -> OnError -> IO r -> IO r
Create a subscriber that works in a background thread; whenever data are available, an application callback passed in to the function is called with the message that has arrived.
- Con: Connection to a Stomp broker;
- String: Subscriber name used for error reporting;
- JobName: Subscribed topic;
- QName: The publisher's registration queue;
- Int: Registration timeout in microseconds;
- ReaderDesc i: Queue through which the subscriber shall receive data;
- Message i -> IO (): Application callback;
- OnError: Error handler;
- IO r: Action that defines the lifetime of the subscriber; the result r is also the result of withSubThread.
withSubMVar :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> MVar i -> OnError -> IO r -> IO r
Create a subscriber that works in a background thread and updates an MVar whenever new data are available; the function is in fact a special case of withSubThread, where the application callback updates an MVar. Note that the MVar must not be empty when the function is called; otherwise, it will block on modifying the MVar.
- Con: Connection to a Stomp broker;
- String: Subscriber name used for error reporting;
- JobName: Subscribed topic;
- QName: The publisher's registration queue;
- Int: Registration timeout in microseconds;
- ReaderDesc i: Queue through which the subscriber shall receive data;
- MVar i: MVar to update;
- OnError: Error handler;
- IO r: Action that defines the lifetime of the subscriber; the result r is also the result of withSubMVar.
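The requirement that the MVar must not be empty follows from how a modifying update behaves. The sketch below assumes that the callback overwrites the MVar with something like modifyMVar_; this is inferred from the wording "block on modifying the MVar", not confirmed by this page, and `updateVar` and `demoUpdate` are hypothetical names.

```haskell
import Control.Concurrent.MVar (MVar, modifyMVar_, newMVar, readMVar)

-- | Overwrite the content of an MVar, in the style of a subscriber callback.
--   modifyMVar_ first takes the current value, so it blocks on an empty MVar.
updateVar :: MVar a -> a -> IO ()
updateVar v x = modifyMVar_ v (const (return x))

-- | Create the MVar with an initial value, update it once and read it back.
demoUpdate :: IO Int
demoUpdate = do
  v <- newMVar 0     -- full from the start; newEmptyMVar would make updateVar block
  updateVar v 42     -- stands for the arrival of new topic data
  readMVar v
```

In practice this means creating the MVar with newMVar and a sensible initial value before it is passed to withSubMVar.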
Heartbeats for Pub
withPubProxy :: Con -> String -> JobName -> QName -> ReaderDesc i -> RegistryDesc -> OnError -> IO r -> IO r
Unlike servers and workers, publishers have no interface to connect internally to a registry. The rationale for this is that publishers do not need load balancers or similar means that would require registration. As a consequence, there is no means to send heartbeats internally. Sometimes, however, the need to connect to a registry may arise. The Desk pattern is an example where it makes sense to register a publisher. But then, there is no means to internally send heartbeats proving that the publisher is still alive. For this case, a simple solution for periodic publishers is available: a heartbeat proxy that is implemented as a subscriber receiving data from the publisher and sending a heartbeat on every dataset that arrives.
This function provides a proxy that internally connects to a registry on behalf of a publisher and sends heartbeats.
- Con: Connection to a Stomp broker;
- String: Name of the proxy used for error reporting;
- JobName: Name of the topic the publisher provides;
- QName: Registration queue of the publisher - this is the queue to which the internal subscriber connects;
- ReaderDesc i: The queue through which the internal subscriber receives data;
- RegistryDesc: The other registry - it is this registry to which the proxy will send heartbeats;
- OnError: Error handler;
- IO r: Action that defines the proxy's lifetime; its result r is also the result of withPubProxy.
Exceptions and Error Handling
data PatternsException
Patterns Exception
Constructor | Description |
---|---|
TimeoutX String | Timeout expired |
BadStatusCodeX String | Invalid status code |
NotOKX StatusCode String | Status code other than OK |
HeaderX String String | Error on Header identified by the first String |
MissingHbX String | Thrown on missing heartbeat (after tolerance of 10 missing heartbeats) |
UnacceptableHbX Int | Heartbeat proposed by registry out of acceptable range |
NoProviderX String | No provider for the requested job available |
AppX String | Application-defined exception |
Instances: Eq, Read, Show, Exception (defined in Types)
type OnError = SomeException -> String -> IO ()
Error Handler:
- SomeException: Exception that led to the invocation;
- String: Name of the entity (client name, server name, etc.)
data StatusCode
Status code to communicate the state of a request between two applications. The wire format is inspired by HTTP status codes:
- OK (200): Everything is fine
- BadRequest (400): Syntax error in the request message
- Forbidden (403): Not used
- NotFound (404): For the requested job no provider is available
- Timeout (408): Timeout expired
Instances: Eq, Read, Show (defined in Types)
readStatusCode :: String -> Either String StatusCode
Safe StatusCode parser (StatusCode is an instance of Read, but read would cause an error on an invalid StatusCode)
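The difference between a partial read and a safe parser can be shown with a small stand-alone model. This is not the library's implementation: `Status`, `fromCode` and `parseStatus` are hypothetical names, and the sketch assumes that the code travels as a decimal string such as "200", which this page suggests but does not state explicitly.

```haskell
import Text.Read (readMaybe)

-- | Local stand-in for the status codes listed above.
data Status = OK | BadRequest | Forbidden | NotFound | Timeout
  deriving (Show, Eq)

-- | Map the numeric codes from the list above to constructors.
fromCode :: Int -> Maybe Status
fromCode 200 = Just OK
fromCode 400 = Just BadRequest
fromCode 403 = Just Forbidden
fromCode 404 = Just NotFound
fromCode 408 = Just Timeout
fromCode _   = Nothing

-- | Total parser: reports invalid input as Left instead of
--   raising an error the way a plain 'read' would.
parseStatus :: String -> Either String Status
parseStatus s =
  case readMaybe s >>= fromCode of
    Just sc -> Right sc
    Nothing -> Left ("invalid status code: " ++ s)
```

Returning Either keeps the decision about malformed input with the caller, which is also how getSC exposes the status code.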
Useful Types and Helpers
ignorebody :: InBound ()
InBound converter for messages of type ()
bytesOut :: OutBound ByteString
OutBound converter for messages of type ByteString
bytesIn :: InBound ByteString
InBound converter for messages of type ByteString
getJobName :: Message m -> IO String
Get Job name from headers (and throw an exception if the header does not exist)
getJobType :: Message m -> IO JobType
Get Job type from headers (and throw an exception if the header does not exist or contains an invalid value)
getQueue :: Message m -> IO String
Get Queue name from headers (and throw an exception if the header does not exist)
getChannel :: Message m -> IO String
Get Reply queue (channel) from headers (and throw an exception if the header does not exist)
getHB :: Message m -> IO Int
Get Heartbeat specification from headers (and throw an exception if the header does not exist or if its value is not numeric)
getSC :: Message m -> IO (Either String StatusCode)
Get Status code from headers (and throw an exception if the header does not exist)
getHeader :: String -> String -> Message m -> IO String
Generic function to retrieve a header value (and throw an exception if the header does not exist):
- String: Key of the wanted header
- String: Error message in case there is no such header
- Message m: The message whose headers we want to search