patterns-0.1.1: Common patterns in message-oriented applications

Safe HaskellNone




Stream processing services


Processing Streams

This module provides functions to automate stream processing, in particular the function withStreams that starts a background action polling on a set of streams. The function uses uses application-defined callbacks to manipulate streams.

The functions runReceiver and runSender are intended mainly for testing. They send or receive respectively streams, which are handled or created by a conduit Sink and Source.

withStreams :: Context -> Service -> Timeout -> [PollEntry] -> StreamAction -> OnError_ -> StreamSink -> Control a -> IO aSource

Starts polling on a set of streams. The actual polling will be run in another thread. The current thread continues with the action passed in. When this action terminates, the streamer stops polling.


  • Context - The ZMQ context
  • Service - The service name indicated for instance in error messages.
  • Timeout - The polling timeout: < 0 - listens eternally, 0 - returns immediately, > 0 - timeout in microseconds; when the timeout expires, the StreamAction is invoked.
  • PollEntry - List of PollEntry; the streamer will poll over all list members. When input is available, it is directed to the StreamSink.
  • StreamAction - Invoked when timeout expires.
  • OnError_ - Error handler
  • StreamSink - The sink, to which the stream is sent. Note that the sink must terminate the outgoing stream (using one of the terminating sinks described below). Not terminating the stream properly will result in a zeromq socket error.
  • Control a - The action to invoke, when the streamer has been started; The Control is used to control the device.

runReceiver :: Socket a -> Timeout -> SinkR (Maybe o) -> IO (Maybe o)Source

Receiver Sink: Internally a zeromq socket is waiting for input; when input is available, it is send to the sink.

  • 'Z.Socket a' - The source socket
  • Timeout - receiver timeout < 0 - listens eternally, 0 - returns immediately, > 0 - timeout in microseconds; when the timeout expires, the stream terminates and the return value is Nothing.

runSender :: Socket a -> Source -> IO ()Source

Sender Source: The Source generates a stream, which is relayed to the Socket.

data PollEntry Source

A poll entry describes how to access and identify a socket




pollId :: Identifier

How to address this particular stream

pollAdd :: String

The address to link to

pollType :: AccessType

The zeromq socket type

pollLink :: LinkType

How to link (bind or connect)

pollSub :: [Service]

List of Service (or topics) for subscribers

pollOs :: [SocketOption]

zeromq socket options

data AccessType Source

Defines the type of a PollEntry; the names of the constructors are similar to the corresponding ZMQ socket types.



Represents a server and expects connections from clients; corresponds to ZMQ Socket Type Rep


Represents a client and connects to a server; corresponds to ZMQ Socket Type Req


Represents a load balancer, expecting connections from clients; corresponds to ZMQ Socket Type XRep


Represents a router expecting connections from servers; corresponds to ZMQ Socket Type XReq


Represents a publisher; corresponds to ZMQ Socket Type Pub


Represents a subscriber; corresponds to ZMQ Socket Type Sub


Represents a Pipe; corresponds to ZMQ Socket Type Push


Represents a Puller; corresponds to ZMQ Socket Type Pull


Represents a Peer; corresponds to ZMQ Socket Type Pair

parseAccess :: String -> Maybe AccessTypeSource

Safely read AccessType; ignores the case of the input string (e.g. "servert" -> ServerT)

data LinkType Source

A zeromq AccessPoint can be bound or connected to its address. Only one peer can bind the address, all other parties have to connect.



Bind the address


Connect to the address

link :: LinkType -> Socket a -> String -> [SocketOption] -> IO ()Source

Binds or connects a socket to an address

parseLink :: String -> Maybe LinkTypeSource

Safely read LinkType; ignores the case of the input string and, besides "bind" and "connect", also accepts "bin", "con" and "conn"; intended for use with command line parameters


A streamer represents the current state of the streaming device started by means of withStreams. It is passed in to application-defined callbacks, namely the timeout action (StreamAction) and the Sink (StreamSink).

There is a bunch of useful sinks that receive a streamer as input (see below).

data Streamer Source

Holds information on streams and the current state of the streamer, i.e. the current source. Streamers are passed to processing conduits.

type StreamConduit = Streamer -> Conduit ByteString ()Source

Conduit with Streamer

type StreamSink = Streamer -> SinkSource

Sink with Streamer

type StreamAction = Streamer -> IO ()Source

IO Action with Streamer (e.g. Timeout action)

filterStreams :: Streamer -> (Identifier -> Bool) -> [Identifier]Source

Filter subset of streams; usually you want to filter a subset of streams to which to relay an incoming stream. Note that the result is just a list of stream identifiers, which of course could be used directly in the first place. A meaningful use of filterstreams would be, for instance:

 let targets = filterStreams s (/= getSource s)

Where all streams but the source are selected.

getSource :: Streamer -> IdentifierSource

Get current source


To manipulate and relay incoming streams, the application passes a StreamSink to withStreams. The following sinks are building blocks for more application-focused manipulations.

The peculiarities of the zeromq library, in particular the fact that messages are sent entirely, i.e. with all segments belonging to the same message, or not at all, require some care in designing zeromq sinks. The sink must ensure to mark the last segment sent (see SndMore). Also, the incoming stream should be exhausted to avoid message segements lingering around in the pipe.

Applications can construct new sinks by either calling a building block in the their own sink code, e.g.:

 example :: [B.ByteString] -> StreamSink
 example headers s is = do
   mbX <- C.await
   case mbX of 
     Nothing -> return ()
     Just x  -> do stream  s is headers
                   passAll s is

or by combining a sink with a conduit forming a more complex sink, e.g.:

 example :: StreamSink
 example s is = sourceList headers =$ passAll s is

stream :: Streamer -> [Identifier] -> [ByteString] -> SinkSource

Send the ByteString segments to the outgoing streams identified by [Identifier]. The stream is terminated.

part :: Streamer -> [Identifier] -> [ByteString] -> SinkSource

Send the ByteString segments to the outgoing streams identified by [Identifier] without terminating the stream, i.e. more segments must be sent.

passAll :: Streamer -> [Identifier] -> SinkSource

Pass all segments of an incoming stream to a list of outgoing streams. The stream is terminated.

pass1 :: Streamer -> [Identifier] -> SinkSource

Pass one segment and ignore the remainder of the stream. The stream is terminated.

passN :: Streamer -> [Identifier] -> Int -> SinkSource

Pass n segments and ignore the remainder of the stream. The stream is terminated.

passWhile :: Streamer -> [Identifier] -> (ByteString -> Bool) -> SinkSource

Pass while condition is true and ignore the remainder of the stream. The stream is terminated.

ignoreStream :: SinkSource

Ignore an incoming stream


The controller is passed in to the control action of withStreams. It allows the application to control the polling device. Through the controller, the device can be stopped, restarted, paused and resumed and it is possible to send and receive streams through the controler. To relay streams to the controller (i.e. directly to application code) the internal stream, which is identified by the string "_internal" can be used.

data Controller Source


type Control a = Controller -> IO aSource

Control Action

internal :: IdentifierSource

The internal stream that represents the Controller. StreamSinks can write to this stream, e.g.:

 passAll s [internal]

And the streamer may also receive from this stream, e.g.:

 if getSource s == internal

stop :: Controller -> IO ()Source

Stop streams

pause :: Controller -> IO ()Source

Pause streams

resume :: Controller -> IO ()Source

Resume streams

send :: Controller -> [Identifier] -> Source -> IO ()Source

Send a stream through the controller

receive :: Controller -> Timeout -> SinkR (Maybe a) -> IO (Maybe a)Source

Receive a stream through the controller that was sink'd to the target internal.

Complete Example

The following code implements a ping pong communication using two streamers. The code is somewhat simplistic; it does not use timeout, ignores errors and does not provide means for clean shutdown. It focuses instead on demonstrating the core of the streamer functionality.

For more examples on how to use streams, you may want to refer to the MDP Broker code in Network.Mom.Patterns.Broker.Broker.

 import           Control.Monad.Trans
 import           Control.Monad (forever)
 import           Control.Concurrent
 import qualified Data.Conduit          as C
 import qualified Data.ByteString.Char8 as B
 import           Network.Mom.Patterns.Streams 
 import qualified System.ZMQ as Z
 main :: IO ()
 main = Z.withContext 1 $ \ctx -> do
          ready <- newEmptyMVar
          _ <- forkIO (ping ctx ready)
          _ <- forkIO (pong ctx ready)
          forever $ threadDelay 100000
 ping :: Z.Context -> MVar () -> IO ()
 ping ctx ready = withStreams ctx "pong" (-1)
                        [Poll "ping" "inproc://ping" PeerT Bind [] []]
                        (\_ -> return ())      -- no timeout
                        (\_ _ _ -> return ())  -- ignore errors
                        pinger $ \c -> do
                    putMVar ready () -- ping is ready
                    putStrLn "starting game!"
                    send c ["ping"] startPing -- send through controller
                                              -- to initialise ping pong
                    putStrLn "game started!"
                    forever $ threadDelay 100000
   where startPing = C.yield $ B.pack "ping"
 pong :: Z.Context -> MVar () -> IO ()
 pong ctx ready = do 
   _ <- takeMVar ready -- wait for ping getting ready
   withStreams ctx "ping" (-1)
               [Poll "pong" "inproc://ping" PeerT Connect [] []]
               (\_ -> return ())
               (\_ _ _ -> return ())
               pinger $ \_ -> forever $ threadDelay 100000
 pinger :: StreamSink
 pinger s = C.awaitForever $ \i -> 
              let x = B.unpack i 
               in do liftIO $ putStrLn x
                     liftIO $ threadDelay 500000
                     case x of
                       "ping" -> stream s ["pong"] [B.pack "pong"]
                       "pong" -> stream s ["ping"] [B.pack "ping"]
                       _      -> return ()