patterns-0.0.2: Common patterns in message-oriented applications

Portability: portable
Stability: experimental
Safe Haskell: Safe-Inferred

Network.Mom.Patterns.Basic

Description

Basic communication patterns

Server/Client

withServer :: Context -> String -> Parameter -> Int -> AccessPoint -> LinkType -> InBound c -> OutBound o -> OnError -> (String -> Iteratee c IO i) -> Fetch i o -> (Service -> IO a) -> IO a

Starts one or more server threads and executes an action that receives a Service to control the server. The Service is a thread-local resource. It must not be passed to threads forked from the thread that has started the service. The Service is valid only in the scope of the action. When the action terminates, the server is automatically stopped. During the action, the server can be paused and restarted. Also, the SocketOption of the underlying ZMQ Socket can be changed. Please refer to pause, resume and changeOption for more details.

The application may implement control parameters. Control parameters are mere strings that are passed to the application call-backs. It is up to the application to inspect these strings and to implement different behaviour for the possible settings. Control parameters can be changed at run time by means of changeParam.
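How a call-back might interpret a control parameter can be sketched as follows. This is only an illustration: Mode, modeFor and describe are hypothetical names, and Parameter is redefined locally as a plain String so that the fragment stands on its own.

```haskell
-- Local stand-in for the library's Parameter type (a plain String).
type Parameter = String

-- Hypothetical application settings selected by the control parameter.
data Mode = Verbose | Quiet deriving (Eq, Show)

-- Interpret the control parameter; unknown values fall back to Quiet.
modeFor :: Parameter -> Mode
modeFor "verbose" = Verbose
modeFor _         = Quiet

-- A call-back that behaves differently depending on the parameter.
describe :: Parameter -> String -> String
describe p msg = case modeFor p of
  Verbose -> "[verbose] " ++ msg
  Quiet   -> msg
```

Keeping the interpretation in one function such as modeFor means that every call-back reacts consistently when the parameter is changed.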

Parameters:

  • Context: The ZMQ context;
  • String: The name of the server, useful for debugging;
  • Parameter: The initial value of the control parameter passed to all application call-backs;
  • Int: The number of worker threads; note that a server with only one thread handles client requests sequentially. The number of threads (together with the number of hardware processing resources) defines how many client requests can be processed in parallel.
  • AccessPoint: The access point, through which this server can be reached;
  • LinkType: The link type; standalone servers usually bind their access point, whereas clients connect to it. Instead, a server may also connect to a load-balancing device, to which other servers and clients connect (see withDevice and withQueue).
  • InBound: The converter to convert the incoming data stream (of type ByteString) into a client request component. Note that the converter converts single message segments to components of type c. The Iteratee receiving these c-typed elements shall combine them into a complete request of type i, which is then processed by an Enumerator to create the server response.
  • OutBound: The converter to convert the results of type o to a ByteString, which then is sent back to the client.
  • OnError: The error handler
  • String -> Iteratee: The Iteratee that processes request components of type c and yields a request of type i. The String argument is the control parameter, whose logic is implemented by the application.
  • Fetch: The Enumerator that processes the request of type i to produce results of type o.
  • Service -> IO a: The action to invoke when the server has been started; the service is used to control the server.

The following code fragment shows a simple server to process database queries using standard converters and error handlers not further defined here:

   withContext 1 $ \ctx -> do
      c <- connectODBC "DSN=xyz"  -- some ODBC connection
      s <- prepare c "select ..." -- some database query 
      withServer ctx 
          "MyQuery" -- name of the server is "MyQuery"
          noparam     -- no parameter
          5           -- five worker threads
          (Address "tcp://*:5555" []) Bind -- bind to this address
          iconv oconv -- some standard converters
          onErr       -- some standard error handler
          (\_  -> one []) -- Iteratee for single segment messages;
                           -- refer to Enumerator for details
          (dbFetcher s) $ \srv -> -- the Enumerator;
            untilInterrupt $ do -- install a signal handler for SIGINT
                                -- and repeat the following action
                                -- until SIGINT is received;
              putStrLn $ "server " ++ srvName srv ++ 
                         " up and running..."
              threadDelay 1000000

The untilInterrupt loop may be implemented as follows:


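     import Control.Concurrent.MVar (newMVar, modifyMVar_, readMVar)
     import Control.Monad (when)
     import System.Posix.Signals (installHandler, sigINT, Handler(Catch))

     untilInterrupt :: IO () -> IO ()
     untilInterrupt run = do
       continue <- newMVar True
       _ <- installHandler sigINT (Catch $ handler continue) Nothing
       go continue 
      where handler m = modifyMVar_ m (\_ -> return False) -- SIGINT clears the flag
            go      m = do run
                           continue <- readMVar m
                           when continue $ go m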

Finally, a simple dbFetcher:

     dbFetcher :: SQL.Statement -> Fetch [SQL.SqlValue] String
     dbFetcher s _ _ _ stp = tryIO (SQL.execute s []) >>= \_ -> go stp
       where go step = 
               case step of
                 E.Continue k -> do
                   mbR <- tryIO $ SQL.fetchRow s
                   case mbR of
                     Nothing -> E.continue k
                                        -- convRow is not defined here
                     Just r  -> go $$ k (E.Chunks [convRow r]) 
                 _ -> E.returnI step

data Client i o

Client data type

clientContext :: Client i o -> Context

Obtaining the Context from Client

setClientOptions :: Client i o -> [SocketOption] -> IO ()

Setting SocketOption to the underlying ZMQ Socket

withClient :: Context -> AccessPoint -> OutBound o -> InBound i -> (Client i o -> IO a) -> IO a

Creates a Client; a client is not a background process like a server, but a data type that provides functions to interoperate with a server. withClient creates a client and invokes the application-defined action, which receives a Client argument. The lifetime of the Client is limited to the invoked action. When the action terminates, the Client dies.

Parameters:

  • Context: The ZMQ Context;
  • AccessPoint: The access point, to which the client connects;
  • OutBound: Converter to convert a request from type o to the wire format ByteString. Note that, as for servers, the request may be composed of components that together form the request. The type o corresponds to one of these request components, not necessarily to the request type as a whole, which is determined when issuing a request.
  • InBound: Converter to convert a reply (ByteString) into type i. Note again that the reply may consist of many message segments. The type i relates to one reply component, not necessarily to the reply type as a whole, which is determined when issuing a request.
  • Client -> IO a: The action to perform with this client.
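The scoping rule (the resource is valid only inside the action) is the usual bracket idiom. The following sketch illustrates it with a hypothetical stand-in resource Res; it is not the library's implementation of withClient, only a model of the lifetime behaviour described above.

```haskell
import Control.Exception (bracket)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- Hypothetical stand-in for a Client: just a flag saying whether it is open.
newtype Res = Res (IORef Bool)

-- Acquire the resource, run the action, and release the resource
-- afterwards, even if the action throws an exception.
withRes :: (Res -> IO a) -> IO a
withRes = bracket acquire release
  where acquire         = Res <$> newIORef True
        release (Res r) = writeIORef r False

-- Is the resource still usable?
isOpen :: Res -> IO Bool
isOpen (Res r) = readIORef r
```

A resource that escapes the action, for instance through withRes return, is already released when the caller receives it; this is why values such as Client must not be used outside the action.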

request :: Client i o -> Enumerator o IO () -> Iteratee i IO a -> IO (Either SomeException a)

Synchronously requesting a service; the function blocks the current thread, until a reply is received.

Parameters:

  • Client: The client that performs the request
  • Enumerator: Enumerator to create the request message stream
  • Iteratee: Iteratee to process the reply message stream

A simple client that just writes the results to stdout:

     rcv :: String -> IO ()
     rcv req = withContext 1 $ \ctx -> 
       withClient ctx 
         (Address "tcp://localhost:5555" []) -- connect to this address
         (return . B.pack) (return . B.unpack) $ -- string converters
         \s -> do
           -- request with enum and outit
           ei <- request s (enum req) outit      
           case ei of
             Left e  -> putStrLn $ "Error: " ++ show (e::SomeException)
             Right _ -> return ()
     -- Enumerator that returns just one string
     enum :: String -> E.Enumerator String IO ()
     enum = once (return . Just)
     -- Iteratee that just writes to stdout
     outit :: E.Iteratee String IO ()
     outit = do
       mbi <- EL.head
       case mbi of
         Nothing -> return ()
         Just i  -> liftIO (putStrLn i) >> outit

Note that this code just issues one request, which is not the most typical use case. It is more likely that the action will loop forever and receive requests, for instance, from a user interface.

askFor :: Client i o -> Enumerator o IO () -> IO ()

Asynchronously requesting a service; the function sends a request to the server without waiting for a result.

Parameters:

  • Client: The client that performs the request
  • Enumerator: Enumerator to create the request message stream

checkFor :: Client i o -> Iteratee i IO a -> IO (Maybe (Either SomeException a))

Polling for a reply; the function polls for a server reply. If nothing has been received, it returns Nothing; otherwise it returns Just the result or an error.

Parameters:

  • Client: The client that performs the request
  • Iteratee: Iteratee to process the reply message stream

The synchronous request (see request) could be implemented asynchronously like:

     rcv :: String -> IO ()
     rcv req = withContext 1 $ \ctx -> do
       let ap = Address "tcp://localhost:5555" []
       withClient ctx ap 
         (return . B.pack) (return . B.unpack) 
         $ \s -> do
           ei <- try $ askFor s (enum req)
           case ei of
             Left  e -> putStrLn $ "Error: " ++ show (e::SomeException)
             Right _ -> wait s
       -- check for results periodically 
       where wait s = checkFor s outit >>= \mbei ->
               case mbei of
                 Nothing        -> do putStrLn "Waiting..."
                                      threadDelay 10000 >> wait s
                 Just (Left e)  -> putStrLn $ "Error: " ++ show e
                 Just (Right _) -> putStrLn "Ready!"
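The wait loop above polls without end. A bounded variant can be sketched with a generic helper; pollWithLimit is a hypothetical name, not part of this library, and it works with any polling action of type IO (Maybe a), such as checkFor s outit.

```haskell
import Control.Concurrent (threadDelay)

-- Run a polling action at most n times. After each empty poll the thread
-- sleeps for delayUs microseconds. The first Just result is returned;
-- Nothing is returned when all attempts have been used up.
pollWithLimit :: Int -> Int -> IO (Maybe a) -> IO (Maybe a)
pollWithLimit n delayUs poll
  | n <= 0    = return Nothing
  | otherwise = do
      r <- poll
      case r of
        Just x  -> return (Just x)
        Nothing -> threadDelay delayUs >> pollWithLimit (n - 1) delayUs poll
```

With such a helper, the client could give up after, say, 100 attempts and report a timeout instead of waiting indefinitely.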

Publish/Subscribe

data Pub o

Publisher

pubContext :: Pub o -> Context

Obtaining the Context from Pub

setPubOptions :: Pub o -> [SocketOption] -> IO ()

Setting SocketOption to the underlying ZMQ Socket

withPub :: Context -> AccessPoint -> OutBound o -> (Pub o -> IO a) -> IO a

Creates a publisher; a publisher is a data type that provides an interface to publish data to subscribers. withPub creates a publisher and invokes an application-defined action, which receives a Pub argument. The lifetime of the publisher is limited to the action. When the action terminates, the publisher dies.

Parameters:

  • Context: The ZMQ Context
  • AccessPoint: The access point the publisher will bind
  • OutBound: A converter to convert from type o to the wire format ByteString. Note that a publisher may create a data stream; the type o is then the type of one segment of this stream, not of the stream as a whole.
  • Pub -> IO a: The action to invoke

issue :: Pub o -> Enumerator o IO () -> IO ()

Publishes the data stream created by an enumerator.

Parameters:

  • Pub: The publisher
  • Enumerator: The enumerator to create an outgoing data stream.

A simple weather report publisher:

     withContext 1 $ \ctx -> withPub ctx
         (Address "tcp://*:5555" [])
         (return . B.pack) $ \pub -> untilInterrupt $ do
           issue pub (once weather noparam)
           threadDelay 10000 -- update every 10ms
     -- fake weather report with some random values
     weather :: String -> IO (Maybe String)
     weather _ = do
         zipcode     <- randomRIO (10000, 99999) :: IO Int
         temperature <- randomRIO (-10, 30) :: IO Int
         humidity    <- randomRIO ( 10, 60) :: IO Int
         return $ Just (unwords [show zipcode, 
                                 show temperature, 
                                 show humidity])

withPeriodicPub :: Context -> String -> Parameter -> Timeout -> AccessPoint -> OutBound o -> OnError_ -> Fetch_ o -> (Service -> IO a) -> IO a

Creates a background process that periodically publishes data.

Parameters:

  • Context: The ZMQ Context
  • String: Name of this Publisher; useful for debugging
  • Parameter: The initial value of the control parameter
  • Timeout: The period of the publisher in microseconds; the process will issue the publisher data every n microseconds.
  • AccessPoint: Bind address
  • OutBound: A converter that converts one segment of the data stream from type o to the wire format ByteString
  • OnError_: Error Handler
  • String -> Fetch: Enumerator to create the outgoing data stream; the string argument is the parameter.
  • Service -> IO a: The user action to perform
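Since the period is given in microseconds, small conversion helpers can make call sites easier to read. The names milliseconds and seconds below are hypothetical and not part of this library; Timeout is redefined locally as Int64 so that the fragment stands on its own.

```haskell
import Data.Int (Int64)

-- Local stand-in for the library's Timeout type (microseconds).
type Timeout = Int64

-- Convert milliseconds to a Timeout in microseconds.
milliseconds :: Int64 -> Timeout
milliseconds ms = ms * 1000

-- Convert seconds to a Timeout in microseconds.
seconds :: Int64 -> Timeout
seconds s = s * 1000000
```

A period of 100ms would then be written as milliseconds 100 rather than as the literal 100000.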

The weather report publisher introduced above (see withPub) can be implemented by means of withPeriodicPub as:

     withPeriodicPub ctx "Weather Report" noparam 
       100000 -- publish every 100ms
       (Address "tcp://*:5555" []) 
       (return . B.pack) -- string converter
       onErr_            -- standard error handler
       (\_ -> fetch1 fetch) -- creates one instance
                             -- of the return of "fetch";
                             -- see Enumerator for details
       $ \pub -> 
         untilInterrupt $ do -- until SIGINT, see withServer for details
           threadDelay 100000
           putStrLn $ "I am doing nothing " ++ srvName pub

withSub :: Context -> String -> Parameter -> [Topic] -> AccessPoint -> InBound i -> OnError_ -> Dump i -> (Service -> IO a) -> IO a

A subscription is a background service that receives and processes data streams from a publisher. A typical use case is an application that operates on periodically updated data; the subscriber would receive these data and make them accessible to other threads in the process through an MVar.

Parameters:

  • Context: The ZMQ Context
  • String: The subscriber's name
  • Parameter: The initial value of the control parameter
  • [Topic]: The topics to subscribe to; in the example above (withPub), the publisher publishes the weather report per zip code; the zip code, in this example, could be a meaningful topic for a subscriber. It is good practice to send the topic in an initial message segment, the envelope, so that the subscriber does not match on some arbitrary part of the message.
  • AccessPoint: The access point of the publisher, to which the subscriber connects
  • InBound: A converter that converts one segment of the incoming data stream to type i
  • OnError_: Error handler
  • Dump: Iteratee to process the incoming data stream.
  • Service -> IO a: Application-defined action to control the service. Note that Service is a thread-local resource and must not be passed to threads forked from the action.
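ZeroMQ matches a subscription against the leading bytes of an incoming message, which is why the topic should come first. The following pure model illustrates this filtering; matches is a hypothetical helper written for this illustration, not a function of this library, and Topic is redefined locally.

```haskell
import Data.List (isPrefixOf)

-- Local stand-in for the library's Topic type.
type Topic = String

-- A message passes the filter if at least one subscribed topic is a
-- prefix of it; with no subscription at all, nothing passes.
matches :: [Topic] -> String -> Bool
matches topics msg = any (`isPrefixOf` msg) topics
```

In this model, matches ["10001"] accepts a report that starts with the zip code 10001, but rejects a report in which 10001 appears only further back in the message.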

Weather Report Subscriber:

     withContext 1 $ \ctx -> 
       withSub ctx "Weather Report" noparam 
               ["10001"] -- zipcode to subscribe to
               (Address "tcp://localhost:5555" []) 
               (return . B.unpack) 
               onErr_ output -- Iteratee that just writes to stdout
               $ \s -> untilInterrupt $ do
                 putStrLn $ "Doing nothing " ++ srvName s
                 threadDelay 1000000

data Sub i

An alternative to the background subscriber (see withSub).

subContext :: Sub i -> Context

Obtaining the Context from Sub

setSubOptions :: Sub i -> [SocketOption] -> IO ()

Setting SocketOption to the underlying ZMQ Socket

withSporadicSub :: Context -> AccessPoint -> InBound i -> [Topic] -> (Sub i -> IO a) -> IO a

Similar to Pub, a Sub is a data type that provides an interface to subscribe to data. withSporadicSub creates a subscriber and invokes an application-defined action, which receives a Sub argument. The lifetime of the subscriber is limited to the action. When the action terminates, the subscriber dies.

checkSub :: Sub i -> Iteratee i IO a -> IO (Maybe (Either SomeException a))

Polling for data; if nothing has been received, the function returns Nothing; otherwise it returns Just the result or an error.

Parameters:

  • Sub: The subscriber
  • Iteratee: Iteratee to process the data

waitSub :: Sub i -> Iteratee i IO a -> IO (Either SomeException a)

Waiting for data; the function blocks the current thread until data are received from the publisher. It returns either SomeException or the result.

Parameters:

  • Sub: The subscriber
  • Iteratee: Iteratee to process the data stream

unsubscribe :: Sub i -> Topic -> IO ()

Unsubscribes from a topic

resubscribe :: Sub i -> Topic -> IO ()

Subscribes to another topic

Pipeline

data Pipe o

A pipeline consists of a "pusher" and a set of workers ("pullers"). The pusher sends jobs down the pipeline that will be assigned to one of the workers. The pipeline pattern is, thus, a work-balancing scheme.
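ZeroMQ PUSH sockets hand out messages to the connected pullers in round-robin order. The following pure model illustrates how a sequence of jobs would be spread over n workers under that scheme; roundRobin is a hypothetical helper for illustration only, and it ignores effects such as workers connecting late.

```haskell
-- Deal jobs to n workers in round-robin order: worker k (counting from 0)
-- receives the jobs at positions k, k+n, k+2n, ...
roundRobin :: Int -> [a] -> [[a]]
roundRobin n jobs
  | n <= 0    = []
  | otherwise =
      [ [ j | (i, j) <- zip [0 ..] jobs, i `mod` n == k ]
      | k <- [0 .. n - 1] ]
```

For example, seven jobs a to g dealt to three workers yield the job lists adg, be and cf.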

pipeContext :: Pipe o -> Context

Obtaining the Context from Pipe

setPipeOptions :: Pipe o -> [SocketOption] -> IO ()

Setting SocketOption to the underlying ZMQ Socket

withPipe :: Context -> AccessPoint -> OutBound o -> (Pipe o -> IO a) -> IO a

Creates a pipeline; a Pipe is a data type that provides an interface to push a data stream to workers connected to the other side of the pipe. withPipe creates a pipeline and invokes an application-defined action which receives a Pipe argument. The lifetime of the Pipe is limited to the action. When the action terminates, the Pipe dies.

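Parameters:

  • Context: The ZMQ Context
  • AccessPoint: The access point the pipe will bind
  • OutBound: A converter to convert one segment of the data stream from type o to the wire format ByteString
  • Pipe -> IO a: The action to invoke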

push :: Pipe o -> Enumerator o IO () -> IO ()

Sends a job down the pipeline.

Parameters:

  • Pipe: The pipeline
  • Enumerator: enumerator to create the data stream that constitutes the job

A simple pusher:

     sendF :: FilePath -> IO ()
     sendF f = withContext 1 $ \ctx -> do
       let ap = Address "tcp://*:5555" []
       withPipe ctx ap return $ \p ->
         push p (EB.enumFile f) -- file enumerator
                                -- see Data.Enumerator.Binary (EB)

withPuller :: Context -> String -> Parameter -> AccessPoint -> InBound i -> OnError_ -> Dump i -> (Service -> IO a) -> IO a

A puller is a background service that receives and processes data streams from a pipeline.

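Parameters:

  • Context: The ZMQ Context
  • String: The name of the puller, useful for debugging
  • Parameter: The initial value of the control parameter
  • AccessPoint: The access point of the pipeline, to which the puller connects
  • InBound: A converter that converts one segment of the incoming data stream from the wire format ByteString to type i
  • OnError_: Error handler
  • Dump: Iteratee to process the incoming data stream
  • Service -> IO a: Application-defined action to control the service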

A worker that just writes the incoming stream to stdout:

     withContext 1 $ \ctx -> 
       withPuller ctx "Worker" noparam 
             (Address "tcp://localhost:5555" [])
             (return . B.unpack)
             onErr_ output
             $ \s -> untilInterrupt $ do
               putStrLn $ "Doing nothing " ++ srvName s
               threadDelay 100000

Exclusive Pair

data Peer a

An Exclusive Pair is a general purpose pattern of two equal peers that communicate with each other by sending (send) and receiving (receive) data. One of the peers has to bind the AccessPoint; the other connects to it.

peerContext :: Peer a -> Context

Obtains the Context from a Peer

withPeer :: Context -> AccessPoint -> LinkType -> InBound a -> OutBound a -> (Peer a -> IO b) -> IO b

Creates a Peer; a peer is a data type that provides an interface to exchange data with another peer. withPeer creates the peer and invokes an application-defined action that receives a Peer argument. The lifetime of the Peer is limited to the action. When the action terminates, the Peer dies.

Parameters:

  • Context: The ZMQ Context
  • AccessPoint: The address, to which this peer either binds or connects
  • LinkType: One of the peers has to bind the address, the other has to connect.
  • InBound: A converter to convert message segments from the wire format ByteString to type a
  • OutBound: A converter to convert message segments of type a to the wire format ByteString
  • Peer -> IO b: The action to invoke

send :: Peer o -> Enumerator o IO () -> IO ()

Sends a data stream to another peer.

Parameters:

  • Peer: The peer
  • Enumerator: Enumerator to create the outgoing data stream

receive :: Peer i -> Iteratee i IO a -> IO (Either SomeException a)

Receives a data stream from another peer.

Parameters:

  • Peer: The peer
  • Iteratee: Iteratee to process the incoming data stream

Service Access Point

data AccessPoint

Describes how to access a service; an AccessPoint usually consists of an address and a list of SocketOption. Addresses are passed in as strings of the form:

  • "tcp://*:5555": for binding the port 5555 via TCP/IP on all network interfaces; an IPv4 address or the operating system interface name could be given instead.
  • "tcp://localhost:5555": for connecting to the port 5555 on localhost via TCP/IP; the endpoint may be given as a DNS name or as an IPv4 address.
  • "ipc:///tmp/queues/0": for binding and connecting to a local inter-process communication endpoint, in this case created under /tmp/queues/0; only available on UNIX.
  • "inproc://worker": for binding and connecting to the process internal address worker.

For more options, please refer to the zeromq documentation.
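The address forms listed above share the shape transport://endpoint. A small helper to take such a string apart, for example to validate command line input, might look like the sketch below; Transport and splitAddress are hypothetical names and not part of this library, and only the three transports listed above are recognised.

```haskell
-- The transports mentioned in the list above.
data Transport = Tcp | Ipc | Inproc deriving (Eq, Show)

-- Split an address string into its transport and its endpoint part;
-- returns Nothing for an unknown transport or a missing "://".
splitAddress :: String -> Maybe (Transport, String)
splitAddress s = case break (== ':') s of
  (scheme, ':' : '/' : '/' : rest) ->
    fmap (\t -> (t, rest)) (lookup scheme table)
  _ -> Nothing
  where table = [("tcp", Tcp), ("ipc", Ipc), ("inproc", Inproc)]
```

Applied to "tcp://*:5555", the helper yields the transport Tcp and the endpoint "*:5555"; the endpoint itself is not checked any further.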

Constructors

Address 

Fields

acAdd :: String

Address string

acOs :: [SocketOption]

Socket options

data LinkType

How to link to an AccessPoint

Constructors

Bind

Bind the address

Connect

Connect to the address

parseLink :: String -> Maybe LinkType

Safely read LinkType; ignores the case of the input string and, besides "bind" and "connect", also accepts "bin", "con" and "conn"; intended for use with command line parameters
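The described behaviour can be modelled as follows. This is a sketch written from the description above, not the library's source; LinkType is redefined locally and the function is called parseLinkSketch to make clear that it is a stand-in.

```haskell
import Data.Char (toLower)

-- Local stand-in for the library's LinkType.
data LinkType = Bind | Connect deriving (Eq, Show)

-- Case-insensitive parser that accepts the spellings named above
-- and rejects everything else.
parseLinkSketch :: String -> Maybe LinkType
parseLinkSketch s = case map toLower s of
  "bind"    -> Just Bind
  "bin"     -> Just Bind
  "connect" -> Just Connect
  "conn"    -> Just Connect
  "con"     -> Just Connect
  _         -> Nothing
```

Returning Maybe lets a command line program print a usage message on Nothing instead of failing with a read error.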

Converters

type InBound a = ByteString -> IO a

Converters are user-defined functions that convert a ByteString to a value of type a (InBound) or a value of type a to ByteString (OutBound). Converters are, hence, similar to put and get in the Binary monad. The reason for using explicit, user-defined converters instead of Binary encode and decode is that the conversion may be more complex, involving reading configurations or other IO actions.

The simplest possible in-bound converter for plain strings is:

 let iconv = return . toString

type OutBound a = a -> IO ByteString

A simple string OutBound converter may be:

 let oconv = return . fromString
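Because converters run in IO, they can also reject malformed input by throwing an exception. The sketch below shows a pair of converters for Int values; intIn and intOut are hypothetical names, the two type synonyms are repeated locally so that the fragment stands on its own, and Data.ByteString.Char8 is assumed to be an acceptable encoding for the payload.

```haskell
import Control.Exception (ErrorCall (..), throwIO)
import qualified Data.ByteString.Char8 as B

-- Local copies of the converter types described above.
type InBound  a = B.ByteString -> IO a
type OutBound a = a -> IO B.ByteString

-- Parse a whole message segment as a decimal Int; throw if the segment
-- is not a number or has trailing characters.
intIn :: InBound Int
intIn bs = case B.readInt bs of
  Just (n, rest) | B.null rest -> return n
  _ -> throwIO (ErrorCall ("not an integer: " ++ B.unpack bs))

-- Render an Int as its decimal representation.
intOut :: OutBound Int
intOut = return . B.pack . show
```

How an exception thrown by a converter is classified and reported is up to the service and its error handler.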

Errors and Error Handlers

data Criticality

Indicates criticality of the error event

Constructors

Error

The current operation (e.g. processing a request) has not terminated properly, but the service is able to continue; the error may have been caused by a faulty request or other temporary conditions. Note that if an application-defined Iteratee or Enumerator results in SomeException (by means of throwError), the incident is classified as Error; if it throws an IO Error, however, the incident is classified as Fatal.

Critical

One worker thread is lost (Server only)

Fatal

The service cannot recover and will terminate

type OnError = Criticality -> SomeException -> String -> Parameter -> IO (Maybe ByteString)

Error handler for servers; receives the Criticality of the error event, the exception, the server name and the service control parameter. If the error handler returns Just a ByteString, this ByteString is sent to the client as error message.

A good policy for implementing servers is to terminate or restart the Server when a Fatal or Critical error occurs and to send an error message to the client on a plain Error. The error handler, additionally, may log the incident or inform an administrator.
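A handler along these lines might look like the following sketch. Criticality, Parameter and OnError are redefined locally so that the fragment stands on its own; onErrSketch and demo are hypothetical names, and terminating or restarting the server is left to the application, since it has to happen through the Service.

```haskell
import Control.Exception (SomeException, toException)
import qualified Data.ByteString.Char8 as B
import System.IO (hPutStrLn, stderr)

-- Local copies of the types described above.
data Criticality = Error | Critical | Fatal deriving (Eq, Show)
type Parameter = String
type OnError = Criticality -> SomeException -> String -> Parameter
            -> IO (Maybe B.ByteString)

-- Log every incident to stderr; answer the client only on a plain Error.
onErrSketch :: OnError
onErrSketch crit e name _ = do
  hPutStrLn stderr (name ++ ": " ++ show crit ++ ": " ++ show e)
  return $ case crit of
    Error -> Just (B.pack "request failed")
    _     -> Nothing

-- Example invocation with a made-up exception and server name.
demo :: IO (Maybe B.ByteString)
demo = onErrSketch Error (toException (userError "bad request")) "MyQuery" ""
```

The reply text is deliberately generic, so that details of the exception are written to the log but not disclosed to the client.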

type OnError_ = Criticality -> SomeException -> String -> Parameter -> IO ()

Error handler for all services but servers; receives the Criticality of the error event, the exception, the service name and the service control parameter.

A good policy is to terminate or restart the service when a Fatal error occurs and to continue, if possible, on a plain Error. The error handler, additionally, may log the incident or inform an administrator.

chainIO :: IO a -> (a -> Iteratee b IO c) -> Iteratee b IO c

Chains IO actions in an Enumerator together; throws SomeException using throwError when an error occurs

chainIOe :: IO a -> (a -> Iteratee b IO c) -> Iteratee b IO c

Chains IO actions in an Enumerator together; returns Error when an error occurs

tryIO :: IO a -> Iteratee i IO a

Executes an IO action in an Iteratee; throws SomeException using throwError when an error occurs

tryIOe :: IO a -> Iteratee i IO a

Executes an IO action in an Iteratee; returns Error when an error occurs

Generic Service

data Service

Generic Service data type; Service is passed to application-defined actions used with background services, namely withServer, withPeriodicPub, withSub, withPuller and withDevice.

srvName :: Service -> String

Obtains the service name

pause :: Service -> IO ()

Pauses the Service

resume :: Service -> IO ()

Resumes the Service

changeParam :: Service -> Parameter -> IO ()

Changes the Service control parameter

changeOption :: Service -> SocketOption -> IO ()

Changes SocketOption

ZMQ Context

data Context

A 0MQ context representation.

withContext :: Size -> (Context -> IO a) -> IO a

Run an action with a 0MQ context. The Context supplied to your action will not be valid after the action either returns or throws an exception.

data SocketOption

The option to set on 0MQ sockets (cf. zmq_setsockopt and zmq_getsockopt manpages for details).

Constructors

Affinity Word64

ZMQ_AFFINITY

Backlog CInt

ZMQ_BACKLOG

Events PollEvent

ZMQ_EVENTS

FD CInt

ZMQ_FD

Identity String

ZMQ_IDENTITY

Linger CInt

ZMQ_LINGER

Rate Int64

ZMQ_RATE

ReceiveBuf Word64

ZMQ_RCVBUF

ReceiveMore Bool

ZMQ_RCVMORE

ReconnectIVL CInt

ZMQ_RECONNECT_IVL

ReconnectIVLMax CInt

ZMQ_RECONNECT_IVL_MAX

RecoveryIVL Int64

ZMQ_RECOVERY_IVL

SendBuf Word64

ZMQ_SNDBUF

HighWM Word64

ZMQ_HWM

McastLoop Bool

ZMQ_MCAST_LOOP

RecoveryIVLMsec Int64

ZMQ_RECOVERY_IVL_MSEC

Swap Int64

ZMQ_SWAP

Helpers

type Topic = String

Subscription Topic

alltopics :: [Topic]

Subscribe to all topics

notopic :: [Topic]

Subscribe to no topic

type Timeout = Int64

type Parameter = String

Control Parameter

noparam :: Parameter

Ignore parameter