-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Stompl MOM Stomp Patterns -- @package stomp-patterns @version 0.1.0 -- | This module provides the basic patterns client/server, publish -- and subscribe and pusher/worker (a.k.a. pipeline) as well as a -- registry, a means to support patterns where one application -- uses or serves a set of other applications. -- -- Basic patterns can be distinguished by the data exchange defined by -- their protocol: -- -- -- -- We call the processing performed by an application on behalf of -- another application a job. There are three different job types: -- -- -- -- Applications providing a job are generically called providers, -- applications requesting a job are called consumers. Providers, hence, -- are servers, workers and publishers, and consumers are clients, -- pushers and subscribers. Note that this is somewhat different from the -- data-centric terminology "producer" and "consumer". It is not very -- useful to distinugish servers from clients or pushers from workers by -- referring to the distinction of producing or not producing data. It is -- in fact the pusher that produces data, not the worker. The pusher, -- however, is the one that requests something from the worker. The task, -- in this case, is the "good" that is provided by one side and consumed -- by the other. -- -- This distinction is relevant when we start to think about -- reliability. Reliability is a relation between a provider and a -- consumer: The consumer relies on the producer, not the other way -- round, e.g. a pusher relies on a woker and a client on a server -- to get the job done. -- -- The interfaces in this library give some guarantees related to -- reliability, but there are also some pitfalls: -- -- -- -- The library uses a set of headers that must not be used by -- applications. All internal header keys start and end with two -- underscores. By avoiding this naming of header keys, application code -- easily avoids naming conflicts. The headers used in basic patterns -- are: -- -- module Network.Mom.Stompl.Patterns.Basic -- | The client data type, which implements the client side of the -- client/server protocol. data ClientA i o -- | Access to the client name clName :: ClientA i o -> String -- | The function creates a client that lives within its scope. -- -- Parameters: -- -- withClient :: Con -> String -> JobName -> ReaderDesc i -> WriterDesc o -> (ClientA i o -> IO r) -> IO r -- | The client will send the request of type o and wait for the -- reply until the timeout exprires. The reply is of type i and is -- returned as Message i. If the timeout expires before the -- reply has been received, the function returns Nothing. -- -- Since servers do not know the clients they are serving, request -- sends the name of its reader queue (the reply queue) as message -- header to the server. -- -- Parameters: -- -- request :: ClientA i o -> Int -> Type -> [Header] -> o -> IO (Maybe (Message i)) -- | This function serves as a "delayed" receiver for the case that the -- timeout of a request has expired. When using this function, it is -- assumed that a request has been made, but no response has been -- received. It can be used in time-critical applications, where the -- client may use the time between request and reply productively, -- instead of passively blocking on the reply queue. -- -- Use this function with care! It can be easily abused to break the -- client/server pattern, when it is called without a request having been -- made before. If, in this case, timout is -1, the -- application will block forever. -- -- The function receives those parameters from request that are -- related to receiving the reply, i.e. Type, -- [Header] and o are not passed to checkRequest. checkRequest :: ClientA i o -> Int -> IO (Maybe (Message i)) -- | The server data type, which implements the server side of the -- client/server protocol. data ServerA i o -- | Access to the server name srvName :: ServerA i o -> String -- | The function creates a server that lives within the scope of the -- application-defined action passed into it. -- -- Parameters: -- -- withServer :: Con -> String -> ReaderDesc i -> WriterDesc o -> (ServerA i o -> IO r) -> IO r -- | Waits for a client request, calls the application-defined transformer -- to generate a reply and sends this reply through the reply queue whose -- name is indicated by a header in the request. The time a server waits -- for a request may be restricted by the timeout. Typically, you would -- call reply with timeout set to -1 (wait eternally). -- There may be situations, however, where it actually makes sense to -- restrict the waiting time, i.e. to perform some housekeeping in -- between. -- -- Typically, you call reply in a loop like -- --
--   forever $ reply srv (-1) nullType [] f
--   
-- -- where f is a function of type -- --
--   Message i -> IO o.
--   
-- -- Parameters: -- -- reply :: ServerA i o -> Int -> Type -> [Header] -> (Message i -> IO o) -> IO () -- | Connect to a registry: The caller registers itself at the registry. -- The owner of the registry will then use the caller depending on its -- purpose. -- -- -- -- The function returns a tuple of StatusCode and the heartbeat -- proposed by the registry (which may differ from the preferred -- heartbeat of the caller). Whenever the StatusCode is not -- OK, the heartbeat is 0. If the JobName is null, the -- StatusCode will be BadRequest. If the timeout expires, -- register throws TimeoutX. register :: Con -> JobName -> JobType -> QName -> QName -> Int -> Int -> IO (StatusCode, Int) -- | Disconnect from a registry: The caller disconnects from a registry to -- which it has registered before. For the case that the registry is not -- receiving heartbeats from the caller, it is essential to unregister, -- when the service is no longer provided. Otherwise, the registry has no -- way to know that it should not send requests to this provider anymore. -- -- -- -- The function returns a StatusCode. If JobName is null, -- the StatusCode will be BadRequest. If the timeout -- expires, the function will throw TimeoutX. unRegister :: Con -> JobName -> QName -> QName -> Int -> IO StatusCode -- | Send heartbeats: -- -- heartbeat :: MVar HB -> Writer () -> JobName -> QName -> IO () -- | Heartbeat controller type data HB -- | Create a heartbeat controller; receives the heartbeat in milliseconds. mkHB :: Int -> IO HB -- | Create a server that works in a background thread: The background -- thread (and with it the server) is running until the action passed in -- to the function (IO r) terminates; when it terminates, the background -- thread is terminated as well. withServerThread may connect to a -- registry (to serve as a provider of a balancer for instance), which is -- automatically handled internally when a RegistryDesc is passed in with -- a QName that is not null. -- -- withServerThread :: Con -> String -> JobName -> Type -> [Header] -> (Message i -> IO o) -> ReaderDesc i -> WriterDesc o -> RegistryDesc -> OnError -> IO r -> IO r -- | A helper that shall ease the use of the registers. A registry to which -- a call wants to connect is described as -- -- type RegistryDesc = (QName, Int, (Int, Int, Int)) -- | The Pusher data type, which implements the consumer side of the -- pipeline protocol. Note that, when we say "consumer" here, the pusher -- is actually a data producer, but consumes the effect of having a task -- done. The pusher can be seen as a client that does not expect a reply. data PusherA o -- | Access to the pusher's name pushName :: PusherA o -> String -- | Create a Pusher with the lifetime of the action passed in: -- -- withPusher :: Con -> String -> JobName -> WriterDesc o -> (PusherA o -> IO r) -> IO r -- | Push a Job: -- -- push :: PusherA o -> Type -> [Header] -> o -> IO () -- | On the other side of the pipeline, there sits a worker waiting for -- requests. Note that no Worker data type is defined. Instead, -- there is only a withTaskThread function that, internally, -- creates a worker acting in a background thread. The rationale is that -- it does not make too much sense to have a pipeline with only one -- worker. It is in fact part of the idea of the pipeline pattern that -- several workers are used through a balancer. withTaskThread -- implements the interaction with the registry internally and frees the -- programmer from concerns related to registration. If you really need a -- single worker, you can call the function with an empty RegistryDesc, -- i.e. with an empty queue name. -- -- withTaskThread :: Con -> String -> JobName -> (Message i -> IO ()) -> ReaderDesc i -> RegistryDesc -> OnError -> IO r -> IO r -- | Registry: An opaque data type data Registry -- | A registry is used through a function that, internally, creates a -- registry and defines its lifetime in terms of the scope of an action -- passed in to the function: -- -- withRegistry :: Con -> String -> QName -> (Int, Int) -> OnError -> (Registry -> IO r) -> IO r -- | Map action to Providers of job JobName; mapping means -- different things for: -- -- -- -- Parameters: -- -- -- -- The function returns False iff the requested job is not available and -- True otherwise. (Note that a job without providers is removed; when -- the function returns True, the job, thus, was applied at least once. mapR :: Registry -> JobName -> (Provider -> IO ()) -> IO Bool -- | Retrieves n Providers of a certain job; getProvider -- works, for all JobTypes according to the work balancer logic, -- i.e.: it returns the first n providers of the list for this job -- and moves them to the end of the list. getProvider is used, for -- instance, in the Desk pattern. -- -- getProvider :: Registry -> JobName -> Int -> IO [Provider] -- | This function shows all jobs with all their providers in a registry; -- the function is intended for debugging only. showRegistry :: Registry -> IO () -- | A provider is an opaque data type; most of its attributes are used -- only internally by the registry. Interesting for user applications, -- however, is the queue that identifies the provider. data Provider -- | Queue through which the job is provided prvQ :: Provider -> QName -- | JobType: Service, Task or Topic data JobType Service :: JobType Task :: JobType Topic :: JobType -- | The publisher data type data PubA o -- | Access to the name of the publisher pubName :: PubA o -> String -- | Create a publisher with the lifetime of the scope of the user action -- passed in. The publisher, internally, creates a registry to which -- subscribers will connect to obtain the topic data. The registry will -- not expect heartbeats from subscribers, since the dependability -- relation is the other way round: the publisher does not depend on -- subscribers, but subscribers depend on a publisher. The publisher, -- usually, does not send heartbeats either. For exceptions to this rule, -- see withPubProxy. -- -- withPub :: Con -> String -> JobName -> QName -> OnError -> WriterDesc o -> (PubA o -> IO r) -> IO r -- | Publish data of type o: -- -- publish :: PubA o -> Type -> [Header] -> o -> IO () -- | Create a publisher that works in a background thread publishing -- periodically at a monotonic rate, i.e. it creates data and -- publishes them, computes the difference of the publication rate minus -- the time needed to create and publish the data and will then suspend -- the thread for this period. For a publication rate of p -- microseconds, the thread will be delayed for p - x -- microseconds, if x corresponds to the time that was spent on -- creating and publishing the data. -- -- The precision depends of course on your system and its current -- workload. For most cases, this will be equal to just suspending the -- thread for the publication rate. -- -- Parameters: -- -- withPubThread :: Con -> String -> JobName -> QName -> Type -> [Header] -> IO o -> WriterDesc o -> Int -> OnError -> IO r -> IO r -- | Subscriber data type data SubA i -- | Access to the subscriber name subName :: SubA i -> String -- | Create a subscriber with the lifetime of the user action passed in. -- The subscriber will internally connect to a publisher's registry and -- receive data as long as it stays connected. -- -- withSub :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> (SubA i -> IO r) -> IO r -- | Check if data have been arrived for this subscriber; if data are -- available before the timeout expires, the function results in -- Just (Message i); if the timeout expires first, the -- result is Nothing. -- -- checkIssue :: SubA i -> Int -> IO (Maybe (Message i)) -- | Create a subscriber that works in a background thread; Whenever data -- are available, an application callback passed in to the function is -- called with the message that has arrived. -- -- withSubThread :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> (Message i -> IO ()) -> OnError -> IO r -> IO r -- | Create a subscriber that works in a background thread and updates an -- MVar, whenever new data are available; the function is in fact a -- special case of withSubThread, where the application callback -- updates an MVar. Note that the MVar must not be empty when the -- function is called, otherwise, it will block on modifying the MVar. -- -- withSubMVar :: Con -> String -> JobName -> QName -> Int -> ReaderDesc i -> MVar i -> OnError -> IO r -> IO r -- | Unlike servers and workers, publishers have no interface to connect -- internally to a registry. The rationale for this is that publishers do -- not need load balancers or similar means that would require -- registration. As a consequence, there is no means to send heartbeats -- internally. Sometimes, however, the need to connect to a registry may -- arise. The Desk pattern is an example where it makes sense to register -- a publisher. But then, there is no means to internally send heartbeats -- proving that the publisher is still alive. For this case, a simple -- solution for periodic publishers is available: a heartbeat proxy that -- is implemented as a subscriber receiving data from the publisher and -- sending a heartbeat on every dataset that arrives. -- -- This function provides a proxy that internally connects to a registry -- on behalf of a publisher and sends heartbeats. -- -- withPubProxy :: Con -> String -> JobName -> QName -> ReaderDesc i -> RegistryDesc -> OnError -> IO r -> IO r -- | Patterns Exception data PatternsException -- | Timout expired TimeoutX :: String -> PatternsException -- | Invalid status code BadStatusCodeX :: String -> PatternsException -- | Status code other than OK NotOKX :: StatusCode -> String -> PatternsException -- | Error on Header identified by the first String HeaderX :: String -> String -> PatternsException -- | Thrown on missing heartbeat (after tolerance of 10 missing heartbeats) MissingHbX :: String -> PatternsException -- | Heartbeat proposed by registry out of acceptable range UnacceptableHbX :: Int -> PatternsException -- | No provider for the requested job available NoProviderX :: String -> PatternsException -- | Application-defined exception AppX :: String -> PatternsException -- | Error Handler: -- -- type OnError = SomeException -> String -> IO () -- | Status code to communicate the state of a request between two -- applications. The wire format is inspired by HTTP status codes: -- -- data StatusCode OK :: StatusCode BadRequest :: StatusCode Forbidden :: StatusCode NotFound :: StatusCode Timeout :: StatusCode -- | Safe StatusCode parser (StatusCode is instance of Read, -- but read would cause an error on an invalid StatusCode) readStatusCode :: String -> Either String StatusCode -- | Name of a service, task or topic type JobName = String -- | Name of a Stomp queue type QName = String -- | OutBound converter for messages of type () nobody :: OutBound () -- | InBound converter for messages of type () ignorebody :: InBound () -- | OutBound converter for messages of type ByteString bytesOut :: OutBound ByteString -- | InBound converter for messages of type ByteString bytesIn :: InBound ByteString -- | Get Job name from headers (and throw an exception if the header does -- not exist) getJobName :: Message m -> IO String -- | Get Job type from headers (and throw an exception if the header does -- not exist or contains an invalid value) getJobType :: Message m -> IO JobType -- | Get Queue name from headers (and throw an exception if the header does -- not exist) getQueue :: Message m -> IO String -- | Get Reply queue (channel) from headers (and throw an exception if the -- header does not exist) getChannel :: Message m -> IO String -- | Get Heartbeat specification from headers (and throw an exception if -- the header does not exist or if its value is not numeric) getHB :: Message m -> IO Int -- | Get Status code from headers (and throw an exception if the header -- does not exist) getSC :: Message m -> IO (Either String StatusCode) -- | Get Generic function to retrieve a header value (and throw an -- exception if the header does not exist): -- -- getHeader :: String -> String -> Message m -> IO String -- | Bridges link providers connected to one broker to consumers connected -- to another broker. -- -- For publishers and workers, this is quite trivial: the bridge -- implements the corresponding consumer on one broker and the -- corresponding provider on the other. -- -- For servers, the task is somewhat more complicated: since servers use -- the client's reply queue to send the result back to the client and -- this queue only exists on the broker to which the client is connected, -- the bridge has to remember the client's reply queue and use its own -- queue on the server-side broker to finally route the reply back to the -- original client. With many broker connected by a service bridge, this -- can result in long chains of clients and servers sending requests and -- waiting for replies. module Network.Mom.Stompl.Patterns.Bridge -- | Create a forwarder with the lifetime of the application-defined action -- passed in and start it in a background thread: -- -- -- -- Note the remarkable similarity to the router pattern -- (withRouter). In fact, a router is but a forwarder where -- source and target broker are the same. withForwarder :: Con -> Con -> String -> JobName -> QName -> QName -> QName -> Int -> OnError -> IO r -> IO r -- | Create a TaskBridge with the lifetime of the action passed in and -- start it on a background thread: -- -- withTaskBridge :: Con -> Con -> String -> JobName -> QName -> QName -> RegistryDesc -> OnError -> IO r -> IO r -- | Create a ServiceBridge with the lifetime of the action passed in and -- start it on a background thread: -- -- withServiceBridge :: Con -> Con -> String -> JobName -> QName -> QName -> QName -> RegistryDesc -> OnError -> IO r -> IO r -- | A Desk is a server that supplies information about providers. A client -- requests providers for a specific job (service, task or topic) and the -- desk will reply with a list of queue names of providers of the -- enquired job. -- -- The desk is not statically configured, but uses a registry to which -- providers connect. Providers that cease to work can disconnect or, if -- heartbeats are required, will be removed from the list of available -- providers internally when no more heartbeats are sent. This way, the -- information provided by a desk is always up-to-date. -- -- Desk balances providers, i.e. providers rotate in a list from -- which always the first n providers are handed out to requesting -- consumers (where n corresponds to the number of providers -- requested by the consumer.) -- -- Since providers are managed dynamically, the result of two consecutive -- calls is probably not the same. Desk is thus not idempotent in the -- strict sense. But, since the call itself does only cause a change of -- the order of providers (and since it should be irrelevant for the -- consumer which provider is actually used), two consecutive calls will -- have the same effect -- if not all providers disconnect between the -- two calls. -- -- Internally, the Desk protocol uses the following headers: -- -- module Network.Mom.Stompl.Patterns.Desk -- | Creates a desk with the lifetime of the application-defined action: -- -- withDesk :: Con -> String -> QName -> (Int, Int) -> OnError -> QName -> IO r -> IO r -- | Function used by consumer to request provider information from a desk: -- -- -- -- The result is a tuple of (StatusCode, [QName]). If the -- StatusCode is not OK, the list of QName will be -- empty; otherwise, it will contain at least one provider and maximum -- n providers (where n is the number of providers -- requested). If fewer providers than requested are available, the list -- will contain less than n providers. But note that this, as long -- as there is at least one provider, does not count as an error, -- i.e. the StatusCode is still OK. requestProvider :: ClientA () () -> Int -> JobName -> Int -> IO (StatusCode, [QName]) -- | This module provides a balancer for services and tasks and a topic -- router. Balancers for services and tasks improve scalability and -- reliability of servers and workers. Workers should always be used with -- a balancer (since balancing workload is the main idea of workers); -- servers can very well be used without a balancer, but won't scale with -- increasing numbers of clients. -- -- A balancer consists of a registry to which servers and workers -- connect; servers and workers are maintained in lists according to the -- job they provide. Clients and pushers send requests to the balancer, -- which then forwards the request to a server or worker. The client will -- receive the reply not through the balancer, but directly from the -- server (to which the reply queue was forwarded as part of the request -- message -- see ClientA for details). -- -- With servers and workers sending heartbeats, a balancer also improves -- reliability in contrast to a topology where a task is pushed to a -- single worker or a request is sent to only one server. -- -- A router is a forwarder of a topic. A router is very similar to a -- publisher (PubA) with the difference that the router does not -- create new topic data, but uses topic data received from a publisher -- (a router, hence, is a subscriber and a publisher). Routers can be -- used to balance the workload of publishers: Instead of one publisher -- serving thousands of subscribers, the initial publisher would serve -- thousands of routers, which, in their turn, serve thousands of -- subscribers (or even other routers). module Network.Mom.Stompl.Patterns.Balancer -- | Create a Service and Task Balancer with the lifetime of the -- application-defined action passed in and start it in a background -- thread: -- -- withBalancer :: Con -> String -> QName -> (Int, Int) -> QName -> OnError -> IO r -> IO r -- | Create a router with the lifetime of the application-defined action -- passed in and start it in a background thread: -- -- withRouter :: Con -> String -> JobName -> QName -> QName -> QName -> Int -> OnError -> IO r -> IO r