stomp-patterns-0.5.0: Stompl MOM Stomp Patterns
Copyright(c) Tobias Schoofs
LicenseLGPL
Stabilityexperimental
Portabilityportable
Safe HaskellNone
LanguageHaskell98

Network.Mom.Stompl.Patterns.Basic

Description

This module provides the basic patterns client/server, publish and subscribe and pusher/worker (a.k.a. pipeline) as well as a registry, a means to support patterns where one application uses or serves a set of other applications.

Basic patterns can be distinguished by the data exchange defined by their protocol:

  • client/server: The client, first, sends data to the server (the request) and, then, the server sends data to this client (the reply);
  • publish and subscribe: The publisher sends data to all subscribers (the topic);
  • pusher/worker: The pusher sends data to the worker (the request) without receiving a reply.

We call the processing performed by an application on behalf of another application a job. There are three different job types:

  • service: must be requested explicitly and includes a message sent from the application that perfoms the service (server) to the application that requests the service (client).
  • task: must be requested explicitly, but does not include a reply.
  • topic: is sent to registered subscribers without being requested explicitly.

Applications providing a job are generically called providers, applications requesting a job are called consumers. Providers, hence, are servers, workers and publishers, and consumers are clients, pushers and subscribers. Note that this is somewhat different from the data-centric terminology "producer" and "consumer". It is not very useful to distinugish servers from clients or pushers from workers by referring to the distinction of producing or not producing data. It is in fact the pusher that produces data, not the worker. The pusher, however, is the one that requests something from the worker. The task, in this case, is the "good" that is provided by one side and consumed by the other.

This distinction is relevant when we start to think about reliability. Reliability is a relation between a provider and a consumer: The consumer relies on the producer, not the other way round, e.g. a pusher relies on a woker and a client on a server to get the job done.

The interfaces in this library give some guarantees related to reliability, but there are also some pitfalls:

  • A client using timeouts can be sure that the requested service has been performed when the reply arrives before the timeout expires. If no reply arrives before timeout expiration, no such claim can be made (in particular not that the service has not been performed). The service may have been performed, but it may have taken more time than expected or it may have been performed, but the reply message has failed to arrive. If the service is idempotent - i.e. calling the service twice has the same effect as calling it once - the client, when the timeout has expired, can just send the request once again; otherwise, it has to use other means to recover from this situation.
  • A pusher will never know if the task has been performed correctly, since there is no response from the worker. This is one of the reasons that the pipeline pattern should usually not be used alone, but in the context of a balancer. (You usually want to push a request to a worker through a balancer -- one of the ideas behind pusher/woker is work balancing.) A balancer may request providers to send heartbeats and, this way, minimise the risk of failure. The worker still may fail between a heartbeat and a request and even the fact that it does send hearbeats does not necessarily mean that it is operating correctly. If it is essential for the client to know that all tasks have been performed correctly, other verification means are required.
  • Finally, a subscriber will never know whether a publisher is still working correctly or not, if the publisher does not send data periodically. A reliable design would use periodic publishers, i.e. publishers that send data at a constant rate, even if no new data are available. The data update, in this case, would have the effect of a heartbeat.

The library uses a set of headers that must not be used by applications. All internal header keys start and end with two underscores. By avoiding this naming of header keys, application code easily avoids naming conflicts. The headers used in basic patterns are:

  • channel: Reply queue (client/server)
  • job: The requested job (client/server, pusher/worker and registry)
  • type: Request type (registry)
  • job-type: Type of job (registry)
  • queue: Queue to register (registry)
  • hb: Heartbeat specification (registry)
  • sc: Status Code (registry)
Synopsis

Client

data ClientA i o Source #

The client data type, which implements the client side of the client/server protocol.

clName :: ClientA i o -> String Source #

Access to the client name

withClient :: Con -> String -> JobName -> ReaderDesc i -> WriterDesc o -> (ClientA i o -> IO r) -> IO r Source #

The function creates a client that lives within its scope.

Parameters:

  • Con: Connection to a Stomp broker
  • String: Name of the Client, which can be used for error reporting.
  • JobName: Name of the Service the client will request
  • ReaderDesc i: Description of a reader queue; this is the queue through which the server will send its response.
  • WriterDesc o: Description of a writer queue; this is the queue through which the server is expecting requests.
  • ClientA i o -> IO r: An application-defined action whose scope defines the client's lifetime

request :: ClientA i o -> Int -> Type -> [Header] -> o -> IO (Maybe (Message i)) Source #

The client will send the request of type o and wait for the reply until the timeout exprires. The reply is of type i and is returned as Message i. If the timeout expires before the reply has been received, the function returns Nothing.

Since servers do not know the clients they are serving, request sends the name of its reader queue (the reply queue) as message header to the server.

Parameters:

  • ClientA i o: The client; note that i is the type of the reply, o is the type of the request.
  • Int: The timeout in microseconds.
  • Type: The MIME type of the request.
  • [Header]: List of additional headers to be sent with the request.
  • o: The request

checkRequest :: ClientA i o -> Int -> IO (Maybe (Message i)) Source #

This function serves as a "delayed" receiver for the case that the timeout of a request has expired. When using this function, it is assumed that a request has been made, but no response has been received. It can be used in time-critical applications, where the client may use the time between request and reply productively, instead of passively blocking on the reply queue.

Use this function with care! It can be easily abused to break the client/server pattern, when it is called without a request having been made before. If, in this case, timout is -1, the application will block forever.

The function receives those parameters from request that are related to receiving the reply, i.e. Type, [Header] and o are not passed to checkRequest.

Server

data ServerA i o Source #

The server data type, which implements the server side of the client/server protocol.

srvName :: ServerA i o -> String Source #

Access to the server name

withServer :: Con -> String -> ReaderDesc i -> WriterDesc o -> (ServerA i o -> IO r) -> IO r Source #

The function creates a server that lives within the scope of the application-defined action passed into it.

Parameters:

  • Con: Connection to a Stomp broker
  • String: Name of the Server, which can be used for error reporting.
  • ReaderDesc i: Description of a reader queue; this is the queue through which clients are expected to send requests.
  • WriterDesc o: Description of a writer queue; this is the queue through which a specific client will expect the reply. Note that the server will overwrite the destination of this queue using writeAdHoc; the destination of this queue, hence, is irrelevant.
  • ServerA i o -> IO r: An application-defined action whose scope defines the server's lifetime

reply :: ServerA i o -> Int -> Type -> [Header] -> (Message i -> IO o) -> IO () Source #

Waits for a client request, calls the application-defined transformer to generate a reply and sends this reply through the reply queue whose name is indicated by a header in the request. The time a server waits for a request may be restricted by the timeout. Typically, you would call reply with timeout set to -1 (wait eternally). There may be situations, however, where it actually makes sense to restrict the waiting time, i.e. to perform some housekeeping in between.

Typically, you call reply in a loop like

forever $ reply srv (-1) nullType [] f

where f is a function of type

Message i -> IO o.

Parameters:

  • ServerA i o: The server; note that i is the request queue and o the reply queue.
  • Int: The timeout in microseconds.
  • Type: The MIME type of the reply.
  • [Header]: Additional headers to be sent with the reply.
  • Message i -> IO o: Transforms the request into a reply - this defines the service provided by this application.

Registry

Before we continue the survey of basic patterns, we have to introduce registries. Registries are used by some patterns, advanced patterns, but also publishers, to use a set of providers that register beforehand or, in the case of publishers, to serve a set of consumers that have registered to the publisher. A typical example is balancers (majordomo pattern): Servers or tasks register to a balancer, which on receiving a request from a client for a certain job, forwards the request to one of the registered providers of this job. Internally, the register balances the request, such that, with more than one provider currently registered, two consecutive requests will not be served by the same provider. Note that registers provide different modes of using providers: a load balancer will send a request to only one of its providers, whereas publishers will send the data they produce to all currently registered consumers. The difference is defined by the JobType of a given Job. Of course, only providers of the same type may register for the same job.

Registers provide a queue through which services can register; patterns using registers would provide another queue through which they receive requests. Registers allow for some level of reliability, i.e. registers can ensure with certain probability that providers are available at the time, when a request is made. Therefore, registries may request heartbeats from providers. Heartbeats are negotiated on registration. Note that registries do not send heartbeats back to providers. Providers have to use other strategies to make sure that the registry to which they have registered is actually available.

register :: Con -> JobName -> JobType -> QName -> QName -> Int -> Int -> IO (StatusCode, Int) Source #

Connect to a registry: The caller registers itself at the registry. The owner of the registry will then use the caller depending on its purpose.

  • Con: Connection to a Stomp broker;
  • JobName: The name of the job provided by the caller;
  • JobType: The type of the job provided by the caller;
  • QName: The registry's registration queue;
  • QName: The queue to register; this is the queue the register will actually use (for forwarding requests or whatever it does in this specific case). The registry, internally, uses JobName together with this queue as a key to identify the provider.
  • Int: Timeout in microseconds;
  • Int: Preferred heartbeat in milliseconds (0 for no heartbeats).

The function returns a tuple of StatusCode and the heartbeat proposed by the registry (which may differ from the preferred heartbeat of the caller). Whenever the StatusCode is not OK, the heartbeat is 0. If the JobName is null, the StatusCode will be BadRequest. If the timeout expires, register throws TimeoutX.

unRegister :: Con -> JobName -> QName -> QName -> Int -> IO StatusCode Source #

Disconnect from a registry: The caller disconnects from a registry to which it has registered before. For the case that the registry is not receiving heartbeats from the caller, it is essential to unregister, when the service is no longer provided. Otherwise, the registry has no way to know that it should not send requests to this provider anymore.

  • Con: Connection to a Stomp broker;
  • JobName: The JobName to unregister;
  • QName: The registry's registration queue ;
  • QName: The queue to unregister;
  • Int: The timeout in microseconds.

The function returns a StatusCode. If JobName is null, the StatusCode will be BadRequest. If the timeout expires, the function will throw TimeoutX.

heartbeat :: MVar HB -> Writer () -> JobName -> QName -> IO () Source #

Send heartbeats:

  • MVar HB: An MVar of type HB, this MVar will be used to keep track of when the heartbeat has actually to be sent.
  • Writer (): The writer through which to send the heartbeat; The queue name of the writer is the registration queue of the registry; note that its type is (): heartbeats are empty messages.
  • JobName: The JobName for which to send heartbeats;
  • QName: The queue for which to send heartbeats.

data HB Source #

Heartbeat controller type

mkHB :: Int -> IO HB Source #

Create a heartbeat controller; receives the heartbeat in milliseconds.

The following example shows how to use the registration functions and heartbeats together with a server:

-- The definition of the variables
-- reg, jn, rn, tmo
-- is out of the scope of this listing;
-- their data type and meaning 
-- can be inferred from the context.

 withConnection "127.0.0.1" 61613 [] [] $ \c -> 
   withServer c "Test" (q,            [], [], iconv)
                       ("unknown",    [], [], oconv) $ \s -> do
     (sc,me) <- if null reg -- if parameter reg is null
                  then return (OK, 0)
                  else register c jn Service reg rn tmo 500
     case sc of
       -- ok ------------------------------
       OK -> 
         if me < 0 || me > 5000 -- accept heartbeat  from 
                                -- 0 (no heartbeats) to
                                -- 5 seconds
           then do void $ unRegister c jn wn rn tmo
                   throwIO $ UnacceptableHbX me
           else do hb <- mkHB me
                   m  <- newMVar hb 
                   let p = if me <= 0 then (-1) else 1000 * me 
                   withWriter c "HB" reg [] [] nobody $ \w -> 
                     finally (forever $
                       reply s p t hs transform >> heartbeat m w jn rn) (do
                       -- "don't forget to unregister!" (Frank Zappa)
                       sc <- unRegister c jn wn rn tmo
                             unless (sc == OK) $ 
                               throwIO $ NotOKX sc "on unregister")
       -- not ok ---------------------------
       e -> throwIO $ NotOKX e "on register"

There is, however, a function that does all of this internally: withServerThread.

withServerThread

withServerThread :: Con -> String -> JobName -> Type -> [Header] -> (Message i -> IO o) -> ReaderDesc i -> WriterDesc o -> RegistryDesc -> OnError -> IO r -> IO r Source #

Create a server that works in a background thread: The background thread (and with it the server) is running until the action passed in to the function (IO r) terminates; when it terminates, the background thread is terminated as well. withServerThread may connect to a registry (to serve as a provider of a balancer for instance), which is automatically handled internally when a RegistryDesc is passed in with a QName that is not null.

  • Con: Connection to a Stomp broker;
  • String: The name of the server, used for error reporting;
  • JobName: The job provided by this server
  • Type: The MIME Type (passed to reply)
  • [Header]: Additional headers (passed to reply)
  • Message i -> IO o: The core of the reply function: transforming a request of type i into a reply of type o
  • ReaderDesc i: The reader through which requests are expected;
  • WriterDesc o: The writer through which replies are sent;
  • RegistryDesc: Describes whether and how to connect to a registry: if the queue name of the registry description is null, the function will not connect to a registry; otherwise it will connect to the registry proposing the best value of the RegistryDesc as its preferred heartbeat rate; should the heartbeat rate returned by the registry be outside the scope of min and max, withServerThread will terminate with UnacceptableHbX.
  • OnError: Error handler
  • IO r: The function starts a new thread on which the the server is working; the thread from which the function was called continues in this action. Its return value is also the result of withServerThread. When the action terminates, the new thread is terminated internally.

type RegistryDesc = (QName, Int, (Int, Int, Int)) Source #

A helper that shall ease the use of the registers. A registry to which a call wants to connect is described as

  • The QName through which the registry receives requests;
  • The Timeout in microseconds, i.e. the time the caller will wait before the request fails;
  • A triple of heartbeat specifications: the best value, i.e. the rate at which the caller prefers to send heartbeats, the minimum rate at which the caller can accept to send heartbeats, the maximum rate at which the caller can accept to send heartbeats. Note that all these values are in milliseconds!

Pusher

data PusherA o Source #

The Pusher data type, which implements the consumer side of the pipeline protocol. Note that, when we say "consumer" here, the pusher is actually a data producer, but consumes the effect of having a task done. The pusher can be seen as a client that does not expect a reply.

pushName :: PusherA o -> String Source #

Access to the pusher's name

withPusher :: Con -> String -> JobName -> WriterDesc o -> (PusherA o -> IO r) -> IO r Source #

Create a Pusher with the lifetime of the action passed in:

  • Con: Connection to a Stomp broker;
  • String: Name of the pusher, which may be used for error reporting;
  • JobName: Name of the job requested by this pusher;
  • WriterDesc o: Writer queue through which the job request is pushed;
  • (PusherA o -> IO r): Action that defines the lifetime of the pusher; the result r is also the result of withPusher.

push :: PusherA o -> Type -> [Header] -> o -> IO () Source #

Push a Job:

  • PusherA o: The pusher to be used;
  • Type: The MIME Type of the message to be sent;
  • [Header]: The headers to be sent with the message;
  • o: The message contents.

Worker

withTaskThread :: Con -> String -> JobName -> (Message i -> IO ()) -> ReaderDesc i -> RegistryDesc -> OnError -> IO r -> IO r Source #

On the other side of the pipeline, there sits a worker waiting for requests. Note that no Worker data type is defined. Instead, there is only a withTaskThread function that, internally, creates a worker acting in a background thread. The rationale is that it does not make too much sense to have a pipeline with only one worker. It is in fact part of the idea of the pipeline pattern that several workers are used through a balancer. withTaskThread implements the interaction with the registry internally and frees the programmer from concerns related to registration. If you really need a single worker, you can call the function with an empty RegistryDesc, i.e. with an empty queue name.

  • Con: Connection to a Stomp broker;
  • String: Name of the worker used for error reporting;
  • JobName: Name of the job, the worker provides;
  • (Message i -> IO ()): The job provided by the worker. Note that the function does not return a value: Since workers do not produce a reply, no result is necessary;
  • ReaderDesc i: Queue through which the worker receives requests;
  • RegistryDesc: The registry to which the worker connects;
  • OnError: Error handler;
  • IO r: Action that defines the worker's lifetime.

More about Registries

Until now, we have only looked at how to connect to a registry, not at how to use it and what a registry actually is in terms of data types. Well, answering the second question is simple: a registry, from the perspective of the user application, is an opaque data type with a set of functions:

data Registry Source #

Registry: An opaque data type

withRegistry :: Con -> String -> QName -> (Int, Int) -> OnError -> (Registry -> IO r) -> IO r Source #

A registry is used through a function that, internally, creates a registry and defines its lifetime in terms of the scope of an action passed in to the function:

  • Con: Connection to a Stomp broker;
  • String: Name of the registry used for error handling;
  • QName: Name of the registration queue. It is this queue to which register sends a registration request;
  • (Int, Int): Minimal and maximal accepted heartbeat interval;
  • OnError: Error handler;
  • (Registry -> IO r): The action that defines the registry's lifetime; the result of this action, r, is also the result of withRegistry.

mapR :: Registry -> JobName -> (Provider -> IO ()) -> IO Bool Source #

Map action to Providers of job JobName; mapping means different things for:

  • Serice, Task: action is applied to the first active provider of a list of providers and this provider is then sent to the back of the list, hence, implementing a balancer.
  • Topic: action is applied to all providers, hence, implementing a publisher.

Parameters:

  • Registry: The registry to use;
  • JobName: The job to which to apply the action;
  • (Provider -> IO ()): The action to apply.

The function returns False iff the requested job is not available and True otherwise. (Note that a job without providers is removed; when the function returns True, the job, thus, was applied at least once.

getProvider :: Registry -> JobName -> Int -> IO [Provider] Source #

Retrieves n Providers of a certain job; getProvider works, for all JobTypes according to the work balancer logic, i.e.: it returns the first n providers of the list for this job and moves them to the end of the list. getProvider is used, for instance, in the Desk pattern.

  • Registry: The registry in use;
  • JobName: The job for which the caller needs a provider;
  • Int: The number n of providers to retrieve; if less than n providers are available for this job, all available providers will be returned, but no error event is created.

showRegistry :: Registry -> IO () Source #

This function shows all jobs with all their providers in a registry; the function is intended for debugging only.

data Provider Source #

A provider is an opaque data type; most of its attributes are used only internally by the registry. Interesting for user applications, however, is the queue that identifies the provider.

Instances

Instances details
Eq Provider Source # 
Instance details

Defined in Registry

Show Provider Source # 
Instance details

Defined in Registry

prvQ :: Provider -> QName Source #

Queue through which the job is provided

data JobType Source #

JobType: Service, Task or Topic

Constructors

Service 
Task 
Topic 

Instances

Instances details
Eq JobType Source # 
Instance details

Defined in Registry

Methods

(==) :: JobType -> JobType -> Bool #

(/=) :: JobType -> JobType -> Bool #

Show JobType Source # 
Instance details

Defined in Registry

A typical example of how to use a registry in practice is the balancer pattern, which is shown (without error handling) below:

-- The definition of the variables
-- c, n qn, mn, mx, onErr, rq
-- is out of the scope of this listing;
-- their data type and meaning 
-- can be inferred from the context.

withRegistry c n qn (mn, mx) onErr $ \reg ->
  withPair c n (rq,        [], [], bytesIn) 
               ("unknown", [], [], bytesOut) $ \(r,w) -> 
    forever $ do
      m  <- readQ r        -- receive a request
      jn <- getJobName m   -- get the job name from the request 
      t  <- mapR reg jn (send2Prov w m)   -- apply job
      unless t $ throwIO $ NoProviderX jn -- throw exception
                                          -- when job is not provided
where send2Prov w m p = writeAdHoc w (prvQ p) nullType 
                                     (msgHdrs m) $ msgContent m

User applications, usually, do not need to use registries directly. Registries are used in patterns, namely in Desks, Balancers and in Pubs.

Publisher

data PubA o Source #

The publisher data type

pubName :: PubA o -> String Source #

Access to the name of the publisher

withPub :: Con -> String -> JobName -> QName -> OnError -> WriterDesc o -> (PubA o -> IO r) -> IO r Source #

Create a publisher with the lifetime of the scope of the user action passed in. The publisher, internally, creates a registry to which subscribers will connect to obtain the topic data. The registry will not expect heartbeats from subscribers, since the dependability relation is the other way round: the publisher does not depend on subscribers, but subscribers depend on a publisher. The publisher, usually, does not send heartbeats either. For exceptions to this rule, see withPubProxy.

  • Con: Connect to a Stomp broker;
  • String: Name of the publisher used for error reporting;
  • JobName: The name of the topic;
  • QName: Name of the registration queue (see withRegistry);
  • OnError: Error Handler passed to the registry;
  • WriterDesc: Queue through which data are published; note that the queue name is irrelevant. The publisher will send data to the queues of registered subscribers (see mapR);
  • PubA -> IO r: Action that defines the lifetime of the publisher; the result (r) is also the result of withPub.

publish :: PubA o -> Type -> [Header] -> o -> IO () Source #

Publish data of type o:

  • PubA o: Publisher to use;
  • Type: MIME Type of the message to be sent;
  • [Header]: Additional headers to be sent with the message;
  • o: The message content.

withPubThread :: Con -> String -> JobName -> QName -> Type -> [Header] -> IO o -> WriterDesc o -> Int -> OnError -> IO r -> IO r Source #

Create a publisher that works in a background thread publishing periodically at a monotonic rate, i.e. it creates data and publishes them, computes the difference of the publication rate minus the time needed to create and publish the data and will then suspend the thread for this period. For a publication rate of p microseconds, the thread will be delayed for p - x microseconds, if x corresponds to the time that was spent on creating and publishing the data.

The precision depends of course on your system and its current workload. For most cases, this will be equal to just suspending the thread for the publication rate.

Parameters:

  • Con: Connection to a Stomp broker;
  • String: Name of the publisher used for error reporting;
  • JobName: Name of the topic;
  • QName: Registration queue;
  • Type: MIME Type of the published message;
  • [Header]: Additional headers to be sent with the message;
  • IO o: Action to create the message content;
  • WriterDesc o: Queue through which the message will be published (remember, however, that the queue name is irrelevant);
  • Int: Publication rate in microseconds;
  • OnError: Error handler for the registry and the publisher;
  • IO r: Action that defines the lifetime of the publisher; The result r is also the result of withPubThread.

Subscriber

data SubA i Source #

Subscriber data type

subName :: SubA i -> String Source #

Access to the subscriber name

withSub :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> (SubA i -> IO r) -> IO r Source #

Create a subscriber with the lifetime of the user action passed in. The subscriber will internally connect to a publisher's registry and receive data as long as it stays connected.

  • Con: Connection to a Stomp broker;
  • String: Subscriber name useful for error reporting;
  • JobName: Subscribed topic;
  • QName: Queue of a registry to connect to (the Pubs registration queue!)
  • Int: Registration timeout in microseconds;
  • ReaderDesc: This is the queue through which the subscriber will receive data.
  • SubA i -> IO r: Action that defines the lifetime of the subscriber. Its result r is also the result of withSub.

checkIssue :: SubA i -> Int -> IO (Maybe (Message i)) Source #

Check if data have been arrived for this subscriber; if data are available before the timeout expires, the function results in Just (Message i); if the timeout expires first, the result is Nothing.

  • SubA i: The subscriber to check
  • Int: Timeout in microseconds

withSubThread :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> (Message i -> IO ()) -> OnError -> IO r -> IO r Source #

Create a subscriber that works in a background thread; Whenever data are available, an application callback passed in to the function is called with the message that has arrived.

  • Con: Connection to a Stomp broker;
  • String: Subscriber name used for error reporting;
  • JobName: Subscribed topic;
  • QName: The publisher's registration queue;
  • Int: Registration timeout in microseconds;
  • ReaderDesc i: Queue through which the subscriber shall receive data;
  • Message i -> IO (): Application callback;
  • OnError: Error handler;
  • IO r: Action that defines the lifetime of the subscriber; the result r is also the result of withSubThread.

withSubMVar :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> MVar i -> OnError -> IO r -> IO r Source #

Create a subscriber that works in a background thread and updates an MVar, whenever new data are available; the function is in fact a special case of withSubThread, where the application callback updates an MVar. Note that the MVar must not be empty when the function is called, otherwise, it will block on modifying the MVar.

  • Con: Connection to a Stomp broker;
  • String: Subscriber name used for error reporting;
  • JobName: Subscribed topic;
  • QName: The publisher's registration queue;
  • Int: Registration timeout in microseconds;
  • ReaderDesc i: Queue through which the subscriber shall receive data;
  • MVar i: MVar to update;
  • OnError: Error handler;
  • IO r: Action that defines the lifetime of the subscriber; the result r is also the result of withSubMVar.

Heartbeats for Pub

withPubProxy :: Con -> String -> JobName -> QName -> ReaderDesc i -> RegistryDesc -> OnError -> IO r -> IO r Source #

Unlike servers and workers, publishers have no interface to connect internally to a registry. The rationale for this is that publishers do not need load balancers or similar means that would require registration. As a consequence, there is no means to send heartbeats internally. Sometimes, however, the need to connect to a registry may arise. The Desk pattern is an example where it makes sense to register a publisher. But then, there is no means to internally send heartbeats proving that the publisher is still alive. For this case, a simple solution for periodic publishers is available: a heartbeat proxy that is implemented as a subscriber receiving data from the publisher and sending a heartbeat on every dataset that arrives.

This function provides a proxy that internally connects to a registry on behalf of a publisher and sends heartbeats.

  • Con: Connection to a Stomp broker;
  • String: Name of the proxy used for error reporting;
  • JobName: Name of the topic, the publisher provides;
  • QName: Registration queue of the publisher - this is the queue to which the internal subscriber connects;
  • ReaderDesc i: The queue through which the internal subscriber receives data;
  • RegistryDesc: The other registry - it is this registry to which the proxy will send heartbeats;
  • OnError: Error Handler;
  • IO r: Action that definex the proxy's lifetime; its result r is also the result of withPubProxy.

Exceptions and Error Handling

data PatternsException Source #

Patterns Exception

Constructors

TimeoutX String

Timout expired

BadStatusCodeX String

Invalid status code

NotOKX StatusCode String

Status code other than OK

HeaderX String String

Error on Header identified by the first String

MissingHbX String

Thrown on missing heartbeat (after tolerance of 10 missing heartbeats)

UnacceptableHbX Int

Heartbeat proposed by registry out of acceptable range

NoProviderX String

No provider for the requested job available

AppX String

Application-defined exception

type OnError = SomeException -> String -> IO () Source #

Error Handler:

  • SomeException: Exception that led the invocation;
  • String: Name of the entity (client name, server name, etc.)

data StatusCode Source #

Status code to communicate the state of a request between two applications. The wire format is inspired by HTTP status codes:

  • OK (200): Everything is fine
  • BadRequest (400): Syntax error in the request message
  • Forbidden (403): Not used
  • NotFound (404): For the requested job no provider is available
  • Timeout (408): Timeout expired

Instances

Instances details
Eq StatusCode Source # 
Instance details

Defined in Types

Read StatusCode Source # 
Instance details

Defined in Types

Show StatusCode Source # 
Instance details

Defined in Types

readStatusCode :: String -> Either String StatusCode Source #

Safe StatusCode parser (StatusCode is instance of Read, but read would cause an error on an invalid StatusCode)

Useful Types and Helpers

type JobName = String Source #

Name of a service, task or topic

type QName = String Source #

Name of a Stomp queue

nobody :: OutBound () Source #

OutBound converter for messages of type ()

ignorebody :: InBound () Source #

InBound converter for messages of type ()

bytesOut :: OutBound ByteString Source #

OutBound converter for messages of type ByteString

bytesIn :: InBound ByteString Source #

InBound converter for messages of type ByteString

getJobName :: Message m -> IO String Source #

Get Job name from headers (and throw an exception if the header does not exist)

getJobType :: Message m -> IO JobType Source #

Get Job type from headers (and throw an exception if the header does not exist or contains an invalid value)

getQueue :: Message m -> IO String Source #

Get Queue name from headers (and throw an exception if the header does not exist)

getChannel :: Message m -> IO String Source #

Get Reply queue (channel) from headers (and throw an exception if the header does not exist)

getHB :: Message m -> IO Int Source #

Get Heartbeat specification from headers (and throw an exception if the header does not exist or if its value is not numeric)

getSC :: Message m -> IO (Either String StatusCode) Source #

Get Status code from headers (and throw an exception if the header does not exist)

getHeader :: String -> String -> Message m -> IO String Source #

Get Generic function to retrieve a header value (and throw an exception if the header does not exist):

  • String: Key of the wanted header
  • String: Error message in case there is no such header
  • Message m: The message whose headers we want to search