------------------------------------------------------------------------------- -- | -- Module : Network/Mom/Patterns/Basic.hs -- Copyright : (c) Tobias Schoofs -- License : LGPL -- Stability : experimental -- Portability: portable -- -- Basic communication patterns ------------------------------------------------------------------------------- module Network.Mom.Patterns.Basic ( -- * Server/Client withServer, Client, clientContext, setClientOptions, withClient, request, askFor, checkFor, -- * Publish/Subscribe Pub, pubContext, setPubOptions, withPub, issue, withPeriodicPub, withSub, Sub, subContext, setSubOptions, withSporadicSub, checkSub, waitSub, unsubscribe, resubscribe, -- * Pipeline Pipe, pipeContext, setPipeOptions, withPipe, push, withPuller, -- * Exclusive Pair Peer, peerContext, setPeerOptions, withPeer, send, receive, -- * Service Access Point AccessPoint(..), LinkType(..), parseLink, -- * Converters InBound, OutBound, idIn, idOut, inString, outString, inUTF8, outUTF8, -- * Errors and Error Handlers Criticality(..), OnError, OnError_, chainIO, chainIOe, tryIO, tryIOe, -- * Generic Serivce Service, srvName, srvContext, pause, resume, changeParam, changeOption, -- * ZMQ Context Z.Context, Z.withContext, Z.SocketOption(..), -- * Helpers Topic, alltopics, notopic, Timeout, Parameter, noparam) where import Types import Service import Factory import Network.Mom.Patterns.Device import qualified Data.ByteString.Char8 as B import qualified Data.Enumerator as E import Data.Enumerator (($$)) import Control.Concurrent import Control.Applicative ((<$>)) import Control.Monad import Prelude hiding (catch) import Control.Exception (SomeException, catch, try, finally) import qualified System.ZMQ as Z ------------------------------------------------------------------------ -- | Starts one or more server threads -- and executes an action that -- receives a 'Service' to control the server. -- The 'Service' is a thread local resource. -- It must not be passed to threads forked -- from the thread that has started the service. -- The 'Service' is valid only in the scope of the action. -- When the action terminates, the server is automatically stopped. -- During the action, the server can be paused and restarted. -- Also, the 'SocketOption' of the underlying ZMQ 'Z.Socket' -- can be changed. -- Please refer to 'pause', 'resume' and 'changeOption' for more details. -- -- The application may implement control parameters. -- Control parameters are mere strings that are passed -- to the application call-backs. -- It is up to the application to enquire these strings -- and to implement different behaviour for the possible settings. -- Control parameter can be changed during run-time -- by means of 'changeParam'. -- -- Parameters: -- -- * 'Z.Context': The ZMQ context; -- -- * 'String': The name of the server, useful for debugging; -- -- * 'Parameter': The initial value of the control parameter -- passed to all application call-backs; -- -- * 'Int': The number of worker threads; -- note that a server with only one thread -- handles client requests sequentially. -- The number of threads -- (together with the number of hardware processing resources) -- defines how many client requests can be processed in parallel. -- -- * 'AccessPoint': The access point, -- through which this server can be reached; -- -- * 'LinkType': The link type; -- standalone servers usually bind their access point, -- whereas clients connect to it. -- Instead, a server may also connect -- to a load-balancing device, -- to which other servers and clients connect -- (see 'withDevice' and 'withQueue'). -- -- * 'InBound': The converter to convert the incoming -- data stream (of type 'B.ByteString') -- into a client request component. -- Note that the converter converts -- single message segments to components of type /c/. -- The 'E.Iteratee', receiving this /c/-typed -- elements shall combine them -- to a complete request of type /i/, -- which is then processed by an 'E.Enumerator' -- to create the server response. -- -- * 'OutBound': The converter to convert the results of type /o/ -- to a 'B.ByteString', which then is sent -- back to the client. -- -- * 'OnError': The error handler -- -- * 'String' -> 'E.Iteratee': The 'E.Iteratee' that processes -- request components of type /c/ -- and yields a request of type /i/. -- The 'String' argument is -- the control parameter, -- whose logic is implemented -- by the application. -- -- * 'Fetch': The 'E.Enumerator' that processes -- the request of type /i/ to produce -- results of type /o/. -- -- * 'Service' -> IO (): The action to invoke, -- when the server has been started; -- the service is used to control the server. -- -- The following code fragment shows a -- simple server to process data base queries -- using standard converters and error handlers -- not further defined here: -- -- @ -- withContext 1 $ \\ctx -> do -- c <- connectODBC \"DSN=xyz\" -- some ODBC connection -- s <- prepare c \"select ...\" -- some database query -- withServer ctx -- \"MyQuery\" -- name of the server is \"MyQuery\" -- noparam -- no parameter -- 5 -- five worker threads -- (Address \"tcp:\/\/*:5555\" []) Bind -- bind to this address -- iconv oconv -- some standard converters -- onErr -- some standard error handler -- (\\_ -> one []) -- 'E.Iteratee' for single segment messages; -- -- refer to 'Enumerator' for details -- (dbFetcher s) $ \\srv -> -- the 'E.Enumerator'; -- untilInterrupt $ do -- install a signal handler for /SIGINT/ -- -- and repeat the following action -- -- until /SIGINT/ is received; -- putStrLn $ \"server \" ++ srvName srv ++ -- \" up and running...\" -- threadDelay 1000000 -- @ -- -- The untilInterrupt loop may be implemented as follows: -- -- @ -- -- untilInterrupt :: IO () -> IO () -- untilInterrupt run = do -- continue <- newMVar True -- _ <- installHandler sigINT (Catch $ handler continue) Nothing -- go continue -- where handler m = modifyMVar_ m (\\_ -> return False) -- go m = do run -- continue <- readMVar m -- when continue $ go m -- @ -- -- Finally, a simple dbFetcher: -- -- @ -- dbFetcher :: SQL.Statement -> Fetch [SQL.SqlValue] String -- dbFetcher s _ _ _ stp = tryIO (SQL.execute s []) >>= \\_ -> go stp -- where go step = -- case step of -- E.Continue k -> do -- mbR <- tryIO $ SQL.fetchRow s -- case mbR of -- Nothing -> E.continue k -- -- convRow is not defined here -- Just r -> go $$ k (E.Chunks [convRow r]) -- _ -> E.returnI step -- @ ------------------------------------------------------------------------ withServer :: Z.Context -> String -> Parameter -> Int -> AccessPoint -> LinkType -> InBound c -> OutBound o -> OnError -> (String -> E.Iteratee c IO i) -> Fetch i o -> (Service -> IO a) -> IO a withServer ctx name param n ac t iconv oconv onerr build fetch action = withService ctx name param service action where service = serve n ac t iconv oconv onerr build fetch ------------------------------------------------------------------------ -- the server implementation ------------------------------------------------------------------------ serve :: Int -> AccessPoint -> LinkType -> InBound c -> OutBound o -> OnError -> (String -> E.Iteratee c IO i) -> Fetch i o -> Z.Context -> String -> String -> String -> IO () -> IO () serve n ac t iconv oconv onerr build fetch ctx name sockname param ready ------------------------------------------------------------------------ -- prepare service for single client ------------------------------------------------------------------------ | n <= 1 = ( Z.withSocket ctx Z.Rep $ \client -> do link t ac client Z.withSocket ctx Z.Sub $ \cmd -> do trycon cmd sockname retries Z.subscribe cmd "" ready poll False [Z.S cmd Z.In, Z.S client Z.In] (go client) param) `catch` (\e -> onerr Fatal e name param >>= \_ -> return ()) ------------------------------------------------------------------------ -- prepare service for multiple clients ------------------------------------------------------------------------ | otherwise = (do add <- ("inproc://wrk_" ++) <$> show <$> mkUniqueId as <- replicateM n newEmptyMVar zs <- replicateM n newEmptyMVar withQueue ctx ("Queue " ++ name) (ac, t) (Address add [], Bind) onQErr $ \_ -> do _ <- mapM (\(a,z) -> start add a z) (zip as zs) mapM_ takeMVar as -- wait for workers to start ready -- report state to service mapM_ takeMVar zs) -- wait for workers to terminate `catch` (\e -> onerr Fatal e name param >>= \_ -> return ()) ------------------------------------------------------------------------ -- start thread ------------------------------------------------------------------------ where start add a z = forkIO (startWork add a `finally` putMVar z ()) ------------------------------------------------------------------------ -- start worker for multiple clients ------------------------------------------------------------------------ startWork add starter = Z.withSocket ctx Z.Rep $ \worker -> (do trycon worker add retries Z.withSocket ctx Z.Sub $ \cmd -> do trycon cmd sockname retries Z.subscribe cmd noparam putMVar starter () poll False [Z.S cmd Z.In, Z.S worker Z.In] (go worker) param) `catch` (\e -> onerr Critical e name param >>= \_ -> return ()) ------------------------------------------------------------------------ -- receive requests and do the job ------------------------------------------------------------------------ go worker p = do ei <- E.run (rcvEnum worker iconv $$ build p) ifLeft ei (\e -> handle worker e p) $ \i -> catch (body worker p i) (\e -> handle worker e p) body worker p i = do eiR <- E.run (fetch ctx p i $$ itSend worker oconv) ifLeft eiR (\e -> handle worker e p) (\_ -> return ()) ------------------------------------------------------------------------ -- generic error handler ------------------------------------------------------------------------ handle sock e p = onerr Error e name p >>= \mbX -> case mbX of Nothing -> Z.send sock B.empty [] Just x -> do Z.send sock x [Z.SndMore] Z.send sock B.empty [] onQErr c e nm _ = onerr c e nm noparam >>= \_ -> return () ------------------------------------------------------------------------ -- | Client data type ------------------------------------------------------------------------ data Client i o = Client { cliCtx :: Z.Context, cliSock :: Z.Socket Z.Req, cliAdd :: AccessPoint, cliOut :: OutBound o, cliIn :: InBound i} ------------------------------------------------------------------------ -- | Obtaining the 'Z.Context' from 'Client' ------------------------------------------------------------------------ clientContext :: Client i o -> Z.Context clientContext = cliCtx ------------------------------------------------------------------------ -- | Setting 'Z.SocketOption' to the underlying ZMQ 'Z.Socket' ------------------------------------------------------------------------ setClientOptions :: Client i o -> [Z.SocketOption] -> IO () setClientOptions c = setSockOs (cliSock c) ------------------------------------------------------------------------ -- | Creates a 'Client'; -- a client is not a background process like a server, -- but a data type that provides functions -- to interoperate with a server. -- 'withClient' creates a client and -- invokes the application-defined action, -- which receives a 'Client' argument. -- The lifetime of the 'Client' is limited -- to the invoked action. -- When the action terminates, the 'Client' /dies/. -- -- Parameters: -- -- * 'Z.Context': The ZMQ Context; -- -- * 'AccessPoint': The access point, to which the client connects; -- -- * 'OutBound': Converter to convert a request from type /o/ -- to the wire format 'B.ByteString'. -- Note that, as for servers, the request -- may be composed of components that together -- form the request. The type /o/ -- corresponds to one of these request components, -- not necessarily to the request type as a whole, -- which is determined when issuing a request. -- -- * 'InBound': Converter to convert a reply ('B.ByteString') -- into type 'i'. -- Note again that the reply may consist of many -- message segments. The type /i/ relates to one -- reply component, -- not necessarily to the reply type as a whole, -- which is determined when issuing a request. -- -- * 'Client' -> IO a: The action to perform with this client. ------------------------------------------------------------------------ withClient :: Z.Context -> AccessPoint -> OutBound o -> InBound i -> (Client i o -> IO a) -> IO a withClient ctx ac oconv iconv act = Z.withSocket ctx Z.Req $ \s -> do trycon s (acAdd ac) retries act Client { cliCtx = ctx, cliSock = s, cliAdd = ac, cliOut = oconv, cliIn = iconv} ------------------------------------------------------------------------ -- | Synchronously requesting a service; -- the function blocks the current thread, -- until a reply is received. -- -- Parameters: -- -- * 'Client': The client that performs the request -- -- * 'E.Enumerator': Enumerator to create the request message stream -- -- * 'E.Iteratee': Iteratee to process the reply message stream -- -- A simple client that just writes the results to 'stdout': -- -- @ -- rcv :: String -> IO () -- rcv req = withContext 1 $ \\ctx -> -- withClient ctx -- (Address \"tcp:\/\/localhost:5555\" []) -- connect to this address -- (return . B.pack) (return . B.unpack) $ -- string converters -- \\s -> do -- -- request with enum and outit -- ei <- request s (enum req) outit -- case ei of -- Left e -> putStrLn $ \"Error: \" ++ show (e::SomeException) -- Right _ -> return () -- @ -- -- @ -- -- Enumerator that returns just one string -- enum :: String -> E.Enumerator String IO () -- enum = once (return . Just) -- @ -- -- @ -- -- Iteratee that just writes to stdout -- outit :: E.Iteratee String IO () -- outit = do -- mbi <- EL.head -- case mbi of -- Nothing -> return () -- Just i -> liftIO (putStrLn i) >> outit -- @ -- -- Note that this code just issues one request, -- which is not the most typical use case. -- It is more likely that the action will loop for ever -- and receive requests, for instance, from a user interface. ------------------------------------------------------------------------ request :: Client i o -> E.Enumerator o IO () -> E.Iteratee i IO a -> IO (Either SomeException a) request c enum it = tryout ?> reicv where tryout = try $ askFor c enum reicv _ = rcvClient c it ------------------------------------------------------------------------ -- | Asynchronously requesting a service; -- the function sends a request to the server -- without waiting for a result. -- -- Parameters: -- -- * 'Client': The client that performs the request -- -- * 'E.Enumerator': Enumerator to create the request message stream ------------------------------------------------------------------------ askFor :: Client i o -> E.Enumerator o IO () -> IO () askFor c enum = E.run_ (enum $$ itSend (cliSock c) (cliOut c)) ------------------------------------------------------------------------ -- | Polling for a reply; -- the function polls for a server request. -- If nothing has been received, it returns 'Nothing'; -- otherwise it returns 'Just' the result or an error. -- -- Parameters: -- -- * 'Client': The client that performs the request -- -- * 'E.Iteratee': Iteratee to process the reply message stream -- -- The synchronous request (see 'request') -- could be implemented asynchronously like: -- -- @ -- rcv :: String -> IO () -- rcv req = withContext 1 $ \\ctx -> do -- let ap = address l \"tcp:\/\/localhost:5555\" [] -- withClient ctx ap -- (return . B.pack) (return . B.unpack) -- $ \\s -> do -- ei <- try $ askFor s (enum req) -- case ei of -- Left e -> putStrLn $ \"Error: \" ++ show (e::SomeException) -- Right _ -> wait s -- -- check for results periodically -- where wait s = checkFor s outit >>= \\mbei -> -- case mbei of -- Nothing -> do putStrLn \"Waiting...\" -- threadDelay 10000 >> wait s -- Just (Left e) -> putStrLn $ \"Error: \" ++ show e -- Just (Right _) -> putStrLn \"Ready!\" -- @ ------------------------------------------------------------------------ checkFor :: Client i o -> E.Iteratee i IO a -> IO (Maybe (Either SomeException a)) checkFor c it = Z.poll [Z.S (cliSock c) Z.In] 0 >>= \[s] -> case s of Z.S _ Z.In -> Just <$> rcvClient c it _ -> return Nothing ------------------------------------------------------------------------ -- The real working horse behind the scene ------------------------------------------------------------------------ rcvClient :: Client i o -> E.Iteratee i IO a -> IO (Either SomeException a) rcvClient c it = E.run (rcvEnum (cliSock c) (cliIn c) $$ it) ------------------------------------------------------------------------ -- | Publisher ------------------------------------------------------------------------ data Pub o = Pub { pubCtx :: Z.Context, pubSock :: Z.Socket Z.Pub, pubAdd :: AccessPoint, pubOut :: OutBound o} ------------------------------------------------------------------------ -- | Obtaining the 'Z.Context' from 'Pub' ------------------------------------------------------------------------ pubContext :: Pub o -> Z.Context pubContext = pubCtx ------------------------------------------------------------------------ -- | Setting 'Z.SocketOption' to the underlying ZMQ 'Z.Socket' ------------------------------------------------------------------------ setPubOptions :: Pub o -> [Z.SocketOption] -> IO () setPubOptions p = setSockOs (pubSock p) ------------------------------------------------------------------------ -- | Creates a publisher; -- A publisher is a data type -- that provides an interface to publish data to subscribers. -- 'withPub' creates a publisher and invokes -- an application-defined action, -- which receives a 'Pub' argument. -- The lifetime of the publisher is limited to the action. -- When the action terminates, the publisher /dies/. -- -- Parameter: -- -- * 'Z.Context': The ZMQ Context -- -- * 'AccessPoint': The access point the publisher will bind -- -- * 'OutBound': A converter to convert from type /o/ -- to the wire format 'B.ByteString'. -- Note that a publisher may create -- a data stream; the type /o/ is then -- the type of one segment of this stream, -- not of the stream as a whole. -- -- * 'Pub' -> IO (): The action to invoke ------------------------------------------------------------------------ withPub :: Z.Context -> AccessPoint -> OutBound o -> (Pub o -> IO a) -> IO a withPub ctx ac oconv act = Z.withSocket ctx Z.Pub $ \s -> do Z.bind s (acAdd ac) act Pub { pubCtx = ctx, pubSock = s, pubAdd = ac, pubOut = oconv} ------------------------------------------------------------------------ -- | Publishes the data stream created by an enumerator; -- -- Parameters: -- -- * 'Pub': The publisher -- -- * 'E.Enumerator': The enumerator to create an outgoing -- data stream. -- -- A simple weather report publisher: -- -- @ -- withContext 1 $ \\ctx -> withPub ctx -- (Address \"tcp:\/\/*:5555\" []) -- (return . B.pack) $ \\pub -> untilInterrupt $ do -- issue pub (once weather noparam) -- threadDelay 10000 -- update every 10ms -- @ -- -- @ -- -- fake weather report with some random values -- weather :: String -> IO (Maybe String) -- weather _ = do -- zipcode <- randomRIO (10000, 99999) :: IO Int -- temperature <- randomRIO (-10, 30) :: IO Int -- humidity <- randomRIO ( 10, 60) :: IO Int -- return $ Just (unwords [show zipcode, -- show temperature, -- show humidity]) -- @ ------------------------------------------------------------------------ issue :: Pub o -> E.Enumerator o IO () -> IO () issue p enum = E.run_ (enum $$ itSend (pubSock p) (pubOut p)) ------------------------------------------------------------------------ -- | Creates a background process that -- periodically publishes data; -- -- Parameters: -- -- * 'Z.Context': The ZMQ Context -- -- * 'String': Name of this Publisher; -- useful for debugging -- -- * 'Parameter': The initial value of the control parameter -- -- * 'Z.Timeout': The period of the publisher in microseconds; -- the process will issue the publisher data -- every n microseconds. -- -- * 'AccessPoint': Bind address -- -- * 'OutBound': A converter that converts one segment -- of the data stream from type /o/ -- to the wire format 'B.ByteString' -- -- * 'OnError_': Error Handler -- -- * 'String' -> 'Fetch': 'E.Enumerator' to create -- the outgoing data stream; -- the string argument is the parameter. -- -- * 'Service' -> IO (): The user action to perform -- -- The weather report publisher introduced above (see 'withPub') -- can be implemented by means of 'withPeriodicPub' as: -- -- @ -- withPeriodicPub ctx \"Weather Report\" noparam -- 100000 -- publish every 100ms -- (Address \"tcp:\/\/*:5555\" []) -- (return . B.pack) -- string converter -- onErr_ -- standard error handler -- (\\_ -> fetch1 fetch) -- creates one instance -- -- of the return of \"fetch\"; -- -- see 'Enumerator' for details -- $ \\pub -> -- untilInterrupt $ do -- until /SIGINT/, see 'withServer' for details -- threadDelay 100000 -- putStrLn $ \"I am doing nothing \" ++ srvName pub -- @ ------------------------------------------------------------------------ withPeriodicPub :: Z.Context -> String -> Parameter -> Z.Timeout -> AccessPoint -> OutBound o -> OnError_ -> Fetch_ o -> (Service -> IO a) -> IO a withPeriodicPub ctx name param period ac oconv onerr fetch action = withService ctx name param service action where service = publish period ac oconv onerr fetch ------------------------------------------------------------------------ -- PeriodicPub implementation ------------------------------------------------------------------------ publish :: Z.Timeout -> AccessPoint -> OutBound o -> OnError_ -> Fetch_ o -> Z.Context -> String -> String -> String -> IO () -> IO () publish period ac oconv onerr fetch ctx name sockname param ready = ( Z.withSocket ctx Z.Pub $ \sock -> do Z.bind sock (acAdd ac) Z.withSocket ctx Z.Sub $ \cmd -> do trycon cmd sockname retries Z.subscribe cmd "" ready periodicSend False period cmd (go sock) param) `catch` (\e -> onerr Fatal e name param) ------------------------------------------------------------------------ -- do the job periodically ------------------------------------------------------------------------ where go sock p = catch (body sock p) (\e -> onerr Error e name p) body sock p = do eiR <- E.run (fetch ctx p () $$ itSend sock oconv) ifLeft eiR (\e -> onerr Error e name p) (\_ -> return ()) ------------------------------------------------------------------------ -- | A subscription is a background service -- that receives and processes data streams -- from a publisher. -- A typical use case is an application -- that operates on periodically updated data; -- the subscriber would receive these data and -- and make them accessible to other threads in the process -- through an 'MVar'. -- -- Parameters: -- -- * 'Z.Context': The ZMQ Context -- -- * 'String': The subscriber's name -- -- * 'Parameter': The initial value of the control parameter -- -- * ['Topic']: The topics to subscribe to; -- in the example above ('withPub'), -- the publisher publishes the weather report -- per zip code; the zip code, in this example, -- could be a meaningful topic for a subscriber. -- It is good practice to send the topic -- in an initial message segment, -- the envelope, to avoid that the subscriber -- matches on some arbitrary part of the message. -- -- * 'InBound': A converter that converts one segment -- of the incoming data stream to type /o/ -- -- * 'OnError_': Error handler -- -- * 'Dump': 'E.Iteratee' to process the incoming data stream. -- -- * 'Service' -> IO (): Application-defined action to control -- the service. Note that 'Service' is -- a thread-local resource and must not -- be passed to threads forked from the action. -- -- Weather Report Subscriber: -- -- @ -- withContext 1 $ \\ctx -> -- withSub ctx \"Weather Report\" noparam -- [\"10001\"] -- zipcode to subscribe to -- (Address \"tcp:\/\/localhost:5555\" []) -- (return . B.unpack) -- onErr_ output -- Iteratee that just writes to stdout -- $ \\s -> untilInterrupt $ do -- putStrLn $ \"Doing nothing \" ++ srvName s -- threadDelay 1000000 -- @ ------------------------------------------------------------------------ withSub :: Z.Context -> String -> Parameter -> [Topic] -> AccessPoint -> InBound i -> OnError_ -> Dump i -> (Service -> IO a) -> IO a withSub ctx name param sub ac iconv onErr dump action = withService ctx name param service action where service = subscribe sub ac iconv onErr dump subscribe :: [Topic] -> AccessPoint -> InBound i -> OnError_ -> Dump i -> Z.Context -> String -> String -> Parameter -> IO () -> IO () subscribe sub ac iconv onerr dump ctx name sockname param ready = ( Z.withSocket ctx Z.Sub $ \sock -> do trycon sock (acAdd ac) retries mapM_ (Z.subscribe sock) sub Z.withSocket ctx Z.Sub $ \cmd -> do trycon cmd sockname retries Z.subscribe cmd "" ready poll False [Z.S cmd Z.In, Z.S sock Z.In] (go sock) param) `catch` (\e -> onerr Fatal e name param) where go sock p = do eiR <- E.run (rcvEnum sock iconv $$ dump ctx p) ifLeft eiR (\e -> onerr Error e name p) (\_ -> return ()) ------------------------------------------------------------------------ -- | An alternative to the background subscriber (see 'withSub'); ------------------------------------------------------------------------ data Sub i = Sub { subCtx :: Z.Context, subSock :: Z.Socket Z.Sub, subAdd :: AccessPoint, subIn :: InBound i} ------------------------------------------------------------------------ -- | Obtaining the 'Z.Context' from 'Sub' ------------------------------------------------------------------------ subContext :: Sub i -> Z.Context subContext = subCtx ------------------------------------------------------------------------ -- | Setting 'Z.SocketOption' to the underlying ZMQ 'Z.Socket' ------------------------------------------------------------------------ setSubOptions :: Sub i -> [Z.SocketOption] -> IO () setSubOptions s = setSockOs (subSock s) ------------------------------------------------------------------------ -- | Similar to 'Pub', a 'Sub' is a data type -- that provides an interface to subscribe data. -- 'withSporadicSub' creates a subscriber and invokes -- an application-defined action, -- which receives a 'Sub' argument. -- The lifetime of the subscriber is limited to the action. -- When the action terminates, the subscriber /dies/. ------------------------------------------------------------------------ withSporadicSub :: Z.Context -> AccessPoint -> InBound i -> [Topic] -> (Sub i -> IO a) -> IO a withSporadicSub ctx ac iconv topics act = Z.withSocket ctx Z.Sub $ \s -> do trycon s (acAdd ac) retries mapM_ (Z.subscribe s) topics act Sub { subCtx = ctx, subSock = s, subAdd = ac, subIn = iconv} ------------------------------------------------------------------------ -- | Polling for data; -- If nothing has been received, the function returns 'Nothing'; -- otherwise it returns 'Just' the result or an error. -- -- Parameters: -- -- * 'Sub': The subscriber -- -- * 'E.Iteratee': Iteratee to process the data ------------------------------------------------------------------------ checkSub :: Sub i -> E.Iteratee i IO a -> IO (Maybe (Either SomeException a)) checkSub s it = Z.poll [Z.S (subSock s) Z.In] 0 >>= \[p] -> case p of Z.S _ Z.In -> Just <$> rcvSub s it _ -> return Nothing ------------------------------------------------------------------------ -- | Waiting for data; -- the function blocks the current thread, -- until data are being received from the publisher. -- It returns either 'SomeException' or the result. -- -- Parameters: -- -- * 'Sub': The subscriber -- -- * 'E.Iteratee': Iteratee to process the data stream ------------------------------------------------------------------------ waitSub :: Sub i -> E.Iteratee i IO a -> IO (Either SomeException a) waitSub = rcvSub ------------------------------------------------------------------------ -- | Unsubscribe a topic ------------------------------------------------------------------------ unsubscribe :: Sub i -> Topic -> IO () unsubscribe s t = Z.unsubscribe (subSock s) t ------------------------------------------------------------------------ -- | Subscribe another topic ------------------------------------------------------------------------ resubscribe :: Sub i -> Topic -> IO () resubscribe s t = Z.subscribe (subSock s) t ------------------------------------------------------------------------ -- The working horse behind the scene ------------------------------------------------------------------------ rcvSub :: Sub i -> E.Iteratee i IO a -> IO (Either SomeException a) rcvSub s it = E.run (rcvEnum (subSock s) (subIn s) $$ it) ------------------------------------------------------------------------ -- | A puller is a background service -- that receives and processes data streams from a pipeline. -- -- Parameters: -- -- * 'Z.Context': The ZMQ Context -- -- * 'String': The service name -- -- * 'Parameter': The initial value of the control parameter -- -- * 'AccessPoint': The address to connect to -- -- * 'InBound': A converter to convert -- segments of the incoming data stream -- from the wire format 'B.ByteString' -- to the type /i/ -- -- * 'OnError_': Error Handler -- -- * 'Dump': 'E.Iteratee' to process -- the incoming data stream -- -- * 'Service' -> IO (): Application-defined action -- -- A worker that just writes the incoming stream to /stdout/: -- -- @ -- withContext 1 $ \\ctx -> -- withPuller ctx \"Worker\" noparam -- (Address \"tcp:\/\/localhost:5555\" []) -- (return . B.unpack) -- onErr_ output -- $ \\s -> untilInterrupt $ do -- putStrLn \"Doing nothing \" ++ srvName s -- threadDelay 100000 -- @ ------------------------------------------------------------------------ withPuller :: Z.Context -> String -> Parameter -> AccessPoint -> InBound i -> OnError_ -> Dump i -> (Service -> IO a) -> IO a withPuller ctx name param ac iconv onerr dump action = withService ctx name param service action where service = pull ac iconv onerr dump pull :: AccessPoint -> InBound i -> OnError_ -> Dump i -> Z.Context -> String -> String -> String -> IO () -> IO () pull ac iconv onerr dump ctx name sockname param ready = ( Z.withSocket ctx Z.Pull $ \sock -> do trycon sock (acAdd ac) retries Z.withSocket ctx Z.Sub $ \cmd -> do trycon cmd sockname retries Z.subscribe cmd "" ready poll False [Z.S cmd Z.In, Z.S sock Z.In] (go sock) param) `catch` (\e -> onerr Fatal e name param) ------------------------------------------------------------------------ -- do the job ------------------------------------------------------------------------ where go sock p = E.run_ (rcvEnum sock iconv $$ dump ctx p) `catch` (\e -> onerr Error e name p) ------------------------------------------------------------------------ -- | A pipeline consists of a \"pusher\" -- and a set of workers (\"pullers\"). -- The pusher sends jobs down the pipeline that will be -- assigned to one of the workers. -- The pipeline pattern is, thus, a work-balancing scheme. ------------------------------------------------------------------------ data Pipe o = Pipe { pipCtx :: Z.Context, pipSock :: Z.Socket Z.Push, pipAdd :: AccessPoint, pipOut :: OutBound o } ------------------------------------------------------------------------ -- | Obtaining the 'Z.Context' from 'Pipe' ------------------------------------------------------------------------ pipeContext :: Pipe o -> Z.Context pipeContext = pipCtx ------------------------------------------------------------------------ -- | Setting 'Z.SocketOption' to the underlying ZMQ 'Z.Socket' ------------------------------------------------------------------------ setPipeOptions :: Pipe o -> [Z.SocketOption] -> IO () setPipeOptions p = setSockOs (pipSock p) ------------------------------------------------------------------------ -- | Creates a pipeline; -- a 'Pipe' is a data type -- that provides an interface to /push/ a data stream -- to workers connected to the other side of the pipe. -- 'withPipe' creates a pipeline and invokes an application-defined -- action which receives a 'Pipe' argument. -- The lifetime of the 'Pipe' is limited to the action. -- When the action terminates, the 'Pipe' /dies/. -- -- Parameters: -- -- * 'Z.Context': The ZMQ Context -- -- * 'AccessPoint': The bind address -- -- * 'OutBound': A converter to convert message segments -- of type /o/ to the wire format 'B.ByteString' -- -- * 'Pipe' -> IO (): The action to invoke ------------------------------------------------------------------------ withPipe :: Z.Context -> AccessPoint -> OutBound o -> (Pipe o -> IO a) -> IO a withPipe ctx ac oconv act = Z.withSocket ctx Z.Push $ \s -> do Z.bind s (acAdd ac) act Pipe { pipCtx = ctx, pipSock = s, pipAdd = ac, pipOut = oconv} ------------------------------------------------------------------------ -- | Sends a job down the pipeline; -- -- Parameters: -- -- * 'Pipe': The pipeline -- -- * 'E.Enumerator': enumerator to create the data stream -- that constitutes the /job/ -- -- A simple pusher: -- -- @ -- sendF :: FilePath -> IO () -- sendF f = withContext 1 $ \\ctx -> do -- let ap = Address \"tcp:\/\/*:5555\" [] -- withPipe ctx ap return $ \\p -> -- push pu (EB.enumFile f) -- file enumerator -- -- see Data.Enumerator.Binary (EB) -- @ ------------------------------------------------------------------------ push :: Pipe o -> E.Enumerator o IO () -> IO () push p enum = E.run_ (enum $$ itSend (pipSock p) (pipOut p)) ------------------------------------------------------------------------ -- | An Exclusive Pair is a general purpose pattern -- of two equal peers that communicate with each other -- by sending ('send') and receiving ('receive') data. -- One of the peers has to 'Z.bind' the 'AccessPoint' -- the other 'Z.connect's to it. ------------------------------------------------------------------------ data Peer a = Peer { peeCtx :: Z.Context, peeSock :: Z.Socket Z.Pair, peeAdd :: AccessPoint, peeIn :: InBound a, peeOut :: OutBound a } ------------------------------------------------------------------------ -- | Obtains the 'Z.Context' from a 'Peer' ------------------------------------------------------------------------ peerContext :: Peer a -> Z.Context peerContext = peeCtx ------------------------------------------------------------------------ -- | Sets 'Z.SocketOption' ------------------------------------------------------------------------ setPeerOptions :: Peer a -> [Z.SocketOption] -> IO () setPeerOptions p = setSockOs (peeSock p) ------------------------------------------------------------------------ -- | Creates a 'Peer'; -- a peer is a data type -- that provides an interface to exchange data with another peer. -- 'withPeer' creates the peer and invokes an application-defined -- action that receives a 'Peer' argument. -- The lifetime of the 'Peer' is limited to the action. -- When the action terminates, the 'Peer' /dies/. -- -- Parameters: -- -- * 'Z.Context': The ZMQ Context -- -- * 'AccessPoint': The address, to which this peer either -- binds or connects -- -- * 'LinkType': One of the peers has to bind the address, -- the other has to connect. -- -- * 'InBound': A converter to convert message segments -- from the wire format 'B.ByteString' to type /i/ -- -- * 'OutBound': A converter to convert message segments -- of type /o/ to the wire format 'B.ByteString' -- -- * 'Peer' -> IO (): The action to invoke ------------------------------------------------------------------------ withPeer :: Z.Context -> AccessPoint -> LinkType -> InBound a -> OutBound a -> (Peer a -> IO b) -> IO b withPeer ctx ac t iconv oconv act = Z.withSocket ctx Z.Pair $ \s -> link t ac s >> act Peer { peeCtx = ctx, peeSock = s, peeAdd = ac, peeIn = iconv, peeOut = oconv} ------------------------------------------------------------------------ -- | Sends a data stream to another peer; -- -- Parameters: -- -- * 'Peer': The peer -- -- * 'E.Enumerator': Enumerator to create the outoing data stream ------------------------------------------------------------------------ send :: Peer o -> E.Enumerator o IO () -> IO () send p enum = E.run_ (enum $$ itSend (peeSock p) (peeOut p)) ------------------------------------------------------------------------ -- | Receives a data stream from another peer; -- -- Parameters: -- -- * 'Peer': The peer -- -- * 'E.Iteratee': Iteratee to process the incoming data stream ------------------------------------------------------------------------ receive :: Peer i -> E.Iteratee i IO a -> IO (Either SomeException a) receive p it = E.run (rcvEnum (peeSock p) (peeIn p) $$ it)