-- Hoogle documentation, generated by Haddock -- See Hoogle, http://www.haskell.org/hoogle/ -- | Common patterns in message-oriented applications -- -- There are common patterns often reused - or, in fact, reimplemented, -- in many distributed, message-oriented applications, such as -- Server/Client (a.k.a Request/Response), Publish/Subscribe, Pipline -- (a.k.a. Push/Pull) and Exclusive Pair (a.k.a. Peer-to-Peer). The -- Patterns package implements those patterns based on zeromq. More -- information on zeromq can be found at http://www.zeromq.org. -- More examples and a test suite are available on -- http://github.com/toschoo/mom. @package patterns @version 0.0.1 -- | Enumerators for basic patterns module Network.Mom.Patterns.Enumerator -- | Calls an application-defined getter function until this returns -- Nothing; if the getter throws an exception, the enumerator -- returns Error. enumWith :: (i -> IO (Maybe o)) -> i -> Enumerator o IO () -- | Calls the application-defined getter function n times; -- The enumerator receives a pair (Int, Int), where the -- first integer is a counter and the second is the upper bound. n -- is defined as snd - fst, i.e. the counter is incremented -- until it reaches the value of the bound. The counter must be a value -- less than the bound to avoid protocol errors, i.e. the -- getter must be called at least once. The current value of the -- counter and additional input are passed to the getter. if the -- getter throws an exception, the enumerator returns Error. enumFor :: (Int -> i -> IO o) -> (Int, Int) -> i -> Enumerator o IO () -- | Calls the application-defined getter function once; the -- enumerator must return a value (the result type is not Maybe), -- otherwise, the sending iteratee has nothing to send which would most -- likely result in a protocol error. if the getter throws an exception, -- the enumerator returns Error. once :: (i -> IO o) -> i -> Enumerator o IO () -- | Passes just the input value to iteratee; -- --
--   just "hello world"
--   
-- -- hence, reduces to just hello world sent over the wire. just :: o -> Enumerator o IO () -- | Enumerator to process data segments of type o; receives -- the Context, the control parameter and an input of type -- i; Fetch is used by Servers that receive -- requests of type i and produce an outgoing stream with segments -- of type o. type Fetch i o = Context -> Parameter -> i -> Enumerator o IO () -- | A variant of Fetch without input type Fetch_ o = Fetch () o -- | A function that may be used with some of the fetchers; The helper -- returns Nothing to signal that no more data are available and -- Just o to continue the stream. FetchHelpers are used -- with Servers that receive requests of type i. The -- function receives the Context, the conrol parameter and an -- input of type i; type FetchHelper i o = Context -> Parameter -> i -> IO (Maybe o) -- | A variant of FetchHelper that returns type o instead of -- Maybe o. Please note that ' does not mean -- strict, here; it just means that the result is not a -- Maybe. type FetchHelper' i o = Context -> Parameter -> i -> IO o -- | A variant of FetchHelper without input type FetchHelper_ o = FetchHelper () o -- | A variant of FetchHelper_ that returns type o instead of -- Maybe o. Please note that ' does not mean -- strict, here; it just means that the result is not a -- Maybe. type FetchHelper_' o = FetchHelper' () o -- | Calls the application-defined FetchHelper until it returns -- Nothing; note that the FetchHelper shall return at least -- one Just value to avoid a protocol error. If the -- FetchHelper throws an exception, the fetcher returns -- Error. fetcher :: FetchHelper i o -> Fetch i o -- | A variant of fetcher without input; fetcher_ :: FetchHelper_ o -> Fetch_ o -- | Calls the application-defined FetchHelper' once; If the -- FetchHelper' throws an exception, the fetcher returns -- Error. 
fetch1 :: FetchHelper' i o -> Fetch i o -- | A variant of fetch1 without input; fetch1_ :: FetchHelper_' o -> Fetch_ o -- | Calls the application-defined getter n times; The -- getter is a variant of FetchHelper' with the current -- value of the counter as additional argument. For more details, refer -- to enumFor. fetchFor :: (Context -> Parameter -> Int -> i -> IO o) -> (Int, Int) -> Fetch i o -- | A variant of fetchFor without input fetchFor_ :: (Context -> Parameter -> Int -> () -> IO o) -> (Int, Int) -> Fetch_ o -- | Passes just the input value to the iteratee; -- --
--   fetchJust "hello world"
--   
-- -- hence, reduces to just "hello world" sent over the wire. Note that the -- input i is ignored. fetchJust :: o -> Fetch i o -- | A variant of fetchJust without input fetchJust_ :: o -> Fetch_ o -- | Calls the iteratee for each element of the input list listFetcher :: Fetch [o] o -- | A variant of listFetcher for services without input; the list, -- in this case, is passed as an additional argument to the fetcher. listFetcher_ :: [o] -> Fetch_ o -- | Returns one value of type i; if the enumerator creates a value, -- this value is returned; otherwise, the input value is returned. one :: i -> Iteratee i IO i -- | Returns one value of type Maybe i; equal to head mbOne :: Iteratee i IO (Maybe i) -- | Returns a list containing all chunks of the stream; equal to -- consume; note that this iteratee causes a space leak and is not -- suitable for huge streams or streams of unknown size. toList :: Iteratee i IO [i] -- | Returns a string containing all chunks of the stream intercalated with -- the input string, e.g.: if the stream consists of the two -- elements "hello" and "world" -- --
--   toString " " 
--   
-- -- returns hello world. Note that this iteratee causes a space -- leak and is not suitable for huge streams or streams of unknown size. toString :: String -> Iteratee String IO String -- | Merges the elements of a stream using mappend; if the stream is -- empty, append returns mempty. The type i must be -- instance of Monoid. Note that this iteratee causes a space leak -- and is not suitable for huge streams or streams of unknown size. append :: Monoid i => Iteratee i IO i -- | Calls the application-defined IO action for each element of the -- stream; The IO action could, for instance, write to an already opened -- file, store values in an MVar or send them through a -- Chan to another thread for further processing. An exception -- thrown in the IO action is re-thrown by throwError. store :: (i -> IO ()) -> Iteratee i IO () -- | Iteratee to process data segments of type i; receives -- the Context and the control parameter type Dump i = Context -> Parameter -> Iteratee i IO () -- | Opens a data sink, dumps the stream into this sink and closes the sink -- when the stream terminates or when an error occurs; the first IO -- action is used to open the sink (of type s), the second closes -- the sink and the third writes one element into the sink. sink :: (Context -> String -> IO s) -> (Context -> String -> s -> IO ()) -> (Context -> String -> s -> i -> IO ()) -> Dump i -- | Variant of sink that uses the first segment of the stream as -- input parameter to open the sink. The first segment, which could -- contain a file name or parameters for an SQL query, is not -- written to the sink. As with sink, the sink is closed when the -- stream terminates or when an error occurs. 
sinkI :: (Context -> String -> i -> IO s) -> (Context -> String -> s -> IO ()) -> (Context -> String -> s -> i -> IO ()) -> Dump i -- | Similar to sink, but uses a data sink that is opened and closed -- outside the scope of the service or does not need to be opened and -- closed at all; examples may be services that write to MVar or -- Chan. nosink is implemented as a closure of -- store: -- --
--   nosink save ctx p = store (save ctx p)
--   
nosink :: (Context -> String -> i -> IO ()) -> Dump i module Network.Mom.Patterns.Device -- | Starts a device and executes an action that receives a Service -- to control the device -- -- Parameters: -- -- withDevice :: Context -> String -> Parameter -> Timeout -> [PollEntry] -> InBound o -> OutBound o -> OnError_ -> (Parameter -> OnTimeout) -> (Parameter -> Transformer o) -> (Service -> IO a) -> IO a -- | Starts a queue; a queue connects clients with a dealer -- (XDealer), i.e. a load balancer for requests, and -- servers with a router (XRouter) that routes responses back to -- the client. -- -- Parameters: -- -- -- -- withQueue is implemented by means of withDevice as: -- --
--   
--   withQueue ctx name (dealer, ld) (router, lr) onerr act = 
--     withDevice ctx name noparam (-1)
--           [pollEntry "clients" XDealer dealer ld [],
--            pollEntry "server"  XRouter router lr []]
--           return return onerr (\_ -> return ()) (\_ -> putThrough) act
--   
withQueue :: Context -> String -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO a -- | Starts a Forwarder; a forwarder connects a publisher and its -- subscribers. Note that the forwarder uses a subscriber -- (XSub) to connect to the publisher and a -- publisher (XPub) to bind the subscribers. -- -- Parameters: -- -- -- -- withForwarder is implemented by means of withDevice as: -- --
--   
--   withForwarder ctx name topics (sub, l1) (pub, l2) onerr act =
--     withDevice ctx name noparam (-1)
--           [pollEntry "subscriber" XSub sub l1 topics,
--            pollEntry "publisher"  XPub pub l2 []]
--           return return onerr (\_ -> return ()) (\_ -> putThrough) act
--   
withForwarder :: Context -> String -> [Topic] -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO a -- | Starts a pipeline; a pipeline connects a pipe and its -- workers. Note that the pipeline uses a puller -- (XPull) to connect to the pipe and a pipe -- (XPipe) to bind the pullers. -- -- Parameters: -- -- -- -- withPipeline is implemented by means of withDevice as: -- --
--   
--   withPipeline ctx name (puller, l1) (pusher, l2) onerr act =
--     withDevice ctx name noparam (-1)
--           [pollEntry "pull"  XPull puller l1 [],
--            pollEntry "push"  XPipe pusher l2 []]
--           return return onerr (\_ -> return ()) (\_ -> putThrough) act
--   
withPipeline :: Context -> String -> (AccessPoint, LinkType) -> (AccessPoint, LinkType) -> OnError_ -> (Service -> IO a) -> IO a -- | A poll entry describes how to handle an AccessPoint data PollEntry -- | Creates a PollEntry; -- -- Parameters: -- -- pollEntry :: Identifier -> AccessType -> AccessPoint -> LinkType -> [Topic] -> PollEntry -- | Defines the type of a PollEntry; the names of the constructors -- are similar to ZMQ socket types but with some differences to keep the -- terminology in line with basic patterns. The leading "X" stands for -- "Access" (not for "eXtended" as in XRep and XReq). data AccessType -- | Represents a server and expects connections from clients; should be -- used with Bind; corresponds to ZMQ Socket Type Rep XServer :: AccessType -- | Represents a client and connects to a server; should be used with -- Connect; corresponds to ZMQ Socket Type Req XClient :: AccessType -- | Represents a load balancer, expecting connections from clients; should -- be used with Bind; corresponds to ZMQ Socket Type XRep XDealer :: AccessType -- | Represents a router expecting connections from servers; should be used -- with Bind; corresponds to ZMQ Socket Type XReq XRouter :: AccessType -- | Represents a publisher; should be used with Bind; corresponds -- to ZMQ Socket Type Pub XPub :: AccessType -- | Represents a subscriber; should be used with Connect; -- corresponds to ZMQ Socket Type Sub XSub :: AccessType -- | Represents a Pipe; should be used with Bind; corresponds to ZMQ -- Socket Type Push XPipe :: AccessType -- | Represents a Puller; should be used with Connect; corresponds -- to ZMQ Socket Type Pull XPull :: AccessType -- | Represents a Peer; corresponding peers must use complementing -- LinkType; corresponds to ZMQ Socket Type Pair XPeer :: AccessType -- | Adds a PollEntry to a device; the Service, of course, -- must be a device, the command is otherwise ignored. 
addDevice :: Service -> PollEntry -> IO () -- | Removes a PollEntry from a device; the Service, of -- course, must be a device, the command is otherwise ignored. remDevice :: Service -> Identifier -> IO () -- | Changes the timeout of a device; the Service, of course, must -- be a device, the command is otherwise ignored. changeTimeout :: Service -> Timeout -> IO () -- | Holds information on streams and the current state of the device; -- streamers are passed to transformers. data Streamer o -- | Retrieves the identifier of the source of the current stream getStreamSource :: Streamer o -> Identifier -- | Filters target streams; the function resembles filter of -- List: it receives the property of an Identifier; if a -- PollEntry has this property, it is added to the result set. -- -- The function is intended to select targets for an out-going stream, -- typically based on the identifier of the source stream. The following -- example selects all poll entries, but the source: -- --
--   broadcast :: Streamer o -> [Identifier]
--   broadcast s = filterTargets s notSource
--     where notSource = (/=) (getStreamSource s)
--   
filterTargets :: Streamer o -> (Identifier -> Bool) -> [Identifier] -- | A transformer is an Iteratee to transform streams. It receives -- two arguments: -- -- -- -- Streamer and sequence keep track of the current transformation. The -- streamer knows where the stream comes from and may be queried about -- other streams in the device. type Transformer o = Streamer o -> Seq o -> Iteratee o IO () -- | Transformer that passes messages one-to-one to all poll entries but -- the current source putThrough :: Transformer a -- | Transformer that ignores the remainder of the current stream; it is -- usually used to terminate a transformer. ignoreStream :: Transformer a -- | Transformer that does nothing but continuing the transformer, from -- which it is called and, hence, is identical to return (); it is -- usually passed to a transformer combinator, like emit, to -- continue processing right here instead of recursing into another -- transformer. continueHere :: Transformer a -- | Sends all sequence elements to the targets identified by the list of -- Identifier and terminates the outgoing stream. The -- transformation continues with the transformer passed in and an empty -- sequence. emit :: Streamer o -> [Identifier] -> Seq o -> Transformer o -> Iteratee o IO () -- | Sends all sequence elements to the targets identified by the list of -- Identifier, but unlike emit, does not terminate the -- outgoing stream. The transformation continues with the transformer -- passed in and an empty sequence. -- -- Note that all outgoing streams, once started, have to be terminated -- before the transformer ends. Otherwise, a protocol error will occur. emitPart :: Streamer o -> [Identifier] -> Seq o -> Transformer o -> Iteratee o IO () -- | Sends one element (o) to the targets and continues with an -- empty sequence; the Boolean parameter determines whether this is the -- last message to send. 
-- -- Note that all outgoing streams, once started, have to be terminated -- before the transformer ends. Otherwise, a protocol error will occur. pass :: Streamer o -> [Identifier] -> o -> Bool -> Transformer o -> Iteratee o IO () -- | Sends one element (o) to the targets, but, unlike pass, -- passes the sequence to the transformer. passBy does not -- terminate the outgoing stream. passBy :: Streamer o -> [Identifier] -> o -> Seq o -> Transformer o -> Iteratee o IO () -- | Terminates the outgoing stream by sending the new element as last -- segment to all targets and ends the transformer by ignoring the rest -- of the incoming stream. end :: Streamer o -> [Identifier] -> o -> Iteratee o IO () -- | Adds a new element to the sequence and calls the transformer without -- sending anything absorb :: Streamer o -> o -> Seq o -> Transformer o -> Iteratee o IO () -- | Merges the new element with the last element of the sequence; if the -- sequence is currently empty, the new element will be its only member. -- Merged elements appear as one element of the sequence in the -- continuation of the transformation. The type o must be a -- Monoid, i.e., it must implement mappend and -- mempty. The function does not send anything. merge :: Monoid o => Streamer o -> o -> Seq o -> Transformer o -> Iteratee o IO () -- | A device identifier is just a plain String type Identifier = String -- | A timeout action is just an IO action without arguments type OnTimeout = IO () -- | Basic communication patterns module Network.Mom.Patterns.Basic -- | Starts one or more server threads and executes an action that receives -- a Service to control the server. The Service is a thread -- local resource. It must not be passed to threads forked from the -- thread that has started the service. The Service is valid only -- in the scope of the action. When the action terminates, the server is -- automatically stopped. During the action, the server can be paused and -- restarted. 
Also, the SocketOption of the underlying ZMQ -- Socket can be changed. Please refer to pause, -- resume and changeOption for more details. -- -- The application may implement control parameters. Control parameters -- are mere strings that are passed to the application call-backs. It is -- up to the application to enquire these strings and to implement -- different behaviour for the possible settings. Control parameter can -- be changed during run-time by means of changeParam. -- -- Parameters: -- -- -- -- The following code fragment shows a simple server to process data base -- queries using standard converters and error handlers not further -- defined here: -- --
--   withContext 1 $ \ctx -> do
--      c <- connectODBC "DSN=xyz"  -- some ODBC connection
--      s <- prepare c "select ..." -- some database query 
--      withServer ctx 
--          "MyQuery" -- name of the server is "MyQuery"
--          noparam     -- no parameter
--          5           -- five worker threads
--          (Address "tcp://*:5555" []) Bind -- bind to this address
--          iconv oconv -- some standard converters
--          onErr       -- some standard error handler
--          (\_  -> one []) -- Iteratee for single segment messages;
--                           -- refer to Enumerator for details
--          (dbFetcher s) $ \srv -> -- the Enumerator;
--            untilInterrupt $ do -- install a signal handler for SIGINT
--                                -- and repeat the following action
--                                -- until SIGINT is received;
--              putStrLn $ "server " ++ srvName srv ++ 
--                         " up and running..."
--              threadDelay 1000000
--   
-- -- The untilInterrupt loop may be implemented as follows: -- --
--   untilInterrupt :: IO () -> IO ()
--   untilInterrupt run = do
--       continue <- newMVar True
--       _ <- installHandler sigINT (Catch $ handler continue) Nothing
--       go continue
--     where handler m = modifyMVar_ m (\_ -> return False)
--           go      m = do run
--                          continue <- readMVar m
--                          when continue $ go m
--   
-- -- Finally, a simple dbFetcher: -- --
--   dbFetcher :: SQL.Statement -> Fetch [SQL.SqlValue] String
--   dbFetcher s _ _ _ stp = tryIO (SQL.execute s []) >>= \_ -> go stp
--     where go step = 
--             case step of
--               E.Continue k -> do
--                 mbR <- tryIO $ SQL.fetchRow s
--                 case mbR of
--                   Nothing -> E.continue k
--                                      -- convRow is not defined here
--                   Just r  -> go $$ k (E.Chunks [convRow r]) 
--               _ -> E.returnI step
--   
withServer :: Context -> String -> Parameter -> Int -> AccessPoint -> LinkType -> InBound c -> OutBound o -> OnError -> (String -> Iteratee c IO i) -> Fetch i o -> (Service -> IO a) -> IO a -- | Client data type data Client i o -- | Obtaining the Context from Client clientContext :: Client i o -> Context -- | Setting SocketOption to the underlying ZMQ Socket setClientOptions :: Client i o -> [SocketOption] -> IO () -- | Creates a Client; a client is not a background process like a -- server, but a data type that provides functions to interoperate with a -- server. withClient creates a client and invokes the -- application-defined action, which receives a Client argument. -- The lifetime of the Client is limited to the invoked action. -- When the action terminates, the Client dies. -- -- Parameters: -- -- withClient :: Context -> AccessPoint -> OutBound o -> InBound i -> (Client i o -> IO a) -> IO a -- | Synchronously requesting a service; the function blocks the current -- thread, until a reply is received. -- -- Parameters: -- -- -- -- A simple client that just writes the results to stdout: -- --
--   rcv :: String -> IO ()
--   rcv req = withContext 1 $ \ctx -> 
--     withClient ctx 
--       (Address "tcp://localhost:5555" []) -- connect to this address
--       (return . B.pack) (return . B.unpack) $ -- string converters
--       \s -> do
--         -- request with enum and outit
--         ei <- request s (enum req) outit      
--         case ei of
--           Left e  -> putStrLn $ "Error: " ++ show (e::SomeException)
--           Right _ -> return ()
--   
-- --
--   -- Enumerator that returns just one string
--   enum :: String -> E.Enumerator String IO ()
--   enum = once return
--   
-- --
--   -- Iteratee that just writes to stdout
--   outit :: E.Iteratee String IO ()
--   outit = do
--     mbi <- EL.head
--     case mbi of
--       Nothing -> return ()
--       Just i  -> liftIO (putStrLn i) >> outit
--   
-- -- Note that this code just issues one request, which is not the most -- typical use case. It is more likely that the action will loop for ever -- and receive requests, for instance, from a user interface. request :: Client i o -> Enumerator o IO () -> Iteratee i IO a -> IO (Either SomeException a) -- | Asynchronously requesting a service; the function sends a request to -- the server without waiting for a result. -- -- Parameters: -- -- askFor :: Client i o -> Enumerator o IO () -> IO () -- | Polling for a reply; the function polls for a server request. If -- nothing has been received, it returns Nothing; otherwise it -- returns Just the result or an error. -- -- Parameters: -- -- -- -- The synchronous request (see request) could be implemented -- asynchronously like: -- --
--   rcv :: String -> IO ()
--   rcv req = withContext 1 $ \ctx -> do
--     let ap = Address "tcp://localhost:5555" []
--     withClient ctx ap 
--       (return . B.pack) (return . B.unpack) 
--       $ \s -> do
--         ei <- try $ askFor s (enum req)
--         case ei of
--           Left  e -> putStrLn $ "Error: " ++ show (e::SomeException)
--           Right _ -> wait s
--     -- check for results periodically 
--     where wait s = checkFor s outit >>= \mbei ->
--             case mbei of
--               Nothing        -> do putStrLn "Waiting..."
--                                    threadDelay 10000 >> wait s
--               Just (Left e)  -> putStrLn $ "Error: " ++ show e
--               Just (Right _) -> putStrLn "Ready!"
--   
checkFor :: Client i o -> Iteratee i IO a -> IO (Maybe (Either SomeException a)) -- | Publisher data Pub o -- | Obtaining the Context from Pub pubContext :: Pub o -> Context -- | Setting SocketOption to the underlying ZMQ Socket setPubOptions :: Pub o -> [SocketOption] -> IO () -- | Creates a publisher; A publisher is a data type that provides an -- interface to publish data to subscribers. withPub creates a -- publisher and invokes an application-defined action, which receives a -- Pub argument. The lifetime of the publisher is limited to the -- action. When the action terminates, the publisher dies. -- -- Parameter: -- -- withPub :: Context -> AccessPoint -> OutBound o -> (Pub o -> IO a) -> IO a -- | Publishes the data stream created by an enumerator; -- -- Parameters: -- -- -- -- A simple weather report publisher: -- --
--   withContext 1 $ \ctx -> withPub ctx
--       (Address "tcp://*:5555" [])
--       (return . B.pack) $ \pub -> untilInterrupt $ do
--         issue pub (once weather noparam)
--         threadDelay 10000 -- update every 10ms
--   
-- --
--   -- fake weather report with some random values
--   weather :: String -> IO String
--   weather _ = do
--       zipcode     <- randomRIO (10000, 99999) :: IO Int
--       temperature <- randomRIO (-10, 30) :: IO Int
--       humidity    <- randomRIO ( 10, 60) :: IO Int
--       return $ unwords [show zipcode,
--                         show temperature,
--                         show humidity]
--   
issue :: Pub o -> Enumerator o IO () -> IO () -- | Creates a background process that periodically publishes data; -- -- Parameters: -- -- -- -- The weather report publisher introduced above (see withPub) can -- be implemented by means of withPeriodicPub as: -- --
--   withPeriodicPub ctx "Weather Report" noparam 
--     100000 -- publish every 100ms
--     (Address "tcp://*:5555" []) 
--     (return . B.pack) -- string converter
--     onErr_            -- standard error handler
--     (fetch1 fetch)       -- creates one instance
--                           -- of the return of "fetch";
--                           -- see Enumerator for details
--     $ \pub -> 
--       untilInterrupt $ do -- until SIGINT, see withServer for details
--         threadDelay 100000
--         putStrLn $ "I am doing nothing " ++ srvName pub
--   
withPeriodicPub :: Context -> String -> Parameter -> Timeout -> AccessPoint -> OutBound o -> OnError_ -> Fetch_ o -> (Service -> IO a) -> IO a -- | A subscription is a background service that receives and processes -- data streams from a publisher. A typical use case is an application -- that operates on periodically updated data; the subscriber would -- receive these data and make them accessible to other threads in -- the process through an MVar. -- -- Parameters: -- -- -- -- Weather Report Subscriber: -- --
--   withContext 1 $ \ctx -> 
--     withSub ctx "Weather Report" noparam 
--             ["10001"] -- zipcode to subscribe to
--             (Address "tcp://localhost:5555" []) 
--             (return . B.unpack) 
--             onErr_ output -- Iteratee that just writes to stdout
--             $ \s -> untilInterrupt $ do
--               putStrLn $ "Doing nothing " ++ srvName s
--               threadDelay 1000000
--   
withSub :: Context -> String -> Parameter -> [Topic] -> AccessPoint -> InBound i -> OnError_ -> Dump i -> (Service -> IO a) -> IO a -- | An alternative to the background subscriber (see withSub); data Sub i -- | Obtaining the Context from Sub subContext :: Sub i -> Context -- | Setting SocketOption to the underlying ZMQ Socket setSubOptions :: Sub i -> [SocketOption] -> IO () -- | Similar to Pub, a Sub is a data type that provides an -- interface to subscribe data. withSporadicSub creates a -- subscriber and invokes an application-defined action, which receives a -- Sub argument. The lifetime of the subscriber is limited to the -- action. When the action terminates, the subscriber dies. withSporadicSub :: Context -> AccessPoint -> InBound i -> [Topic] -> (Sub i -> IO a) -> IO a -- | Polling for data; If nothing has been received, the function returns -- Nothing; otherwise it returns Just the result or an -- error. -- -- Parameters: -- -- checkSub :: Sub i -> Iteratee i IO a -> IO (Maybe (Either SomeException a)) -- | Waiting for data; the function blocks the current thread, until data -- are being received from the publisher. It returns either -- SomeException or the result. -- -- Parameters: -- -- waitSub :: Sub i -> Iteratee i IO a -> IO (Either SomeException a) -- | Unsubscribe a topic unsubscribe :: Sub i -> Topic -> IO () -- | Subscribe another topic resubscribe :: Sub i -> Topic -> IO () -- | A pipeline consists of a "pusher" and a set of workers ("pullers"). -- The pusher sends jobs down the pipeline that will be assigned to one -- of the workers. The pipeline pattern is, thus, a work-balancing -- scheme. data Pipe o -- | Obtaining the Context from Pipe pipeContext :: Pipe o -> Context -- | Setting SocketOption to the underlying ZMQ Socket setPipeOptions :: Pipe o -> [SocketOption] -> IO () -- | Creates a pipeline; a Pipe is a data type that provides an -- interface to push a data stream to workers connected to the -- other side of the pipe. 
withPipe creates a pipeline and invokes -- an application-defined action which receives a Pipe argument. -- The lifetime of the Pipe is limited to the action. When the -- action terminates, the Pipe dies. -- -- Parameters: -- -- withPipe :: Context -> AccessPoint -> OutBound o -> (Pipe o -> IO a) -> IO a -- | Sends a job down the pipeline; -- -- Parameters: -- -- -- -- A simple pusher: -- --
--   sendF :: FilePath -> IO ()
--   sendF f = withContext 1 $ \ctx -> do
--    let ap = Address "tcp://*:5555" []
--    withPipe ctx ap return $ \p ->
--      push p (EB.enumFile f) -- file enumerator
--                              -- see Data.Enumerator.Binary (EB)
--   
push :: Pipe o -> Enumerator o IO () -> IO () -- | A puller is a background service that receives and processes data -- streams from a pipeline. -- -- Parameters: -- -- -- -- A worker that just writes the incoming stream to stdout: -- --
--   withContext 1 $ \ctx -> 
--     withPuller ctx "Worker" noparam 
--           (Address "tcp://localhost:5555" [])
--           (return . B.unpack)
--           onErr_ output
--           $ \s -> untilInterrupt $ do
--             putStrLn $ "Doing nothing " ++ srvName s
--             threadDelay 100000
--   
withPuller :: Context -> String -> Parameter -> AccessPoint -> InBound i -> OnError_ -> Dump i -> (Service -> IO a) -> IO a -- | An Exclusive Pair is a general purpose pattern of two equal peers that -- communicate with each other by sending (send) and receiving -- (receive) data. One of the peers has to bind the -- AccessPoint the other connects to it. data Peer a -- | Obtains the Context from a Peer peerContext :: Peer a -> Context -- | Sets SocketOption setPeerOptions :: Peer a -> [SocketOption] -> IO () -- | Creates a Peer; a peer is a data type that provides an -- interface to exchange data with another peer. withPeer creates -- the peer and invokes an application-defined action that receives a -- Peer argument. The lifetime of the Peer is limited to -- the action. When the action terminates, the Peer dies. -- -- Parameters: -- -- withPeer :: Context -> AccessPoint -> LinkType -> InBound a -> OutBound a -> (Peer a -> IO b) -> IO b -- | Sends a data stream to another peer; -- -- Parameters: -- -- send :: Peer o -> Enumerator o IO () -> IO () -- | Receives a data stream from another peer; -- -- Parameters: -- -- receive :: Peer i -> Iteratee i IO a -> IO (Either SomeException a) -- | Describes how to access a service; an AccessPoint usually -- consists of an address and a list of SocketOption. Addresses -- are passed in as strings of the form: -- -- -- -- For more options, please refer to the zeromq documentation. 
data AccessPoint Address :: String -> [SocketOption] -> AccessPoint -- | Address string acAdd :: AccessPoint -> String -- | Socket options acOs :: AccessPoint -> [SocketOption] -- | How to link to an AccessPoint data LinkType -- | Bind the address Bind :: LinkType -- | Connect to the address Connect :: LinkType -- | Safely read LinkType; ignores the case of the input string and, -- besides "bind" and "connect", also accepts "bin", "con" and "conn"; -- intended for use with command line parameters parseLink :: String -> Maybe LinkType -- | Converters are user-defined functions that convert a ByteString -- to a value of type a (InBound) or a value of type -- a to ByteString (OutBound). Converters are, -- hence, similar to put and get in the Binary -- monad. The reason for using explicit, user-defined converters instead -- of Binary encode and decode is that the -- conversion may be more complex, involving reading configurations or -- other IO actions. -- -- The simplest possible in-bound converter for plain strings is: -- --
--   let iconv = return . toString
--   
type InBound a = ByteString -> IO a -- | A simple string OutBound converter may be: -- --
--   let oconv = return . fromString
--   
type OutBound a = a -> IO ByteString -- | InBound ByteString -> ByteString idIn :: InBound ByteString -- | OutBound ByteString -> ByteString idOut :: OutBound ByteString -- | InBound ByteString -> String inString :: InBound String -- | OutBound String -> ByteString outString :: OutBound String -- | InBound ByteString -> UTF8 String inUTF8 :: InBound String -- | OutBound UTF8 String -> ByteString outUTF8 :: OutBound String -- | Indicates criticality of the error event data Criticality -- | The current operation (e.g. processing a request) has not -- terminated properly, but the service is able to continue; the error -- may have been caused by a faulty request or other temporary conditions. -- Note that if an application-defined Iteratee or -- Enumerator results in SomeException (by means of -- throwError), the incident is classified as Error; if it -- throws an IO Error, however, the incident is classified as -- Fatal. Error :: Criticality -- | One worker thread is lost (Server only) Critical :: Criticality -- | The service cannot recover and will terminate Fatal :: Criticality -- | Error handler for servers; receives the Criticality of the -- error event, the exception, the server name and the service control -- parameter. If the error handler returns Just a -- ByteString, this ByteString is sent to the client as -- error message. -- -- A good policy for implementing servers is to terminate or restart the -- Server when a Fatal or Critical error occurs -- and to send an error message to the client on a plain Error. -- The error handler, additionally, may log the incident or inform an -- administrator. type OnError = Criticality -> SomeException -> String -> Parameter -> IO (Maybe ByteString) -- | Error handler for all services but servers; receives the -- Criticality of the error event, the exception, the service name -- and the service control parameter. 
-- -- A good policy is to terminate or restart the service when a -- Fatal error occurs and to continue, if possible, on a plain -- Error. The error handler, additionally, may log the incident or -- inform an administrator. type OnError_ = Criticality -> SomeException -> String -> Parameter -> IO () -- | Chains IO Actions in an Enumerator together; throws -- SomeException using throwError when an error occurs chainIO :: IO a -> (a -> Iteratee b IO c) -> Iteratee b IO c -- | Chains IO Actions in an Enumerator together; returns -- Error when an error occurs chainIOe :: IO a -> (a -> Iteratee b IO c) -> Iteratee b IO c -- | Executes an IO action in an Iteratee; throws -- SomeException using throwError when an error occurs tryIO :: IO a -> Iteratee i IO a -- | Executes an IO action in an Iteratee; returns Error -- when an error occurs tryIOe :: IO a -> Iteratee i IO a -- | Generic Service data type; Service is passed to -- application-defined actions used with background services, namely -- withServer, withPeriodicPub, withSub, withPuller and withDevice. data Service -- | Obtains the service name srvName :: Service -> String -- | Obtains the Context from Service srvContext :: Service -> Context -- | Pauses the Service pause :: Service -> IO () -- | Resumes the Service resume :: Service -> IO () -- | Changes the Service control parameter changeParam :: Service -> Parameter -> IO () -- | Changes SocketOption changeOption :: Service -> SocketOption -> IO () -- | A 0MQ context representation. data Context :: * -- | Run an action with a 0MQ context. The Context supplied to your -- action will not be valid after the action either returns or -- throws an exception. withContext :: Size -> (Context -> IO a) -> IO a -- | The option to set on 0MQ sockets (cf. zmq_setsockopt and -- zmq_getsockopt manpages for details). 
data SocketOption :: * -- | ZMQ_AFFINITY Affinity :: Word64 -> SocketOption -- | ZMQ_BACKLOG Backlog :: CInt -> SocketOption -- | ZMQ_EVENTS Events :: PollEvent -> SocketOption -- | ZMQ_FD FD :: CInt -> SocketOption -- | ZMQ_IDENTITY Identity :: String -> SocketOption -- | ZMQ_LINGER Linger :: CInt -> SocketOption -- | ZMQ_RATE Rate :: Int64 -> SocketOption -- | ZMQ_RCVBUF ReceiveBuf :: Word64 -> SocketOption -- | ZMQ_RCVMORE ReceiveMore :: Bool -> SocketOption -- | ZMQ_RECONNECT_IVL ReconnectIVL :: CInt -> SocketOption -- | ZMQ_RECONNECT_IVL_MAX ReconnectIVLMax :: CInt -> SocketOption -- | ZMQ_RECOVERY_IVL RecoveryIVL :: Int64 -> SocketOption -- | ZMQ_SNDBUF SendBuf :: Word64 -> SocketOption -- | ZMQ_HWM HighWM :: Word64 -> SocketOption -- | ZMQ_MCAST_LOOP McastLoop :: Bool -> SocketOption -- | ZMQ_RECOVERY_IVL_MSEC RecoveryIVLMsec :: Int64 -> SocketOption -- | ZMQ_SWAP Swap :: Int64 -> SocketOption -- | Subscription Topic type Topic = String -- | Subscribe to all topics alltopics :: [Topic] -- | Subscribe to no topic notopic :: [Topic] type Timeout = Int64 -- | Control Parameter type Parameter = String -- | Ignore parameter noparam :: Parameter -- | This package implements communication patterns that are often used in -- distributed applications. The package implements a set of basic -- patterns and a device to connect basic patterns through routers, -- brokers, load balancers, etc. The package is based on the -- zeromq library, but, in some cases, deviates from the -- zeromq terminology. More information on zeromq can be -- found at http://www.zeromq.org. module Network.Mom.Patterns