-- Hoogle documentation, generated by Haddock
-- See Hoogle, http://www.haskell.org/hoogle/
-- | Stompl MOM Stomp Patterns
--
@package stomp-patterns
@version 0.1.0
-- | This module provides the basic patterns client/server, publish
-- and subscribe and pusher/worker (a.k.a. pipeline) as well as a
-- registry, a means to support patterns where one application
-- uses or serves a set of other applications.
--
-- Basic patterns can be distinguished by the data exchange defined by
-- their protocol:
--
--
-- - client/server: The client, first, sends data to the server (the
-- request) and, then, the server sends data to this client (the
-- reply);
-- - publish and subscribe: The publisher sends data to all subscribers
-- (the topic);
-- - pusher/worker: The pusher sends data to the worker (the request)
-- without receiving a reply.
--
--
-- We call the processing performed by an application on behalf of
-- another application a job. There are three different job types:
--
--
-- - service: must be requested explicitly and includes a message sent
-- from the application that performs the service (server) to the
-- application that requests the service (client).
-- - task: must be requested explicitly, but does not include a
-- reply.
-- - topic: is sent to registered subscribers without being requested
-- explicitly.
--
--
-- Applications providing a job are generically called providers,
-- applications requesting a job are called consumers. Providers, hence,
-- are servers, workers and publishers, and consumers are clients,
-- pushers and subscribers. Note that this is somewhat different from the
-- data-centric terminology "producer" and "consumer". It is not very
-- useful to distinguish servers from clients or pushers from workers by
-- referring to the distinction of producing or not producing data. It is
-- in fact the pusher that produces data, not the worker. The pusher,
-- however, is the one that requests something from the worker. The task,
-- in this case, is the "good" that is provided by one side and consumed
-- by the other.
--
-- This distinction is relevant when we start to think about
-- reliability. Reliability is a relation between a provider and a
-- consumer: The consumer relies on the provider, not the other way
-- round, e.g. a pusher relies on a worker and a client on a server
-- to get the job done.
--
-- The interfaces in this library give some guarantees related to
-- reliability, but there are also some pitfalls:
--
--
-- - A client using timeouts can be sure that the requested service has
-- been performed when the reply arrives before the timeout expires. If
-- no reply arrives before timeout expiration, no such claim can be made
-- (in particular not that the service has not been performed). The
-- service may have been performed, but it may have taken more time than
-- expected or it may have been performed, but the reply message has
-- failed to arrive. If the service is idempotent - i.e.
-- calling the service twice has the same effect as calling it once - the
-- client, when the timeout has expired, can just send the request once
-- again; otherwise, it has to use other means to recover from this
-- situation.
-- - A pusher will never know if the task has been performed correctly,
-- since there is no response from the worker. This is one of the reasons
-- that the pipeline pattern should usually not be used alone, but in the
-- context of a balancer. (You usually want to push a request to a worker
-- through a balancer -- one of the ideas behind pusher/worker is work
-- balancing.) A balancer may request providers to send heartbeats
-- and, this way, minimise the risk of failure. The worker still may fail
-- between a heartbeat and a request and even the fact that it does send
-- heartbeats does not necessarily mean that it is operating correctly. If
-- it is essential for the pusher to know that all tasks have been
-- performed correctly, other verification means are required.
-- - Finally, a subscriber will never know whether a publisher is still
-- working correctly or not, if the publisher does not send data
-- periodically. A reliable design would use periodic publishers,
-- i.e. publishers that send data at a constant rate, even if no
-- new data are available. The data update, in this case, would have the
-- effect of a heartbeat.
--
--
-- The library uses a set of headers that must not be used by
-- applications. All internal header keys start and end with two
-- underscores. By avoiding this naming of header keys, application code
-- easily avoids naming conflicts. The headers used in basic patterns
-- are:
--
--
-- - __channel__: Reply queue (client/server)
-- - __job__: The requested job (client/server, pusher/worker
-- and registry)
-- - __type__: Request type (registry)
-- - __job-type__: Type of job (registry)
-- - __queue__: Queue to register (registry)
-- - __hb__: Heartbeat specification (registry)
-- - __sc__: Status Code (registry)
--
module Network.Mom.Stompl.Patterns.Basic
-- | The client data type, which implements the client side of the
-- client/server protocol.
data ClientA i o
-- | Access to the client name
clName :: ClientA i o -> String
-- | The function creates a client that lives within its scope.
--
-- Parameters:
--
--
-- - Con: Connection to a Stomp broker
-- - String: Name of the Client, which can be used for error
-- reporting.
-- - JobName: Name of the Service the client will
-- request
-- - ReaderDesc i: Description of a reader queue; this is the
-- queue through which the server will send its response.
-- - WriterDesc o: Description of a writer queue; this is the
-- queue through which the server is expecting requests.
-- - ClientA i o -> IO r: An application-defined action whose
-- scope defines the client's lifetime
--
withClient :: Con -> String -> JobName -> ReaderDesc i -> WriterDesc o -> (ClientA i o -> IO r) -> IO r
-- | The client will send the request of type o and wait for the
-- reply until the timeout expires. The reply is of type i and is
-- returned as Message i. If the timeout expires before the
-- reply has been received, the function returns Nothing.
--
-- Since servers do not know the clients they are serving, request
-- sends the name of its reader queue (the reply queue) as message
-- header to the server.
--
-- Parameters:
--
--
-- - ClientA i o: The client; note that i is the type of the
-- reply, o is the type of the request.
-- - Int: The timeout in microseconds.
-- - Type: The MIME type of the request.
-- - [Header]: List of additional headers to be sent with the
-- request.
-- - o: The request
--
request :: ClientA i o -> Int -> Type -> [Header] -> o -> IO (Maybe (Message i))
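-- The retry strategy for idempotent services described in the module
-- introduction can be sketched with base only. This is an illustration
-- under assumptions, not part of the library: retryIdempotent and flaky
-- are hypothetical names, and the attempt action stands in for a fully
-- applied call to request, which yields Nothing on timeout.
--
```haskell
import Data.IORef (newIORef, atomicModifyIORef')

-- | Run an action up to n times and stop at the first Just.
-- Only safe when repeating the underlying request is harmless
-- (the service is idempotent).
retryIdempotent :: Int -> IO (Maybe a) -> IO (Maybe a)
retryIdempotent n attempt
  | n <= 0    = return Nothing
  | otherwise = do
      r <- attempt
      case r of
        Just _  -> return r
        Nothing -> retryIdempotent (n - 1) attempt

main :: IO ()
main = do
  calls <- newIORef (0 :: Int)
  -- Hypothetical stand-in for a request that times out twice
  -- and receives a reply on the third attempt.
  let flaky :: IO (Maybe String)
      flaky = do
        k <- atomicModifyIORef' calls (\c -> (c + 1, c + 1))
        return (if k < 3 then Nothing else Just "reply")
  r <- retryIdempotent 5 flaky
  print r
```
--
-- For a service that is not idempotent, a blind retry like this may
-- perform the service twice; other recovery means are needed then.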
-- | This function serves as a "delayed" receiver for the case that the
-- timeout of a request has expired. When using this function, it is
-- assumed that a request has been made, but no response has been
-- received. It can be used in time-critical applications, where the
-- client may use the time between request and reply productively,
-- instead of passively blocking on the reply queue.
--
-- Use this function with care! It can be easily abused to break the
-- client/server pattern, when it is called without a request having been
-- made before. If, in this case, timeout is -1, the
-- application will block forever.
--
-- The function receives those parameters from request that are
-- related to receiving the reply, i.e. Type,
-- [Header] and o are not passed to checkRequest.
checkRequest :: ClientA i o -> Int -> IO (Maybe (Message i))
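-- The "delayed receiver" idea can be modelled without a broker. The
-- following sketch is an assumption-laden stand-in: an MVar plays the
-- reply queue, a forked thread plays the server, and System.Timeout
-- plays the timeout logic; delayedReceive is a hypothetical name and
-- none of this is the library's implementation.
--
```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (newEmptyMVar, putMVar, takeMVar)
import System.Timeout (timeout)

-- | First wait briefly for a reply (like a request whose timeout
-- expires), do other work, then check the reply queue again.
delayedReceive :: IO (Maybe String, Maybe String)
delayedReceive = do
  replyQ <- newEmptyMVar
  -- Stand-in server: replies after 50 milliseconds.
  _ <- forkIO (threadDelay 50000 >> putMVar replyQ "reply")
  -- The initial wait of 1 millisecond is too short for this server.
  early <- timeout 1000 (takeMVar replyQ)
  -- The client uses the time productively instead of blocking.
  threadDelay 10000
  -- The delayed check waits up to one second for the pending reply.
  late <- timeout 1000000 (takeMVar replyQ)
  return (early, late)

main :: IO ()
main = delayedReceive >>= print
```
--
-- If no request were pending, the second wait could only end by
-- timeout, which mirrors the warning above about calling checkRequest
-- without a preceding request.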
-- | The server data type, which implements the server side of the
-- client/server protocol.
data ServerA i o
-- | Access to the server name
srvName :: ServerA i o -> String
-- | The function creates a server that lives within the scope of the
-- application-defined action passed into it.
--
-- Parameters:
--
--
-- - Con: Connection to a Stomp broker
-- - String: Name of the Server, which can be used for error
-- reporting.
-- - ReaderDesc i: Description of a reader queue; this is the
-- queue through which clients are expected to send requests.
-- - WriterDesc o: Description of a writer queue; this is the
-- queue through which a specific client will expect the reply. Note that
-- the server will overwrite the destination of this queue using
-- writeAdHoc; the destination of this queue, hence, is
-- irrelevant.
-- - ServerA i o -> IO r: An application-defined action whose
-- scope defines the server's lifetime
--
withServer :: Con -> String -> ReaderDesc i -> WriterDesc o -> (ServerA i o -> IO r) -> IO r
-- | Waits for a client request, calls the application-defined transformer
-- to generate a reply and sends this reply through the reply queue whose
-- name is indicated by a header in the request. The time a server waits
-- for a request may be restricted by the timeout. Typically, you would
-- call reply with timeout set to -1 (wait eternally).
-- There may be situations, however, where it actually makes sense to
-- restrict the waiting time, i.e. to perform some housekeeping in
-- between.
--
-- Typically, you call reply in a loop like
--
--
-- forever $ reply srv (-1) nullType [] f
--
--
-- where f is a function of type
--
--
-- Message i -> IO o.
--
--
-- Parameters:
--
--
-- - ServerA i o: The server; note that i is the type of the
-- request and o the type of the reply.
-- - Int: The timeout in microseconds.
-- - Type: The MIME type of the reply.
-- - [Header]: Additional headers to be sent with the
-- reply.
-- - Message i -> IO o: Transforms the request into a reply -
-- this defines the service provided by this application.
--
reply :: ServerA i o -> Int -> Type -> [Header] -> (Message i -> IO o) -> IO ()
-- | Connect to a registry: The caller registers itself at the registry.
-- The owner of the registry will then use the caller depending on its
-- purpose.
--
--
-- - Con: Connection to a Stomp broker;
-- - JobName: The name of the job provided by the caller;
-- - JobType: The type of the job provided by the caller;
-- - QName: The registry's registration queue;
-- - QName: The queue to register; this is the queue the
-- registry will actually use (for forwarding requests or whatever it
-- does in this specific case). The registry, internally, uses
-- JobName together with this queue as a key to identify
-- the provider.
-- - Int: Timeout in microseconds;
-- - Int: Preferred heartbeat in milliseconds (0 for no
-- heartbeats).
--
--
-- The function returns a tuple of StatusCode and the heartbeat
-- proposed by the registry (which may differ from the preferred
-- heartbeat of the caller). Whenever the StatusCode is not
-- OK, the heartbeat is 0. If the JobName is null, the
-- StatusCode will be BadRequest. If the timeout expires,
-- register throws TimeoutX.
register :: Con -> JobName -> JobType -> QName -> QName -> Int -> Int -> IO (StatusCode, Int)
-- | Disconnect from a registry: The caller disconnects from a registry to
-- which it has registered before. For the case that the registry is not
-- receiving heartbeats from the caller, it is essential to unregister,
-- when the service is no longer provided. Otherwise, the registry has no
-- way to know that it should not send requests to this provider anymore.
--
--
-- - Con: Connection to a Stomp broker;
-- - JobName: The JobName to unregister;
-- - QName: The registry's registration queue;
-- - QName: The queue to unregister;
-- - Int: The timeout in microseconds.
--
--
-- The function returns a StatusCode. If JobName is null,
-- the StatusCode will be BadRequest. If the timeout
-- expires, the function will throw TimeoutX.
unRegister :: Con -> JobName -> QName -> QName -> Int -> IO StatusCode
-- | Send heartbeats:
--
--
-- - MVar HB: An MVar of type HB, this MVar will
-- be used to keep track of when the heartbeat has actually to be
-- sent.
-- - Writer (): The writer through which to send the heartbeat;
-- The queue name of the writer is the registration queue of the
-- registry; note that its type is (): heartbeats are empty
-- messages.
-- - JobName: The JobName for which to send
-- heartbeats;
-- - QName: The queue for which to send heartbeats.
--
heartbeat :: MVar HB -> Writer () -> JobName -> QName -> IO ()
-- | Heartbeat controller type
data HB
-- | Create a heartbeat controller; receives the heartbeat in milliseconds.
mkHB :: Int -> IO HB
-- | Create a server that works in a background thread: The background
-- thread (and with it the server) is running until the action passed in
-- to the function (IO r) terminates; when it terminates, the background
-- thread is terminated as well. withServerThread may connect to a
-- registry (to serve as a provider of a balancer for instance), which is
-- automatically handled internally when a RegistryDesc is passed in with
-- a QName that is not null.
--
--
-- - Con: Connection to a Stomp broker;
-- - String: The name of the server, used for error
-- reporting;
-- - JobName: The job provided by this server
-- - Type: The MIME Type (passed to reply)
-- - [Header]: Additional headers (passed to reply)
-- - Message i -> IO o: The core of the reply function:
-- transforming a request of type i into a reply of type
-- o
-- - ReaderDesc i: The reader through which requests are
-- expected;
-- - WriterDesc o: The writer through which replies are
-- sent;
-- - RegistryDesc: Describes whether and how to connect to a
-- registry: if the queue name of the registry description is null, the
-- function will not connect to a registry; otherwise it will connect to
-- the registry proposing the best value of the RegistryDesc as
-- its preferred heartbeat rate; should the heartbeat rate returned by
-- the registry be outside the scope of min and max,
-- withServerThread will terminate with
-- UnacceptableHbX.
-- - OnError: Error handler
-- - IO r: The function starts a new thread on which the server is
-- working; the thread from which the function was called continues in
-- this action. Its return value is also the result of
-- withServerThread. When the action terminates, the new thread is
-- terminated internally.
--
withServerThread :: Con -> String -> JobName -> Type -> [Header] -> (Message i -> IO o) -> ReaderDesc i -> WriterDesc o -> RegistryDesc -> OnError -> IO r -> IO r
-- | A helper that shall ease the use of registries. A registry to which
-- a caller wants to connect is described as
--
--
-- - The QName through which the registry receives
-- requests;
-- - The Timeout in microseconds, i.e. the time the
-- caller will wait before the request fails;
-- - A triple of heartbeat specifications: the best value,
-- i.e. the rate at which the caller prefers to send heartbeats,
-- the minimum rate at which the caller can accept to send
-- heartbeats, the maximum rate at which the caller can accept to
-- send heartbeats. Note that all these values are in milliseconds!
--
type RegistryDesc = (QName, Int, (Int, Int, Int))
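-- The acceptance check on the heartbeat proposed by a registry might
-- look like the sketch below. It assumes that the triple is ordered
-- (best, min, max), following the order of the description above;
-- checkHb is a hypothetical helper, and Left merely stands in for the
-- UnacceptableHbX case.
--
```haskell
-- Local copy of the type for a self-contained example;
-- QName is a String in the library.
type RegistryDesc = (String, Int, (Int, Int, Int))

-- | Accept the heartbeat proposed by the registry only if it lies
-- within the caller's (min, max) range, in milliseconds.
checkHb :: RegistryDesc -> Int -> Either Int Int
checkHb (_, _, (_, mn, mx)) proposed
  | proposed < mn || proposed > mx = Left proposed
  | otherwise                      = Right proposed

main :: IO ()
main = do
  -- Hypothetical registry queue name and values.
  let desc = ("/q/registry", 500000, (1000, 500, 5000))
  print (checkHb desc 2000)
  print (checkHb desc 10000)
```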
-- | The Pusher data type, which implements the consumer side of the
-- pipeline protocol. Note that, when we say "consumer" here, the pusher
-- is actually a data producer, but consumes the effect of having a task
-- done. The pusher can be seen as a client that does not expect a reply.
data PusherA o
-- | Access to the pusher's name
pushName :: PusherA o -> String
-- | Create a Pusher with the lifetime of the action passed in:
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Name of the pusher, which may be used for error
-- reporting;
-- - JobName: Name of the job requested by this pusher;
-- - WriterDesc o: Writer queue through which the job
-- request is pushed;
-- - (PusherA o -> IO r): Action that defines the lifetime of
-- the pusher; the result r is also the result of
-- withPusher.
--
withPusher :: Con -> String -> JobName -> WriterDesc o -> (PusherA o -> IO r) -> IO r
-- | Push a Job:
--
--
-- - PusherA o: The pusher to be used;
-- - Type: The MIME Type of the message to be sent;
-- - [Header]: The headers to be sent with the message;
-- - o: The message contents.
--
push :: PusherA o -> Type -> [Header] -> o -> IO ()
-- | On the other side of the pipeline, there sits a worker waiting for
-- requests. Note that no Worker data type is defined. Instead,
-- there is only a withTaskThread function that, internally,
-- creates a worker acting in a background thread. The rationale is that
-- it does not make too much sense to have a pipeline with only one
-- worker. It is in fact part of the idea of the pipeline pattern that
-- several workers are used through a balancer. withTaskThread
-- implements the interaction with the registry internally and frees the
-- programmer from concerns related to registration. If you really need a
-- single worker, you can call the function with an empty RegistryDesc,
-- i.e. with an empty queue name.
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Name of the worker used for error reporting;
-- - JobName: Name of the job, the worker provides;
-- - (Message i -> IO ()): The job provided by the worker.
-- Note that the function does not return a value: Since workers do not
-- produce a reply, no result is necessary;
-- - ReaderDesc i: Queue through which the worker receives
-- requests;
-- - RegistryDesc: The registry to which the worker
-- connects;
-- - OnError: Error handler;
-- - IO r: Action that defines the worker's lifetime.
--
withTaskThread :: Con -> String -> JobName -> (Message i -> IO ()) -> ReaderDesc i -> RegistryDesc -> OnError -> IO r -> IO r
-- | Registry: An opaque data type
data Registry
-- | A registry is used through a function that, internally, creates a
-- registry and defines its lifetime in terms of the scope of an action
-- passed in to the function:
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Name of the registry used for error handling;
-- - QName: Name of the registration queue. It is this queue to
-- which register sends a registration request;
-- - (Int, Int): Minimal and maximal accepted heartbeat interval;
-- - OnError: Error handler;
-- - (Registry -> IO r): The action that defines the
-- registry's lifetime; the result of this action, r, is also the
-- result of withRegistry.
--
withRegistry :: Con -> String -> QName -> (Int, Int) -> OnError -> (Registry -> IO r) -> IO r
-- | Map action to Providers of job JobName; mapping means
-- different things for:
--
--
-- - Service, Task: action is applied to the first active provider of a
-- list of providers and this provider is then sent to the back of the
-- list, hence, implementing a balancer.
-- - Topic: action is applied to all providers, hence, implementing a
-- publisher.
--
--
-- Parameters:
--
--
-- - Registry: The registry to use;
-- - JobName: The job to which to apply the action;
-- - (Provider -> IO ()): The action to apply.
--
--
-- The function returns False iff the requested job is not available and
-- True otherwise. (Note that a job without providers is removed; when
-- the function returns True, the action, thus, was applied at least once.)
mapR :: Registry -> JobName -> (Provider -> IO ()) -> IO Bool
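-- The two mapping modes can be modelled on a plain list. This is a
-- simplified sketch, not the registry's code: select is a hypothetical
-- name, JobType is redefined locally, and the notion of an "active"
-- provider is ignored (every provider counts as active).
--
```haskell
data JobType = Service | Task | Topic
  deriving (Eq, Show)

-- | Returns the providers the action would be applied to and the
-- new order of the provider list.
select :: JobType -> [p] -> ([p], [p])
select Topic ps     = (ps, ps)           -- publisher: all providers
select _     []     = ([], [])
select _     (p:ps) = ([p], ps ++ [p])   -- balancer: first one, then rotate

main :: IO ()
main = do
  print (select Service "abc")
  print (select Topic   "abc")
```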
-- | Retrieves n Providers of a certain job; getProvider
-- works, for all JobTypes according to the work balancer logic,
-- i.e.: it returns the first n providers of the list for this job
-- and moves them to the end of the list. getProvider is used, for
-- instance, in the Desk pattern.
--
--
-- - Registry: The registry in use;
-- - JobName: The job for which the caller needs a
-- provider;
-- - Int: The number n of providers to retrieve; if fewer than
-- n providers are available for this job, all available providers
-- will be returned, but no error event is created.
--
getProvider :: Registry -> JobName -> Int -> IO [Provider]
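-- The "take the first n and move them to the end" logic can be
-- sketched on a list. takeRotate is a hypothetical helper used for
-- illustration; the registry's internal data structure is not shown
-- in this documentation.
--
```haskell
-- | Returns the first n elements and the list with those elements
-- moved to the end. If fewer than n elements exist, all of them are
-- returned and the order is unchanged.
takeRotate :: Int -> [p] -> ([p], [p])
takeRotate n ps =
  let (hd, tl) = splitAt n ps
  in  (hd, tl ++ hd)

main :: IO ()
main = do
  print (takeRotate 2 "abcd")
  print (takeRotate 9 "abcd")
```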
-- | This function shows all jobs with all their providers in a registry;
-- the function is intended for debugging only.
showRegistry :: Registry -> IO ()
-- | A provider is an opaque data type; most of its attributes are used
-- only internally by the registry. Interesting for user applications,
-- however, is the queue that identifies the provider.
data Provider
-- | Queue through which the job is provided
prvQ :: Provider -> QName
-- | JobType: Service, Task or Topic
data JobType
Service :: JobType
Task :: JobType
Topic :: JobType
-- | The publisher data type
data PubA o
-- | Access to the name of the publisher
pubName :: PubA o -> String
-- | Create a publisher with the lifetime of the scope of the user action
-- passed in. The publisher, internally, creates a registry to which
-- subscribers will connect to obtain the topic data. The registry will
-- not expect heartbeats from subscribers, since the dependability
-- relation is the other way round: the publisher does not depend on
-- subscribers, but subscribers depend on a publisher. The publisher,
-- usually, does not send heartbeats either. For exceptions to this rule,
-- see withPubProxy.
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Name of the publisher used for error reporting;
-- - JobName: The name of the topic;
-- - QName: Name of the registration queue (see
-- withRegistry);
-- - OnError: Error Handler passed to the registry;
-- - WriterDesc o: Queue through which data are published; note
-- that the queue name is irrelevant. The publisher will send data to the
-- queues of registered subscribers (see mapR);
-- - PubA o -> IO r: Action that defines the lifetime of the
-- publisher; the result (r) is also the result of
-- withPub.
--
withPub :: Con -> String -> JobName -> QName -> OnError -> WriterDesc o -> (PubA o -> IO r) -> IO r
-- | Publish data of type o:
--
--
-- - PubA o: Publisher to use;
-- - Type: MIME Type of the message to be sent;
-- - [Header]: Additional headers to be sent with the
-- message;
-- - o: The message content.
--
publish :: PubA o -> Type -> [Header] -> o -> IO ()
-- | Create a publisher that works in a background thread publishing
-- periodically at a monotonic rate, i.e. it creates data and
-- publishes them, computes the difference of the publication rate minus
-- the time needed to create and publish the data and will then suspend
-- the thread for this period. For a publication rate of p
-- microseconds, the thread will be delayed for p - x
-- microseconds, if x corresponds to the time that was spent on
-- creating and publishing the data.
--
-- The precision depends of course on your system and its current
-- workload. For most cases, this will be equal to just suspending the
-- thread for the publication rate.
--
-- Parameters:
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Name of the publisher used for error reporting;
-- - JobName: Name of the topic;
-- - QName: Registration queue;
-- - Type: MIME Type of the published message;
-- - [Header]: Additional headers to be sent with the
-- message;
-- - IO o: Action to create the message content;
-- - WriterDesc o: Queue through which the message will be
-- published (remember, however, that the queue name is irrelevant);
-- - Int: Publication rate in microseconds;
-- - OnError: Error handler for the registry and the
-- publisher;
-- - IO r: Action that defines the lifetime of the publisher; The
-- result r is also the result of withPubThread.
--
withPubThread :: Con -> String -> JobName -> QName -> Type -> [Header] -> IO o -> WriterDesc o -> Int -> OnError -> IO r -> IO r
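-- The delay computation described above (suspend for p - x
-- microseconds) can be sketched with base only. The names remaining
-- and periodically are hypothetical, the clock from GHC.Clock is an
-- assumption about how one might measure x, and clamping at zero when
-- the work takes longer than the period is a choice of this sketch.
--
```haskell
import Control.Concurrent (threadDelay)
import Control.Monad (replicateM_, when)
import GHC.Clock (getMonotonicTimeNSec)

-- | Microseconds left in a period p after spending x on the work.
remaining :: Int -> Int -> Int
remaining p x = max 0 (p - x)

-- | Run an action n times, once per period (in microseconds).
periodically :: Int -> Int -> IO () -> IO ()
periodically n period act = replicateM_ n $ do
  t0 <- getMonotonicTimeNSec
  act
  t1 <- getMonotonicTimeNSec
  -- Elapsed nanoseconds converted to microseconds.
  let spent = fromIntegral ((t1 - t0) `div` 1000) :: Int
      rest  = remaining period spent
  when (rest > 0) (threadDelay rest)

main :: IO ()
main = periodically 3 100000 (putStrLn "publish")
```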
-- | Subscriber data type
data SubA i
-- | Access to the subscriber name
subName :: SubA i -> String
-- | Create a subscriber with the lifetime of the user action passed in.
-- The subscriber will internally connect to a publisher's registry and
-- receive data as long as it stays connected.
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Subscriber name useful for error reporting;
-- - JobName: Subscribed topic;
-- - QName: Queue of a registry to connect to (the publisher's
-- registration queue);
-- - Int: Registration timeout in microseconds;
-- - ReaderDesc i: This is the queue through which the subscriber
-- will receive data.
-- - SubA i -> IO r: Action that defines the lifetime of the
-- subscriber. Its result r is also the result of
-- withSub.
--
withSub :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> (SubA i -> IO r) -> IO r
-- | Check if data have arrived for this subscriber; if data are
-- available before the timeout expires, the function results in
-- Just (Message i); if the timeout expires first, the
-- result is Nothing.
--
--
-- - SubA i: The subscriber to check
-- - Int: Timeout in microseconds
--
checkIssue :: SubA i -> Int -> IO (Maybe (Message i))
-- | Create a subscriber that works in a background thread; whenever data
-- are available, an application callback passed in to the function is
-- called with the message that has arrived.
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Subscriber name used for error reporting;
-- - JobName: Subscribed topic;
-- - QName: The publisher's registration queue;
-- - Int: Registration timeout in microseconds;
-- - ReaderDesc i: Queue through which the subscriber shall
-- receive data;
-- - Message i -> IO (): Application callback;
-- - OnError: Error handler;
-- - IO r: Action that defines the lifetime of the subscriber; the
-- result r is also the result of withSubThread.
--
withSubThread :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> (Message i -> IO ()) -> OnError -> IO r -> IO r
-- | Create a subscriber that works in a background thread and updates an
-- MVar, whenever new data are available; the function is in fact a
-- special case of withSubThread, where the application callback
-- updates an MVar. Note that the MVar must not be empty when the
-- function is called, otherwise, it will block on modifying the MVar.
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Subscriber name used for error reporting;
-- - JobName: Subscribed topic;
-- - QName: The publisher's registration queue;
-- - Int: Registration timeout in microseconds;
-- - ReaderDesc i: Queue through which the subscriber shall
-- receive data;
-- - MVar i: MVar to update;
-- - OnError: Error handler;
-- - IO r: Action that defines the lifetime of the subscriber; the
-- result r is also the result of withSubMVar.
--
withSubMVar :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> MVar i -> OnError -> IO r -> IO r
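-- Since the MVar must not be empty, a caller would typically create
-- it with newMVar and an initial value. The sketch below uses base
-- only; updateLatest is a hypothetical stand-in for the internal
-- callback, assuming it replaces the old value via modifyMVar_.
--
```haskell
import Control.Concurrent.MVar (MVar, modifyMVar_, newMVar, readMVar)

-- | Replace the stored value with the newest one. This blocks if the
-- MVar is empty, which is why it has to be initialised beforehand.
updateLatest :: MVar a -> a -> IO ()
updateLatest v x = modifyMVar_ v (\_ -> return x)

main :: IO ()
main = do
  -- Initialise with a default value instead of newEmptyMVar.
  latest <- newMVar (0 :: Int)
  updateLatest latest 42
  readMVar latest >>= print
```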
-- | Unlike servers and workers, publishers have no interface to connect
-- internally to a registry. The rationale for this is that publishers do
-- not need load balancers or similar means that would require
-- registration. As a consequence, there is no means to send heartbeats
-- internally. Sometimes, however, the need to connect to a registry may
-- arise. The Desk pattern is an example where it makes sense to register
-- a publisher. But then, there is no means to internally send heartbeats
-- proving that the publisher is still alive. For this case, a simple
-- solution for periodic publishers is available: a heartbeat proxy that
-- is implemented as a subscriber receiving data from the publisher and
-- sending a heartbeat on every dataset that arrives.
--
-- This function provides a proxy that internally connects to a registry
-- on behalf of a publisher and sends heartbeats.
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Name of the proxy used for error reporting;
-- - JobName: Name of the topic, the publisher provides;
-- - QName: Registration queue of the publisher - this is the
-- queue to which the internal subscriber connects;
-- - ReaderDesc i: The queue through which the internal
-- subscriber receives data;
-- - RegistryDesc: The other registry - it is this registry to
-- which the proxy will send heartbeats;
-- - OnError: Error Handler;
-- - IO r: Action that defines the proxy's lifetime; its result
-- r is also the result of withPubProxy.
--
withPubProxy :: Con -> String -> JobName -> QName -> ReaderDesc i -> RegistryDesc -> OnError -> IO r -> IO r
-- | Patterns Exception
data PatternsException
-- | Timeout expired
TimeoutX :: String -> PatternsException
-- | Invalid status code
BadStatusCodeX :: String -> PatternsException
-- | Status code other than OK
NotOKX :: StatusCode -> String -> PatternsException
-- | Error on Header identified by the first String
HeaderX :: String -> String -> PatternsException
-- | Thrown on missing heartbeat (after tolerance of 10 missing heartbeats)
MissingHbX :: String -> PatternsException
-- | Heartbeat proposed by registry out of acceptable range
UnacceptableHbX :: Int -> PatternsException
-- | No provider for the requested job available
NoProviderX :: String -> PatternsException
-- | Application-defined exception
AppX :: String -> PatternsException
-- | Error Handler:
--
--
-- - SomeException: Exception that led to the invocation;
-- - String: Name of the entity (client name, server name,
-- etc.)
--
type OnError = SomeException -> String -> IO ()
-- | Status code to communicate the state of a request between two
-- applications. The wire format is inspired by HTTP status codes:
--
--
-- - OK (200): Everything is fine
-- - BadRequest (400): Syntax error in the request message
-- - Forbidden (403): Not used
-- - NotFound (404): For the requested job no provider is
-- available
-- - Timeout (408): Timeout expired
--
data StatusCode
OK :: StatusCode
BadRequest :: StatusCode
Forbidden :: StatusCode
NotFound :: StatusCode
Timeout :: StatusCode
-- | Safe StatusCode parser (StatusCode is instance of Read,
-- but read would cause an error on an invalid StatusCode)
readStatusCode :: String -> Either String StatusCode
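-- The difference between read and a safe parser can be shown on a
-- local stand-in type. This sketch assumes a derived Read instance
-- that parses constructor names; the library's actual instance and
-- wire format may differ, and safeRead is a hypothetical name.
--
```haskell
import Text.Read (readMaybe)

-- Local stand-in for the library's StatusCode.
data StatusCode = OK | BadRequest | Forbidden | NotFound | Timeout
  deriving (Eq, Show, Read)

-- | Parse without throwing: invalid input becomes a Left with a
-- message instead of a runtime error as with read.
safeRead :: String -> Either String StatusCode
safeRead s = maybe (Left ("not a status code: " ++ s)) Right (readMaybe s)

main :: IO ()
main = do
  print (safeRead "NotFound")
  print (safeRead "Teapot")
```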
-- | Name of a service, task or topic
type JobName = String
-- | Name of a Stomp queue
type QName = String
-- | OutBound converter for messages of type ()
nobody :: OutBound ()
-- | InBound converter for messages of type ()
ignorebody :: InBound ()
-- | OutBound converter for messages of type ByteString
bytesOut :: OutBound ByteString
-- | InBound converter for messages of type ByteString
bytesIn :: InBound ByteString
-- | Get Job name from headers (and throw an exception if the header does
-- not exist)
getJobName :: Message m -> IO String
-- | Get Job type from headers (and throw an exception if the header does
-- not exist or contains an invalid value)
getJobType :: Message m -> IO JobType
-- | Get Queue name from headers (and throw an exception if the header does
-- not exist)
getQueue :: Message m -> IO String
-- | Get Reply queue (channel) from headers (and throw an exception if the
-- header does not exist)
getChannel :: Message m -> IO String
-- | Get Heartbeat specification from headers (and throw an exception if
-- the header does not exist or if its value is not numeric)
getHB :: Message m -> IO Int
-- | Get Status code from headers (and throw an exception if the header
-- does not exist)
getSC :: Message m -> IO (Either String StatusCode)
-- | Generic function to retrieve a header value (and throw an
-- exception if the header does not exist):
--
--
-- - String: Key of the wanted header
-- - String: Error message in case there is no such header
-- - Message m: The message whose headers we want to search
--
getHeader :: String -> String -> Message m -> IO String
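-- Because the library reserves header keys that start and end with
-- two underscores, an application may want to validate its own keys
-- before sending. isReservedKey is a hypothetical helper, not a
-- library function; the minimum length of four is an assumption that
-- keeps prefix and suffix from overlapping.
--
```haskell
import Data.List (isPrefixOf, isSuffixOf)

-- | True if the key follows the naming scheme reserved for
-- internal headers.
isReservedKey :: String -> Bool
isReservedKey k =
  length k >= 4 && "__" `isPrefixOf` k && "__" `isSuffixOf` k

main :: IO ()
main = do
  print (isReservedKey "__job__")
  print (isReservedKey "content-type")
```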
-- | Bridges link providers connected to one broker to consumers connected
-- to another broker.
--
-- For publishers and workers, this is quite trivial: the bridge
-- implements the corresponding consumer on one broker and the
-- corresponding provider on the other.
--
-- For servers, the task is somewhat more complicated: since servers use
-- the client's reply queue to send the result back to the client and
-- this queue only exists on the broker to which the client is connected,
-- the bridge has to remember the client's reply queue and use its own
-- queue on the server-side broker to finally route the reply back to the
-- original client. With many brokers connected by a service bridge, this
-- can result in long chains of clients and servers sending requests and
-- waiting for replies.
module Network.Mom.Stompl.Patterns.Bridge
-- | Create a forwarder with the lifetime of the application-defined action
-- passed in and start it in a background thread:
--
--
-- - Con: Connection to the source broker (the one where the
-- original publisher is connected);
-- - Con: Connection to the target broker (the one where the
-- target subscribers are connected);
-- - String: Name of the forwarder used for error handling;
-- - JobName: Name of the Topic that is bridged;
-- - QName: Registration queue of the source publisher;
-- - QName: Queue through which the internal subscriber will
-- receive topic data from the source publisher;
-- - QName: Registration queue of the target publisher;
-- - Int: Timeout on registering to the source publisher in
-- microseconds;
-- - OnError: Error handler;
-- - IO r: Action that defines the lifetime of the forwarder; its
-- result r is also the result of withForwarder.
--
--
-- Note the remarkable similarity to the router pattern
-- (withRouter). In fact, a router is but a forwarder where
-- source and target broker are the same.
withForwarder :: Con -> Con -> String -> JobName -> QName -> QName -> QName -> Int -> OnError -> IO r -> IO r
-- | Create a TaskBridge with the lifetime of the action passed in and
-- start it on a background thread:
--
--
-- - Con: Connection to the source broker (the one to which the
-- pusher is connected);
-- - Con: Connection to the target broker (the one to which the
-- worker is connected);
-- - String: Name of the bridge used for error handling;
-- - JobName: Name of the Task that is bridged;
-- - QName: Queue of the worker on the source side (if the
-- worker is connected to a balancer on the source side, this is an
-- internal queue only visible in the bridge and in the balancer);
-- - QName: Queue of the worker on the target side (which may be
-- a balancer's request queue);
-- - RegistryDesc: Registry (i.e. balancer) to
-- which the bridge is connected on the source side;
-- - OnError: Error handler;
-- - IO r: Action that defines the lifetime of the bridge; its result
-- r is also the result of withTaskBridge.
--
withTaskBridge :: Con -> Con -> String -> JobName -> QName -> QName -> RegistryDesc -> OnError -> IO r -> IO r
-- | Create a ServiceBridge with the lifetime of the action passed in and
-- start it on a background thread:
--
--
-- - Con: Connection to the source broker (the one to which the
-- client is connected);
-- - Con: Connection to the target broker (the one to which the
-- server is connected);
-- - String: Name of the bridge used for error handling;
-- - JobName: Name of the Service that is bridged;
-- - QName: Queue of the server on the source side (if the
-- server is connected to a balancer on the source side, this is an
-- internal queue only visible in the bridge and in the balancer);
-- - QName: Reader queue of the internal client on the target
-- side;
-- - QName: Queue of the server on the target side (which may be
-- a balancer's request queue);
-- - RegistryDesc: Registry (i.e. balancer) to
-- which the bridge is connected on the source side;
-- - OnError: Error handler;
-- - IO r: Action that defines the lifetime of the bridge; its result
-- r is also the result of withServiceBridge.
--
withServiceBridge :: Con -> Con -> String -> JobName -> QName -> QName -> QName -> RegistryDesc -> OnError -> IO r -> IO r
-- | A Desk is a server that supplies information about providers. A client
-- requests providers for a specific job (service, task or topic) and the
-- desk will reply with a list of queue names of providers of the
-- enquired job.
--
-- The desk is not statically configured, but uses a registry to which
-- providers connect. Providers that cease to work can disconnect or, if
-- heartbeats are required, will be removed from the list of available
-- providers internally when no more heartbeats are sent. This way, the
-- information provided by a desk is always up-to-date.
--
-- The desk balances providers: providers rotate in a list from which
-- the first n providers are always handed out to requesting
-- consumers (where n is the number of providers requested by
-- the consumer).
--
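-- The rotation can be sketched in a few lines. This is a simplified
-- model, not the desk's actual implementation: handOut is a
-- hypothetical name, and moving the handed-out providers to the end of
-- the list is an assumption about how the rotation proceeds:
--
-- ```haskell
-- -- Hand out the first n providers and rotate them to the end of the
-- -- list, so that the next request starts with different providers.
-- handOut :: Int -> [a] -> ([a], [a])
-- handOut n ps = (front, back ++ front)
--   where (front, back) = splitAt n ps
--
-- main :: IO ()
-- main = do
--   let (first, rest) = handOut 2 ["/q/a", "/q/b", "/q/c"]
--   -- providers handed to the first consumer
--   print first
--   -- providers handed to the next consumer
--   print (fst (handOut 2 rest))
-- ```
--
-- If fewer than n providers are registered, splitAt returns all of
-- them, so the consumer receives a shorter list.
--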
-- Since providers are managed dynamically, two consecutive calls will
-- probably not return the same result. The desk is thus not idempotent
-- in the strict sense. But since a call only changes the order of
-- providers (and since it should be irrelevant for the consumer which
-- provider is actually used), two consecutive calls have the same
-- effect, unless all providers disconnect between the two calls.
--
-- Internally, the Desk protocol uses the following headers:
--
--
-- - jobs: Comma-separated list of providers;
-- - redundancy: Requested number of providers.
--
module Network.Mom.Stompl.Patterns.Desk
-- | Creates a desk with the lifetime of the application-defined action:
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Name of the desk, used for error handling;
-- - QName: Registration queue -- this queue is used by
-- providers to connect to the registry, it is not used for consumer
-- requests;
-- - (Int, Int): Heartbeat range of the Registry (see
-- withRegistry for details);
-- - OnError: Error handling;
-- - QName: Request queue -- this queue is used by consumers to
-- request information about available providers;
-- - IO r: Action that defines the lifetime of the desk; the result is
-- also the result of withDesk.
--
withDesk :: Con -> String -> QName -> (Int, Int) -> OnError -> QName -> IO r -> IO r
-- | Function used by a consumer to request provider information from a desk:
--
--
-- - ClientA () (): The request to the desk is sent through a
-- client of type ClientA () (). This client must be created by the
-- application beforehand (e.g. the client could be created once during
-- initialisation and then be used repeatedly to obtain or update
-- information on providers according to the application's needs);
-- - Int: Timeout in microseconds;
-- - JobName: Name of the job for which the consumer needs
-- providers;
-- - Int: Number of providers needed by the consumer. This can be used
-- for redundancy: if one provider fails, the consumer passes to the
-- next. Be aware, however, that the information may already be outdated
-- by the time a provider fails. Therefore, the redundant providers
-- should be used immediately and, when the main provider fails later,
-- the information should be updated by requesting new providers from
-- the desk.
--
--
-- The result is a tuple of (StatusCode, [QName]). If the
-- StatusCode is not OK, the list of QName will be
-- empty; otherwise, it will contain at least one and at most
-- n providers (where n is the number of providers
-- requested). If fewer providers than requested are available, the list
-- will contain fewer than n providers. But note that this, as long
-- as there is at least one provider, does not count as an error,
-- i.e. the StatusCode is still OK.
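--
-- A minimal sketch of how a consumer might act on this result. The
-- Status type and the choose function are hypothetical stand-ins
-- defined only for this example (the library's StatusCode has its own
-- constructors); only the shape of the tuple is taken from the
-- description above:
--
-- ```haskell
-- -- Stand-in for the library's StatusCode.
-- data Status = OK | NotOK deriving (Eq, Show)
--
-- -- Split the reply into a main provider and the redundant ones,
-- -- or report that no provider is available.
-- choose :: (Status, [String]) -> Maybe (String, [String])
-- choose (OK, q:qs) = Just (q, qs)
-- choose _          = Nothing
--
-- main :: IO ()
-- main = do
--   print (choose (OK, ["/q/srv1", "/q/srv2"]))
--   print (choose (NotOK, []))
-- ```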
requestProvider :: ClientA () () -> Int -> JobName -> Int -> IO (StatusCode, [QName])
-- | This module provides a balancer for services and tasks and a topic
-- router. Balancers for services and tasks improve scalability and
-- reliability of servers and workers. Workers should always be used with
-- a balancer (since balancing workload is the main idea of workers);
-- servers can very well be used without a balancer, but won't scale with
-- increasing numbers of clients.
--
-- A balancer consists of a registry to which servers and workers
-- connect; servers and workers are maintained in lists according to the
-- job they provide. Clients and pushers send requests to the balancer,
-- which then forwards the request to a server or worker. The client will
-- receive the reply not through the balancer, but directly from the
-- server (to which the reply queue was forwarded as part of the request
-- message -- see ClientA for details).
--
-- With servers and workers sending heartbeats, a balancer also improves
-- reliability in contrast to a topology where a task is pushed to a
-- single worker or a request is sent to only one server.
--
-- A router is a forwarder of a topic. A router is very similar to a
-- publisher (PubA) with the difference that the router does not
-- create new topic data, but uses topic data received from a publisher
-- (a router, hence, is a subscriber and a publisher). Routers can be
-- used to balance the workload of publishers: instead of one publisher
-- serving thousands of subscribers, the initial publisher would serve
-- thousands of routers, which, in turn, serve thousands of
-- subscribers (or even other routers).
module Network.Mom.Stompl.Patterns.Balancer
-- | Create a Service and Task Balancer with the lifetime of the
-- application-defined action passed in and start it in a background
-- thread:
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Name of the balancer, used for error handling;
-- - QName: Registration queue -- this queue is used by
-- providers to connect to the registry, it is not used for consumer
-- requests;
-- - (Int, Int): Heartbeat range of the Registry (see
-- withRegistry for details);
-- - QName: Request queue -- this queue is used for consumer
-- requests;
-- - OnError: Error handling;
-- - IO r: Action that defines the lifetime of the balancer; the result
-- r is also the result of withBalancer.
--
withBalancer :: Con -> String -> QName -> (Int, Int) -> QName -> OnError -> IO r -> IO r
-- | Create a router with the lifetime of the application-defined action
-- passed in and start it in a background thread:
--
--
-- - Con: Connection to a Stomp broker;
-- - String: Name of the router, used for error handling;
-- - JobName: Routed topic;
-- - QName: Registration queue of the source publisher;
-- - QName: Queue through which the internal subscriber will
-- receive the topic data from the source publisher;
-- - QName: Registration queue of the target publisher to which
-- subscribers will connect;
-- - Int: Registration timeout (timeout to register at the source
-- publisher);
-- - OnError: Error handling;
-- - IO r: Action that defines the lifetime of the router; the result
-- r is also the result of withRouter.
--
withRouter :: Con -> String -> JobName -> QName -> QName -> QName -> Int -> OnError -> IO r -> IO r