lt      !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~ Safe-Infered Safe-Infered3Subscription Topic Control Parameter 8A timeout action is just an IO action without arguments $A device identifier is just a plain  A simple string  converter may be: ! let oconv = return . fromString &Converters are user-defined functions  that convert a  to a value of type a () or  a value of type a to  (). % Converters are, hence, similar to put and get in the Binary  monad. ; The reason for using explicit, user-defined converters  instead of Binary encode and decode  is that the conversion : may be more complex, involving reading configurations  or other  actions. ?The simplest possible in-bound converter for plain strings is:  let iconv = return . toString How to link to an , Connect to the address Bind the address )Indicates criticality of the error event .The service cannot recover and will terminate One worker thread is lost (Server only) !The current operation  (e.g. processing a request)  has not terminated properly, ( but the service is able to continue; . the error may have been caused by a faulty ) request or other temporal conditions. ' Note that if an application-defined  or   results in   (by means of ), ! the incident is classified as !; & if it throws an IO Error, however, ! the incident is classified as . ",Error handler for all services but servers;  receives the  of the error event, $ the exception, the service name & and the service control parameter. A good policy is ' to terminate or restart the service  when a  error occurs ! and to continue, if possible,  on a plain !. 9 The error handler, additionally, may log the incident  or inform an administrator. #Error handler for servers;  receives the  of the error event, E the exception, the server name and the service control parameter.  If the error handler returns  a   this ) is sent to the client as error message. *A good policy for implementing servers is  to terminate or restart the Server  when a  or   error occurs . and to send an error message to the client  on a plain !. 9 The error handler, additionally, may log the incident  or inform an administrator. $" to process data segments of type i;  receives the  and the control parameter % A variant of & that returns type o instead of   o.  Please note that ' does not mean strict, here; * it just means that the result is not a . & A variant of ( without input ' A variant of ( that returns type o instead of   o.  Please note that ' does not mean strict, here; * it just means that the result is not a . (7A function that may be used with some of the fetchers;  The helper returns  to signal # that no more data are available  and  o to continue the stream.  FetchHelpers are used with Servers ! that receive requests of type i.  The function  receives the , the conrol parameter  and an input of type i; ) A variant of * without input *" to process data segments of type o;  receives the , the control parameter  and an input of type i;  * is used by Server s that receive requests of type i 8 and produce an outgoing stream with segments of type o. +(A poll entry describes how to handle an , ,#Describes how to access a service;  an ,! usually consists of an address  and a list of . 3 Addresses are passed in as strings of the form:  "tcp://*:5555": for binding the port 5555 via TCP/IP 4 on all network interfaces; A an IPv4 address or the operating system A interface name could be given instead.  "tcp://localhost:5555": for connecting to the port 5555 $ on  localhost via TCP/IP; ; the endpoint may given as DNS name 8 or as an IPv4 address.  "ipc://tmp/queues/0" : for binding and connecting to F a local inter-process communication G endpoint, in this case created under " /tmp/queues/0; : only available on UNIX.  "inproc://worker" : for binding and connecting to 9 the process internal address worker <For more options, please refer to the zeromq documentation. .Address string /Socket options 0Defines the type of a +; A the names of the constructors are similar to ZMQ socket types = but with some differences to keep the terminology in line  with basic patterns.  The leading "X" stands for "Access"  (not for "eXtended" as in XRep and XReq). 1Represents a Peer; . corresponding peers must use complementing ; " corresponds to ZMQ Socket Type  2Represents a Puller;  should be used with ; " corresponds to ZMQ Socket Type  3Represents a Pipe;  should be used with ; " corresponds to ZMQ Socket Type  4Represents a subscriber;  should be used with ; " corresponds to ZMQ Socket Type  5Represents a publisher;  should be used with ; " corresponds to ZMQ Socket Type  6Represents a router ' expecting connections from servers;  should be used with ; " corresponds to ZMQ Socket Type  7Represents a load balancer, ' expecting connections from clients;  should be used with ; " corresponds to ZMQ Socket Type  8.Represents a client and connects to a server;  should be used with ; " corresponds to ZMQ Socket Type  9:Represents a server and expects connections from clients;  should be used with ; " corresponds to ZMQ Socket Type  : Creates a +;  Parameters:  : identifies an ,; E the identifier shall be unique within the device.  0: the 0 of this ,  ,: the ,  : how to link to this ,  []: The subscription topics - 8 ignored for all poll entries, but those  with 0 4 ; Safely read ; ( ignores the case of the input string  and, besides "bind" and "connect",  also accepts "bin", "con" and "conn"; 1 intended for use with command line parameters <  ->  =  ->  > UTF8 String ->  ?  -> UTF8 String @ String ->  A  -> String BChains IO Actions in an  together;  returns  when an error occurs CChains IO Actions in an  together;  throws   using   when an error occurs DExecutes an IO Actions in an ;  throws   using  when an error occurs EExecutes an IO Actions in an ;  returns  when an error occurs FIgnore parameter GSubscribe to all topics HSubscribe to no topic H !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGH[  !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGH0! "#$%&'()*+,-./0 987654321:;<=>?@ABCDEFGH Safe-Infered IGeneric Service data type;  I* is passed to application-defined actions ) used with background services, namely E withServer, withPeriodicPub, withSub, withPuller and withDevice. JObtains the service name K Obtains the  from I L Pauses the I M Resumes the I N Changes the I control parameter OChanges SocketOption PAdds a + to a device;  the I , of course, must be a device, % the command is otherwise ignored. Q Removes a + from a device;  the I , of course, must be a device, % the command is otherwise ignored. R!Changes the timeout of a device;  the I, of course, must be a device, % the command is otherwise ignored. #IJKLMNOPQR#IJKLMNOPQRIJKLMNOPQRportable experimental Safe-InferedSCalls an application-defined getter function  until this returns ; & if the getter throws an exception,  the enumerator returns . TCalls the application-defined getter function n times; # The enumerator receives a pair (, ), - where the first integer is a counter and " the second is the upper bound.  n is defined as  snd - fst, i.e. 9 the counter is incremented until it reaches the value A of the bound. The counter must be a value less than the bound  to avoid protocol errors, i.e. the getter must be called  at least once. 9 The current value of the counter and additional input  are passed to the getter. & if the getter throws an exception,  the enumerator returns . UCalls the application-defined getter function once; ' the enumerator must return a value  (the result type is not ), 8 otherwise, the sending iteratee has nothing to send 7 which would most likely result in a protocol error. & if the getter throws an exception,  the enumerator returns . V)Passes just the input value to iteratee;   just "hello world" hence, reduces to just  hello world sent over the wire. WCalls the application-defined (  until it returns ;  note that the ( shall return at least one  $ value to avoid a protocol error.  If the ( throws an exception,  the W returns . X A variant of W without input; YCalls the application-defined ' once;  If the ' throws an exception,  the W returns . Z A variant of Y without input; [6Calls the iteratee for each element of the input list \ A variant of [ for services without input; ? the list, in this case, is passed as an additional argument  to the fetcher. ]Calls the application-defined getter n times;  The getter is a variant of ' A with the current value of the counter as additional argument.  For more details, refer to T. ^ A variant of ] without input _-Passes just the input value to the iteratee;   fetchJust "hello world" hence, reduces to just " hello world" sent over the wire.  Note that the input i is ignored. ` A variant of _ without input a(Calls the application-defined IO action # for each element of the stream; ' The IO action could, for instance, $ write to an already opened file,  store values in an MVar or  send them through a Chan to another thread  for further processing. ( An exception thrown in the IO action  is re-thrown by . bReturns one value of type i; > if the enumerator creates a value, this value is returned; + otherwise, the input value is returned. cReturns one value of type  i;  equal to  d4Returns a list containing all chunks of the stream;  equal to  ; / note that this iteratee causes a space leak D and is not suitable for huge streams or streams of unknown size. e5Returns a string containing all chunks of the stream ' intercalated with the input string, e.g.: . if the stream consists of the two elements "hello" and "world"   toString " " returns  hello world. / Note that this iteratee causes a space leak D and is not suitable for huge streams or streams of unknown size. f&Merges the elements of a stream using ;  if the stream is empty, f returns .  The type i must be instance of . / Note that this iteratee causes a space leak D and is not suitable for huge streams or streams of unknown size. g3Opens a data sink, dumps the stream into this sink 2 and closes the sink when the stream terminates  or when an error occurs; 9 the first IO action is used to open the sink (of type s), " the second closes the sink and / the third writes one element into the sink. h Variant of g+ that uses the first segment of the stream ( as input parameter to open the sink. + The first segment, which could contain $ a file name or parameters for an SQL query,  is not written to the sink.  As with g3, the sink is closed when the stream terminates or  when an error occurs. i Similar to g1, but uses a data sink that is opened and closed < outside the scope of the service or does not need to be  opened and closed at all; * examples may be services that write to MVar or Chan.  i is implemented as a closure of a: ( nosink save ctx p = store (save ctx p) STUVWXYZ[\]^_`abcdefghi$%&'()*STUVWXYZ[\]^_`abcdefghiSTUV*)('&%WXYZ]^_`[\bcdefa$ghiSTUVWXYZ[\]^_`abcdefghi Safe-InferedjBHolds information on streams and the current state of the device; ) streamers are passed to transformers. kA transformer is an   to transform streams.  It receives two arguments:  a j. which provides information on access points;  a Sequence2 which may be used to store chunks of an incoming 0 stream before they are sent to the target. @Streamer and sequence keep track of the current transformation. 7 The streamer knows where the stream comes from and 5 may be queried about other streams in the device. l7Starts a device and executes an action that receives a I  to control the device  Parameters:    - The ZMQ context   - The device name  . - The initial value of the control parameter   - The polling timeout:  < 0 - listens eternally,  0 - returns immediately,  > 0 - timeout in microseconds; # when the timeout expires, the  action is invoked.  + - List of +; / the device will polll over / all list members and direct 4 streams to a subset of this list 9 determined by the stream transformer.   - in-bound converter; < the stream is presented to the transformer # as chunks of type o.   - out-bound converter  " - Error handler   ->  - Action to perform on timeout   -> k - The stream transformer  I" -> IO () - The action to invoke, < when the device has been started;  The I is used to control the device. mStarts a queue; + a queue connects clients with a dealer (7),  i.e. a load balancer for requests, ( and servers with a router (6) that routes responses ' back to the client.  Parameters:  : the ZMQ Context  : the queue name  (,, ): / the access point of the dealer (7) * and its link type; ; you usually want to bind the dealer ? so that many clients can connect to it.  (,, ): / the access point of the router (6); * and its link type; ; you usually want to bind the router ? so that many servers can connect to it.  ": the error handler  I -> IO (): the action to run m is implemented by means of l as:  @ withQueue ctx name (dealer, ld) (router, lr) onerr act = ) withDevice ctx name noparam (-1)  [pollEntry "clients" XDealer dealer ld [],  pollEntry "server" XRouter router lr []] # return return onerr (_ -> return ()) (_ -> putThrough) act nStarts a Forwarder; 9 a forwarder connects a publisher and its subscribers. " Note that the forwarder uses a  subscriber (4)  to conntect to the  publisher and  a  publisher (5) to bind the  subscribers.  Parameters:  : the ZMQ Context  : the forwarder name  : the subscription topic  (,, ,): * the access points; ( the first is the  subscriber (4), ) the second is the  publisher (5); 2 this rule is not enforced - by the type system; @ you have to take care of it on your own!  ": the error handler  I -> IO (): the action to run n is implemented by means of l as:  < withForwarder ctx name topics (sub, pub) onerr act = ) withDevice ctx name noparam (-1)  [pollEntry " subscriber" XSub router Connect topics,  pollEntry " publisher" XPub dealer Bind []] # return return onerr (_ -> return ()) (_ -> putThrough) act oStarts a pipeline;  a pipeline connects a pipe  and its workers. ! Note that the pipeline uses a puller (2)  to conntect to the pipe and  a pipe (3) to bind the pullers.  Parameters:  : the ZMQ Context  : the pipeline name  (,, ): / the access point of the puller (2) * and its link type; ? you usually want to connect the puller 7 to one pipe so that it appears 3 as one puller among others, 8 to which the pipe may send jobs.  (,, ): / the access point of the pipe (3); * and its link type; 9 you usually want to bind the pipe ? so that many pullers can connect to it.  ": the error handler  I -> IO (): the action to run o is implemented by means of l as:  I withPipeline ctx name topics (puller, l1) (pusher, l2) onerr act = ) withDevice ctx name noparam (-1)  [pollEntry "pull" XPull puller l1 [],  pollEntry "push" XPush pusher l2 []] # return return onerr (_ -> return ()) (_ -> putThrough) act p=Retrieves the identifier of the source of the current stream qFilters target streams;  the function resembles filter of   : " it receives the property of an ;  if a +3 has this property, it is added to the result set. DThe function is intended to select targets for an out-going stream, ; typically based on the identifier of the source stream. C The following example selects all poll entries, but the source:  + broadcast :: Streamer o -> [Identifier] - broadcast s = filterTargets s notSource  where notSource = (/=) (getStreamSource s) r6Sends all sequence elements to the targets identified  by the list of % and terminates the outgoing stream. 5 The transformation continues with the transformer % passed in and an empty sequence. s6Sends all sequence elements to the targets identified  by the list of  , but unlike r, + does not terminate the outgoing stream. 5 The transformation continues with the transformer % passed in and an empty sequence. /Note that all outgoing streams, once started, 7 have to be terminated before the transformer ends. + Otherwise, a protocol error will occur. tSends one element (o ) to the targets and continues  with an empty sequence; E the Boolean parameter determines whether this is the last message  to send. /Note that all outgoing streams, once started, 7 have to be terminated before the transformer ends. + Otherwise, a protocol error will occur. uSends one element (o) to the targets,  but, unlike t+, passes the sequence to the transformer.  u) does not terminate the outgoing stream. v:Terminates the outgoing stream by sending the new element ; as last segment to all targets and ends the transformer 0 by ignoring the rest of the incoming stream. w#Adds a new element to the sequence 6 and calls the transformer without sending anything x>Merges the new element with the last element of the sequence; 7 if the sequence is currently empty, the new element  will be its only member. : Merged elements appear as one element of the sequence . in the continuation of the transformation.  The type o must be a , i.e.,  it must implement mappend and mempty. ( The function does not send anything. yTransformer that 3 passes messages one-to-one to all poll entries  but the current source zTransformer that 0 ignores the remainder of the current stream; 2 it is usually used to terminate a transformer. {Transformer that H does nothing but continuing the transformer, from which it is called  and, hence, is identical to  return (); 5 it is usually passed to a transformer combinator,  like r%, to continue processing right here 2 instead of recursing into another transformer. jklmnopqrstuvwxyz{#+0123456789:PQRjklmnopqrstuvwxyz{#lmno+:0987654321PQRjpqkyz{rstuvwxjklmnopqrstuvwxyz{portable experimental Safe-Infered#|/An Exclusive Pair is a general purpose pattern 7 of two equal peers that communicate with each other  by sending () and receiving () data.  One of the peers has to  the ,  the other  s to it. }A pipeline consists of a "pusher"  and a set of workers ("pullers"). 8 The pusher sends jobs down the pipeline that will be $ assigned to one of the workers. ; The pipeline pattern is, thus, a work-balancing scheme. ~1An alternative to the background subscriber (see );  Publisher Client data type "Starts one or more server threads  and executes an action that  receives a I to control the server.  The I is a thread local resource. , It must not be passed to threads forked 1 from the thread that has started the service.  The I+ is valid only in the scope of the action. D When the action terminates, the server is automatically stopped. > During the action, the server can be paused and restarted.  Also, the  of the underlying ZMQ   can be changed.  Please refer to L, M and O for more details. 2The application may implement control parameters. 7 Control parameters are mere strings that are passed # to the application call-backs. 8 It is up to the application to enquire these strings C and to implement different behaviour for the possible settings. 4 Control parameter can be changed during run-time  by means of N.  Parameters:   : The ZMQ context;  0: The name of the server, useful for debugging;  .: The initial value of the control parameter 8 passed to all application call-backs;   : The number of worker threads; 4 note that a server with only one thread 2 handles client requests sequentially. # The number of threads H (together with the number of hardware processing resources) L defines how many client requests can be processed in parallel.  ,: The access point, > through which this server can be reached;  : The link type; E standalone servers usually bind their access point, 0 whereas clients connect to it. 4 Instead, a server may also connect - to a load-balancing device, < to which other servers and clients connect  (see l and m).  (: The converter to convert the incoming % data stream (of type ) 1 into a client request component. 1 Note that the converter converts > single message segments to components of type c.  The , receiving this c-typed - elements shall combine them . to a complete request of type i, . which is then processed by an  / to create the server response.  /: The converter to convert the results of type o  to a , which then is sent % back to the client.  #: The error handler   -> : The  that processes 8 request components of type c : and yields a request of type i. ! The  argument is 4 the control parameter, 9 whose logic is implemented 1 by the application.  *: The  that processes " the request of type i to produce  results of type o.  I" -> IO (): The action to invoke, < when the server has been started; D the service is used to control the server. %The following code fragment shows a / simple server to process data base queries 0 using standard converters and error handlers  not further defined here:    withContext 1 $ \ ctx -> do  c <- connectODBC "DSN=xyz" -- some ODBC connection  s < - prepare c " select ..." -- some database query  withServer ctx  "MyQuery" -- name of the server is "MyQuery" & noparam -- no parameter - 5 -- five worker threads  (Address "tcp://*:5555" []) Bind -- bind to this address 2 iconv oconv -- some standard converters 5 onErr -- some standard error handler  (\ _ -> one []) --  for single segment messages; ' -- refer to  Enumerator for details  (dbFetcher s) $ \srv -> -- the ; @ untilInterrupt $ do -- install a signal handler for SIGINT C -- and repeat the following action ) -- until SIGINT is received;  putStrLn $ "server " ++ srvName srv ++  " up and running..." " threadDelay 1000000 7The untilInterrupt loop may be implemented as follows:    !untilInterrupt :: IO () -> IO ()  untilInterrupt run = do  continue <- newMVar True  _ <;- installHandler sigINT (Catch $ handler continue) Nothing  go continue ' where handler m = modifyMVar_ m (\_ -> return False)  go m = do run $ continue < - readMVar m 0 when continue $ go m Finally, a simple dbFetcher:  6 dbFetcher :: SQL.Statement -> Fetch [SQL.SqlValue] String 3 dbFetcher s _ _ _ stp = tryIO (SQL.execute s []) >>= \ _ -> go stp  where go step =  case step of $ E.Continue k -> do  mbR <- tryIO $ SQL.fetchRow s  case mbR of - Nothing -> E.continue k G -- convRow is not defined here < Just r -> go $$ k (E.Chunks [convRow r]) % _ -> E.returnI step Obtaining the  from  Setting  to the underlying ZMQ   Creates a ; 7 a client is not a background process like a server, + but a data type that provides functions " to interoperate with a server.   creates a client and + invokes the application-defined action,  which receives a  argument.  The lifetime of the  is limited  to the invoked action. # When the action terminates, the  dies.  Parameters:  : The ZMQ Context;  ,2: The access point, to which the client connects;  +: Converter to convert a request from type o $ to the wire format . 8 Note that, as for servers, the request = may be composed of components that together , form the request. The type o A corresponds to one of these request components, A not necessarily to the request type as a whole, = which is determined when issuing a request.   : Converter to convert a reply ()  into type i. > Note again that the reply may consist of many + message segments. The type i relates to one " reply component, > not necessarily to the reply type as a whole, < which is determined when issuing a request.  2 -> IO a: The action to perform with this client. $Synchronously requesting a service; + the function blocks the current thread,  until a reply is received.  Parameters:  ': The client that performs the request  2: Enumerator to create the request message stream  /: Iteratee to process the reply message stream 0A simple client that just writes the results to stdout:    rcv :: String -> IO ()  rcv req = withContext 1 $ \ctx ->  withClient ctx  (Address "tcp://localhost:5555" []) -- connect to this address F (return . B.pack) (return . B.unpack) $ -- string converters  \s -> do * -- request with enum and outit  ei <#- request s (enum req) outit  case ei of # Left e -> putStrLn $ "Error: " ++ show (e::SomeException) " Right _ -> return ()   0 -- Enumerator that returns just one string 1 enum :: String -> E.Enumerator String IO () ! enum = once (return . Just)   , -- Iteratee that just writes to stdout & outit :: E.Iteratee String IO ()  outit = do  mbi < - EL.head  case mbi of  Nothing -> return () 1 Just i -> liftIO (putStrLn i) >> outit -Note that this code just issues one request, + which is not the most typical use case. 8 It is more likely that the action will loop for ever > and receive requests, for instance, from a user interface. %Asynchronously requesting a service; / the function sends a request to the server ! without waiting for a result.  Parameters:  ': The client that performs the request  2: Enumerator to create the request message stream Polling for a reply; , the function polls for a server request. , If nothing has been received, it returns ;  otherwise it returns  the result or an error.  Parameters:  ': The client that performs the request  /: Iteratee to process the reply message stream The synchronous request (see ) - could be implemented asynchronously like:   rcv :: String -> IO ()  rcv req = withContext 1 $ \ ctx -> do  let ap = address l "tcp://localhost:5555" []  withClient ctx ap 0 (return . B.pack) (return . B.unpack)  $ \s -> do  ei <- try $ askFor s (enum req)  case ei of # Left e -> putStrLn $ "Error: " ++ show (e::SomeException)  Right _ -> wait s * -- check for results periodically + where wait s = checkFor s outit >>= \mbei ->  case mbei of / Nothing -> do putStrLn " Waiting..." B threadDelay 10000 >> wait s . Just (Left e) -> putStrLn $ "Error: " ++ show e , Just (Right _) -> putStrLn "Ready!" Obtaining the  from  Setting  to the underlying ZMQ  Creates a publisher;  A publisher is a data type > that provides an interface to publish data to subscribers.  ! creates a publisher and invokes " an application-defined action,  which receives a  argument. ; The lifetime of the publisher is limited to the action. - When the action terminates, the publisher dies.  Parameter:  : The ZMQ Context  ,+: The access point the publisher will bind  #: A converter to convert from type o $ to the wire format . 2 Note that a publisher may create ) a data stream; the type o is then 9 the type of one segment of this stream, / not of the stream as a whole.   -> IO (): The action to invoke 4Publishes the data stream created by an enumerator;  Parameters:  : The publisher  (: The enumerator to create an outgoing " data stream. #A simple weather report publisher:    withContext 1 $ \ctx -> withPub ctx  (Address "tcp://*:5555" [])  (return . B.pack) $ \pub -> untilInterrupt $ do , issue pub (once weather noparam) 2 threadDelay 10000 -- update every 10ms   4 -- fake weather report with some random values , weather :: String -> IO (Maybe String)  weather _ = do  zipcode <%- randomRIO (10000, 99999) :: IO Int  temperature < - randomRIO (-10, 30) :: IO Int  humidity < - randomRIO ( 10, 60) :: IO Int 0 return $ Just (unwords [show zipcode, 4 show temperature, . show humidity]) "Creates a background process that  periodically publishes data;  Parameters:   : The ZMQ Context  : Name of this Publisher; $ useful for debugging  -: The initial value of the control parameter  /: The period of the publisher in microseconds; = the process will issue the publisher data ( every n microseconds.  ,: Bind address  (: A converter that converts one segment . of the data stream from type o $ to the wire format   ": Error Handler   -> *:  to create 4 the outgoing data stream; @ the string argument is the parameter.  I& -> IO (): The user action to perform 3The weather report publisher introduced above (see ) " can be implemented by means of  as:   withPeriodicPub ctx "Weather Report" noparam % 100000 -- publish every 100ms  (Address "tcp://*:5555" []) - (return . B.pack) -- string converter 3 onErr_ -- standard error handler  (\+_ -> fetch1 fetch) -- creates one instance 1 -- of the return of "fetch"; $ -- see  Enumerator for details  $ \pub -> & untilInterrupt $ do -- until SIGINT, see  for details  threadDelay 100000  putStrLn $ "I am doing nothing " ++ srvName pub 'A subscription is a background service , that receives and processes data streams  from a publisher. ( A typical use case is an application / that operates on periodically updated data; / the subscriber would receive these data and < and make them accessible to other threads in the process  through an .  Parameters:  : The ZMQ Context  : The subscriber's name  -: The initial value of the control parameter  []: The topics to subscribe to; ' in the example above (), < the publisher publishes the weather report > per zip code; the zip code, in this example, ? could be a meaningful topic for a subscriber. 7 It is good practice to send the topic 0 in an initial message segment, < the envelope, to avoid that the subscriber @ matches on some arbitrary part of the message.  (: A converter that converts one segment 4 of the incoming data stream to type o  ": Error handler  $: & to process the incoming data stream.  I1 -> IO (): Application-defined action to control 0 the service. Note that I is > a thread-local resource and must not F be passed to threads forked from the action. Weather Report Subscriber:   withContext 1 $ \ctx ->  withSub ctx "Weather Report" noparam  ["10001"] -- zipcode to subscribe to  (Address "tcp://localhost:5555" []) $ (return . B.unpack) D onErr_ output -- Iteratee that just writes to stdout  $ \s -> untilInterrupt $ do  putStrLn $ "Doing nothing " ++ srvName s % threadDelay 1000000 Obtaining the  from ~ Setting  to the underlying ZMQ   Similar to , a ~ is a data type 1 that provides an interface to subscribe data.  " creates a subscriber and invokes " an application-defined action,  which receives a ~ argument. < The lifetime of the subscriber is limited to the action. . When the action terminates, the subscriber dies. Polling for data; 6 If nothing has been received, the function returns ;  otherwise it returns  the result or an error.  Parameters:  ~: The subscriber  : Iteratee to process the data Waiting for data; + the function blocks the current thread, 5 until data are being received from the publisher.  It returns either  or the result.  Parameters:  ~: The subscriber  &: Iteratee to process the data stream Unsubscribe a topic Subscribe another topic "A puller is a background service = that receives and processes data streams from a pipeline.  Parameters:  : The ZMQ Context  : The service name  -: The initial value of the control parameter  ,: The address to connect to  : A converter to convert 5 segments of the incoming data stream % from the wire format   to the type i  ": Error Handler  $:  to process ' the incoming data stream  I& -> IO (): Application-defined action 1A worker that just writes the incoming stream to stdout:   withContext 1 $ \ctx ->  withPuller ctx "Worker" noparam  (Address "tcp://localhost:5555" []) ! (return . B.unpack)  onErr_ output  $ \s -> untilInterrupt $ do  putStrLn "Doing nothing " ++ srvName s " threadDelay 100000 Obtaining the  from } Setting  to the underlying ZMQ  Creates a pipeline;  a } is a data type ! that provides an interface to push a data stream 7 to workers connected to the other side of the pipe.  7 creates a pipeline and invokes an application-defined  action which receives a } argument.  The lifetime of the } is limited to the action. # When the action terminates, the } dies.  Parameters:  : The ZMQ Context  ,: The bind address  *: A converter to convert message segments  of type o to the wire format   } -> IO (): The action to invoke Sends a job down the pipeline;  Parameters:  }: The pipeline  ': enumerator to create the data stream * that constitutes the job A simple pusher:   sendF :: FilePath -> IO ()  sendF f = withContext 1 $ \ ctx -> do  let ap = Address "tcp://*:5555" []  withPipe ctx ap return $ \p -> 2 push pu (EB.enumFile f) -- file enumerator B -- see Data.Enumerator.Binary (EB)  Obtains the  from a | Sets   Creates a |;  a peer is a data type B that provides an interface to exchange data with another peer.  5 creates the peer and invokes an application-defined  action that receives a | argument.  The lifetime of the | is limited to the action. # When the action terminates, the | dies.  Parameters:  : The ZMQ Context  ,): The address, to which this peer either & binds or connects  ,: One of the peers has to bind the address, + the other has to connect.  *: A converter to convert message segments % from the wire format  to type i  *: A converter to convert message segments  of type o to the wire format   | -> IO (): The action to invoke %Sends a data stream to another peer;  Parameters:  | : The peer  /: Enumerator to create the outoing data stream *Receives a data stream from another peer;  Parameters:  | : The peer  /: Iteratee to process the incoming data stream #|}~^  !"#,-./;<=>?@ABCDEFGHIJKLMNO|}~_~}|,-./;<=A@?>! #"CBDEIJJKLMNO GHF#|}~ portable experimental Safe-Infered  !"#$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~                   ! "# "$%&'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\]^_`abcdefghijklmnopqrstuvwxyz{|}~         1   "patterns-0.0.3Network.Mom.Patterns.BasicNetwork.Mom.Patterns.DeviceNetwork.Mom.Patterns.EnumeratorFactoryTypesServiceData.Enumerator.ListheadconsumeDataListNetwork.Mom.Patternszeromq-haskell-0.8.4 System.ZMQ withContextAffinityBacklogEventsFDIdentityLingerRate ReceiveBuf ReceiveMore ReconnectIVLReconnectIVLMax RecoveryIVLSendBufHighWM McastLoopRecoveryIVLMsecSwap SocketOptionSystem.ZMQ.InternalTimeoutContextTopic Parameter OnTimeout IdentifierOutBoundInBoundLinkTypeConnectBind CriticalityFatalCriticalErrorOnError_OnErrorDump FetchHelper_' FetchHelper_ FetchHelper' FetchHelperFetch_Fetch PollEntry AccessPointAddressacAddacOs AccessTypeXPeerXPullXPipeXSubXPubXRouterXDealerXClientXServer pollEntry parseLinkidInidOutoutUTF8inUTF8 outStringinStringchainIOechainIOtryIOtryIOenoparam alltopicsnotopicsrvName srvContextpauseresume changeParam changeOption addDevice remDevice changeTimeoutenumWithenumForoncejustfetcherfetcher_fetch1fetch1_ listFetcher listFetcher_fetchFor fetchFor_ fetchJust fetchJust_storeonembOnetoListtoStringappendsinksinkInosinkStreamer Transformer withDevice withQueue withForwarder withPipelinegetStreamSource filterTargetsemitemitPartpasspassByendabsorbmerge putThrough ignoreStream continueHerePeerPipeSubPubClient withServer clientContextsetClientOptions withClientrequestaskForcheckFor pubContext setPubOptionswithPubissuewithPeriodicPubwithSub subContext setSubOptionswithSporadicSubcheckSubwaitSub unsubscribe resubscribe withPuller pipeContextsetPipeOptionswithPipepush peerContextsetPeerOptionswithPeersendreceive mkUniqueIdbaseGHC.BaseStringbytestring-0.9.2.1Data.ByteString.Internal ByteStringghc-prim GHC.TypesIOenumerator-0.4.19Data.Enumerator.InternalIteratee Enumerator GHC.Exception SomeExceptionData.Enumerator throwError Data.MaybeJustMaybeNothingPairPullPushXReqXRepReqRepPollpollIdpollAddpollTypepollLinkpollSubpollOsaccess safeCloselink setSockOsrcvEnumitSendretriestryconifLeft?>ouch $fEqPollEntry$fShowAccessPointXPollxpCtxxpTmoxpMapxpIdsxpPollDevCmdTMOREMADDCommandOPTAPPDEVICERESUMEPAUSESTOPsrvIdstop withServicepollxpoll periodicSendperiodicInt Data.MonoidmappendmemptyMonoidbindconnectSocketGHC.MVarMVar