{-# LANGUAGE CPP, DeriveDataTypeable, RankNTypes #-} ------------------------------------------------------------------------------- -- | -- Module : Network/Mom/Patterns/Streams.hs -- Copyright : (c) Tobias Schoofs -- License : LGPL -- Stability : experimental -- Portability: non-portable -- -- Stream processing services ------------------------------------------------------------------------------- module Network.Mom.Patterns.Streams ( -- * Processing Streams -- $procStreams withStreams, runReceiver, runSender, PollEntry(..), AccessType(..), parseAccess, LinkType(..), link, parseLink, -- * Streamer -- $streamer Streamer, StreamConduit, StreamSink, StreamAction, filterStreams, getSource, -- * StreamSinks -- $sinks stream, part, passAll, pass1, passN, passWhile, ignoreStream, -- * Controller -- $controller Controller, Control, internal, stop, pause, resume, send, receive -- * Complete Example -- $example ) where import Control.Monad.Trans (liftIO) import Control.Monad (when) import Control.Applicative ((<$>)) import Prelude hiding (catch) import Control.Exception (SomeException, bracket, bracketOnError, finally, catch, throwIO) import Control.Concurrent import Data.Conduit (($$)) import qualified Data.Conduit as C import qualified Data.ByteString.Char8 as B import Data.Char (toLower) import Data.Map (Map) import qualified Data.Map as Map import qualified System.ZMQ as Z import Factory import Network.Mom.Patterns.Types {- $procStreams This module provides functions to automate stream processing, in particular the function 'withStreams' that starts a background action polling on a set of streams. The function uses uses application-defined callbacks to manipulate streams. The functions 'runReceiver' and 'runSender' are intended mainly for testing. They send or receive respectively streams, which are handled or created by a conduit 'Sink' and 'Source'. -} ------------------------------------------------------------------------ -- | Starts polling on a set of streams. -- The actual polling will be run in another thread. -- The current thread continues with the action passed in. -- When this action terminates, the streamer stops polling. -- -- Parameters: -- -- * 'Context' - The /ZMQ/ context -- -- * 'Service' - The service name -- indicated for instance in error messages. -- -- * 'Timeout' - The polling timeout: -- /< 0/ - listens eternally, -- /0/ - returns immediately, -- /> 0/ - timeout in microseconds; -- when the timeout expires, the 'StreamAction' is invoked. -- -- * 'PollEntry' - List of 'PollEntry'; -- the streamer will poll over -- all list members. -- When input is available, -- it is directed to the 'StreamSink'. -- -- * 'StreamAction' - Invoked when timeout expires. -- -- * 'OnError_' - Error handler -- -- * 'StreamSink' - The sink, to which the stream is sent. -- Note that the sink must terminate -- the outgoing stream -- (using one of the terminating sinks -- described below). -- Not terminating the stream properly -- will result in a zeromq socket error. -- -- * 'Control' a - The action to invoke, -- when the streamer has been started; -- The 'Control' is used to control the device. ------------------------------------------------------------------------ withStreams :: Context -> Service -> Timeout -> [PollEntry] -> StreamAction -> -- on timeout OnError_ -> -- error handler StreamSink -> -- stream handler Control a -> IO a withStreams ctx sn tmo pes onTmo onErr onStream ctrl = do running <- newEmptyMVar ready <- newEmptyMVar catch (startService ready running) (\e -> do onErr Fatal e "Service is going down" throwIO (e::SomeException)) ---------------------------------------------------------------- -- start the polling thread -- create the controller -- invoke the control action ---------------------------------------------------------------- where startService ready running = do cmdN <- cmdName sn _ <- forkIO $ finally (runStreams cmdN ready `catch` \e -> do onErr Fatal e "Service is going down" throwIO (e::SomeException)) (putMVar running ()) Z.withSocket ctx Z.XReq $ \cmd -> do _ <- readMVar ready Z.connect cmd cmdN let c = Controller {ctrlCtx = ctx, ctrlCmd = cmd, ctrlOpts = []} finally (ctrl c) (finally (stop c) (takeMVar running)) ---------------------------------------------------------------- -- The polling thread ---------------------------------------------------------------- runStreams cmdN ready = Z.withSocket ctx Z.XReq $ \cmd -> bracket (do Z.bind cmd cmdN let c = Z.S cmd Z.In (m, is, ps) <- mkPoll ctx pes Map.empty [] [] return (m, c, is, ps)) (\(_, _, _, ps) -> mapM_ closeS ps) (\(m, c, is, ps) -> do xp <- newMVar PollSt { polMap = m, polCmd = c, polIs = is, polPs = ps, polTmo = tmo, polCont = True } putMVar ready () poll xp) ---------------------------------------------------------------- -- Poll ---------------------------------------------------------------- poll :: PollT () poll xp = do c <- psCmd xp (_, ps) <- psPolls xp (x:ss) <- Z.poll (c:ps) tmo case x of Z.S _ Z.In -> do catch (handleCmd xp) (\e -> do onErr Critical e "Can't handle command" cleanStream c) q <- psContinue xp when q $ poll xp _ -> handleStream ss xp >> poll xp ---------------------------------------------------------------- -- Handle Streams ---------------------------------------------------------------- handleStream :: [Z.Poll] -> PollT () handleStream ss xp = do m <- psMap xp c <- psCmd xp (is, ps) <- psPolls xp case getStream is ss of Nothing -> onTmo Streamer {strmSrc = Nothing, strmIdx = m, strmCmd = c, strmPoll = ps} `catch` \e -> onErr Error e "Timeout Action failed" Just (i, p) -> catch ( C.runResourceT $ readPoll p $$ onStream Streamer { strmSrc = Just (i, p), strmIdx = m, strmCmd = c, strmPoll = ps}) ( \e -> onErr Error e "Stream handling failed" >> cleanStream p) ---------------------------------------------------------------- -- Handle Commands ---------------------------------------------------------------- handleCmd :: PollT () handleCmd xp = do m <- psMap xp c <- psCmd xp (_, ps) <- psPolls xp case c of Z.S s _ -> do x <- B.unpack <$> Z.receive s [] case x of "send" -> cmdSend s Streamer {strmSrc = Nothing, strmIdx = m, strmCmd = c, strmPoll = ps} "stop" -> cmdStop ps >> setPsContinue False xp "pause" -> cmdPause s "resume" -> return () -- ignore "test" -> putStrLn "test successful" _ -> undefined _ -> throwIO $ Ouch "Ouch! Not a poller in handleCmd!" ------------------------------------------------------------------------ -- Simplify parameter passing in streams ------------------------------------------------------------------------ data PollState = PollSt { polMap :: Map Identifier Z.Poll, polCmd :: Z.Poll, polIs :: [Identifier], polPs :: [Z.Poll], polTmo :: Timeout, polCont :: Bool} type PollT r = MVar PollState -> IO r psPolls :: PollT ([Identifier], [Z.Poll]) psPolls m = withMVar m $ \p -> return (polIs p, polPs p) psMap :: PollT (Map Identifier Z.Poll) psMap m = withMVar m (return . polMap) psCmd :: PollT Z.Poll psCmd m = withMVar m (return . polCmd) psContinue :: PollT Bool psContinue m = withMVar m (return . polCont) setPsContinue :: Bool -> PollT () setPsContinue t m = modifyMVar_ m $ \p -> return p{polCont = t} ------------------------------------------------------------------------ -- Get first stream with input ------------------------------------------------------------------------ getStream :: [Identifier] -> [Z.Poll] -> Maybe (Identifier, Z.Poll) getStream _ [] = Nothing getStream [] _ = Nothing getStream (i:is) (p:ps) = case p of Z.S _ Z.In -> Just (i, p) _ -> getStream is ps ------------------------------------------------------------------------ -- Receive from poll entry ------------------------------------------------------------------------ readPoll :: Z.Poll -> Source readPoll p = case p of Z.S s _ -> recv s _ -> liftIO $ throwIO $ Ouch "Ouch! No socket in poll" ------------------------------------------------------------------------ -- Remove all messages from stream ------------------------------------------------------------------------ cleanStream :: Z.Poll -> IO () cleanStream p = case p of Z.S s _ -> go s _ -> return () where go s = do m <- Z.moreToReceive s when m $ Z.receive s [] >>= \_ -> go s ------------------------------------------------------------------------ -- Traditional receive ------------------------------------------------------------------------ recv :: Z.Socket a -> Source recv s = liftIO (Z.receive s []) >>= \x -> do C.yield x m <- liftIO $ Z.moreToReceive s when m $ recv s ------------------------------------------------------------------------ -- Receive with tmo ------------------------------------------------------------------------ waitForRecv :: Z.Socket a -> Z.Timeout -> IO (Maybe (Z.Socket a)) waitForRecv s tmo = Z.poll [Z.S s Z.In] tmo >>= \[s'] -> case s' of Z.S _ Z.In -> return $ Just s _ -> return Nothing ------------------------------------------------------------------------ -- | Receiver Sink: -- Internally a zeromq socket is waiting for input; -- when input is available, it is send to the sink. -- -- * 'Z.Socket a' - The source socket -- -- * 'Timeout' - receiver timeout -- /< 0/ - listens eternally, -- /0/ - returns immediately, -- /> 0/ - timeout in microseconds; -- when the timeout expires, the stream terminates -- and the return value is Nothing. ------------------------------------------------------------------------ runReceiver :: Z.Socket a -> Timeout -> SinkR (Maybe o) -> IO (Maybe o) runReceiver s tmo snk = do mb_ <- waitForRecv s tmo case mb_ of Nothing -> return Nothing Just _ -> C.runResourceT $ recv s $$ snk ------------------------------------------------------------------------ -- | Sender Source: -- The 'Source' generates a stream, -- which is relayed to the 'Z.Socket'. ------------------------------------------------------------------------ runSender :: Z.Socket a -> Source -> IO () runSender s src = C.runResourceT $ src $$ relay s ------------------------------------------------------------------------ -- Create cmdName ------------------------------------------------------------------------ cmdName :: Service -> IO String cmdName sn = do u <- show <$> mkUniqueId return $ "inproc://_" ++ sn ++ "_" ++ u ------------------------------------------------------------------------ -- Creates a socket, binds or links it and sets the socket options ------------------------------------------------------------------------ access :: Context -> AccessType -> LinkType -> [Z.SocketOption] -> String -> [Service] -> IO Z.Poll access ctx a l os u ts = case a of ServerT -> Z.socket ctx Z.Rep >>= go ClientT -> Z.socket ctx Z.Req >>= go DealerT -> Z.socket ctx Z.XReq >>= go RouterT -> Z.socket ctx Z.XRep >>= go PubT -> Z.socket ctx Z.Pub >>= go PipeT -> Z.socket ctx Z.Push >>= go PullT -> Z.socket ctx Z.Pull >>= go PeerT -> Z.socket ctx Z.Pair >>= go SubT -> Z.socket ctx Z.Sub >>= \s -> mapM_ (Z.subscribe s) ts >> go s where go s = do setSockOs s os case l of Bind -> Z.bind s u Connect -> trycon s u retries return $ Z.S s Z.In ------------------------------------------------------------------------ -- creates and binds or connects all sockets recursively; -- on its way, creates the Map from Identifiers to PollItems, -- a list of PollItems -- and a list of Identifiers with the same order; -- finally executes "run" ------------------------------------------------------------------------ mkPoll :: Context -> [PollEntry] -> Map Identifier Z.Poll -> [Identifier] -> [Z.Poll] -> IO (Map Identifier Z.Poll, [Identifier], [Z.Poll]) mkPoll _ [] m is ps = return (m, is, ps) mkPoll ctx (k:ks) m is ps = bracketOnError (access ctx (pollType k) (pollLink k) (pollOs k) (pollAdd k) (pollSub k)) (\p -> closeS p >> return (m, [], [])) (\p -> mkPoll ctx ks (Map.insert (pollId k) p m) (pollId k:is) (p:ps)) ------------------------------------------------------------------------ -- Close socket in a poll entry ------------------------------------------------------------------------ closeS :: Z.Poll -> IO () closeS p = case p of Z.S s _ -> safeClose s _ -> return () ------------------------------------------------------------------------ -- safely close a socket ------------------------------------------------------------------------ safeClose :: Z.Socket a -> IO () safeClose s = catch (Z.close s) (\e -> let _ = (e::SomeException) in return ()) ------------------------------------------------------------------------ -- handle stop command ------------------------------------------------------------------------ cmdStop :: [Z.Poll] -> IO () cmdStop = mapM_ closeS ------------------------------------------------------------------------ -- handle pause command ------------------------------------------------------------------------ cmdPause :: Z.Socket a -> IO () cmdPause s = do x <- B.unpack <$> Z.receive s [] case x of "resume" -> return () _ -> cmdPause s ------------------------------------------------------------------------ -- handle send command ------------------------------------------------------------------------ cmdSend :: Z.Socket a -> Streamer -> IO () cmdSend cmd s = do ok <- Z.moreToReceive cmd if ok then do ds <- getDest cmd [] C.runResourceT $ recv cmd $$ passAll s ds else throwIO $ ProtocolExc "Abrupt end of send command" ------------------------------------------------------------------------ -- get destination to send to ------------------------------------------------------------------------ getDest :: Z.Socket a -> [Identifier] -> IO [Identifier] getDest cmd is = do i <- B.unpack <$> Z.receive cmd [] if null i then if null is then throwIO $ ProtocolExc "No identifiers" else return is else do ok <- Z.moreToReceive cmd if ok then getDest cmd (i:is) else throwIO $ ProtocolExc "Incomplete identifier list" ------------------------------------------------------------------------ -- relay stream to a socket ------------------------------------------------------------------------ relay :: Z.Socket a -> Sink relay s = do mbX <- C.await case mbX of Nothing -> return () Just x -> go x where go x = do mbN <- C.await case mbN of Nothing -> liftIO (Z.send s x []) Just n -> liftIO (Z.send s x [Z.SndMore]) >> go n ------------------------------------------------------------------------ -- send one message to many streams ------------------------------------------------------------------------ multiSend :: [(Identifier, Z.Poll)] -> B.ByteString -> [Z.Flag] -> IO () multiSend ps m os = go ps where go [] = return () go ((_,p):pp) = sndPoll p >> go pp sndPoll p = case p of Z.S s _ -> Z.send s m os _ -> throwIO $ Ouch "Ouch! Not a Poll!" ------------------------------------------------------------------------ -- Find sockets corresponding to identifiers ------------------------------------------------------------------------ idsToSocks :: Streamer -> [Identifier] -> Either String [(Identifier, Z.Poll)] idsToSocks s = getSocks where getSock i | i == internal = Right $ strmCmd s | otherwise = case Map.lookup i (strmIdx s) of Nothing -> Left $ "Unknown identifier " ++ i Just p -> Right p getSocks [] = Right [] getSocks (i:is) = case getSock i of Left e -> Left e Right p -> case getSocks is of Left e -> Left e Right ps -> Right ((i,p):ps) {- $streamer A streamer represents the current state of the streaming device started by means of 'withStreams'. It is passed in to application-defined callbacks, namely the timeout action ('StreamAction') and the 'Sink' ('StreamSink'). There is a bunch of useful sinks that receive a streamer as input (see below). -} ------------------------------------------------------------------------ -- | Holds information on streams and the current state of the streamer, -- /i.e./ the current source. -- Streamers are passed to processing conduits. ------------------------------------------------------------------------ data Streamer = Streamer { strmSrc :: Maybe (Identifier, Z.Poll), strmIdx :: Map Identifier Z.Poll, strmCmd :: Z.Poll, strmPoll :: [Z.Poll]} ------------------------------------------------------------------------ -- | Conduit with Streamer ------------------------------------------------------------------------ type StreamConduit = Streamer -> Conduit B.ByteString () ------------------------------------------------------------------------ -- | Sink with Streamer ------------------------------------------------------------------------ type StreamSink = Streamer -> Sink ------------------------------------------------------------------------ -- | IO Action with Streamer (/e.g./ Timeout action) ------------------------------------------------------------------------ type StreamAction = Streamer -> IO () ------------------------------------------------------------------------ -- | Get current source ------------------------------------------------------------------------ getSource :: Streamer -> Identifier getSource s = case strmSrc s of Nothing -> "" Just (i,_) -> i ------------------------------------------------------------------------ -- | Filter subset of streams; usually you want to filter -- a subset of streams to which to relay an incoming stream. -- Note that the result is just a list of stream identifiers, -- which of course could be used directly in the first place. -- A meaningful use of filterstreams would be, for instance: -- -- > let targets = filterStreams s (/= getSource s) -- -- Where all streams but the source are selected. ------------------------------------------------------------------------ filterStreams :: Streamer -> (Identifier -> Bool) -> [Identifier] filterStreams s p = map fst $ Map.toList $ Map.filterWithKey (\k _ -> p k) $ strmIdx s {- $sinks To manipulate and relay incoming streams, the application passes a 'StreamSink' to 'withStreams'. The following sinks are building blocks for more application-focused manipulations. The peculiarities of the zeromq library, in particular the fact that messages are sent entirely, /i.e./ with all segments belonging to the same message, or not at all, require some care in designing zeromq sinks. The sink must ensure to mark the last segment sent (see 'Z.SndMore'). Also, the incoming stream should be exhausted to avoid message segements lingering around in the pipe. Applications can construct new sinks by either calling a building block in the their own sink code, /e.g./: > example :: [B.ByteString] -> StreamSink > example headers s is = do > mbX <- C.await > case mbX of > Nothing -> return () > Just x -> do stream s is headers > passAll s is or by combining a sink with a conduit forming a more complex sink, /e.g./: > example :: StreamSink > example s is = sourceList headers =$ passAll s is -} ------------------------------------------------------------------------ -- | Send the 'ByteString' segments to the outgoing streams -- identified by ['Identifier']. -- The stream is terminated. ------------------------------------------------------------------------ stream :: Streamer -> [Identifier] -> [B.ByteString] -> Sink stream s ds ms = case idsToSocks s ds of Left e -> liftIO $ throwIO $ ProtocolExc e Right ss -> go ms ss where go [] _ = return () go [x] ss = liftIO (multiSend ss x []) go (x:xs) ss = liftIO (multiSend ss x [Z.SndMore]) >> go xs ss ------------------------------------------------------------------------ -- | Send the 'ByteString' segments to the outgoing streams -- identified by ['Identifier'] -- without terminating the stream, -- /i.e./ more segments must be sent. ------------------------------------------------------------------------ part :: Streamer -> [Identifier] -> [B.ByteString] -> Sink part s ds ms = case idsToSocks s ds of Left e -> liftIO $ throwIO $ ProtocolExc e Right ss -> go ss where go ss = liftIO $ mapM_ (\x -> multiSend ss x [Z.SndMore]) ms ------------------------------------------------------------------------ -- | Pass all segments of an incoming stream -- to a list of outgoing streams. -- The stream is terminated. ------------------------------------------------------------------------ passAll :: Streamer -> [Identifier] -> Sink passAll s ds = case idsToSocks s ds of Left e -> liftIO $ throwIO $ ProtocolExc e Right ss -> do mbI <- C.await -- C.awaitForever $ \i -> do case mbI of Nothing -> return () Just i -> go ss i where go ss i = do mbN <- C.await case mbN of Nothing -> liftIO (multiSend ss i []) Just n -> liftIO (multiSend ss i [Z.SndMore]) >> go ss n ------------------------------------------------------------------------ -- | Pass one segment and ignore the remainder of the stream. -- The stream is terminated. ------------------------------------------------------------------------ pass1 :: Streamer -> [Identifier] -> Sink pass1 s ds = case idsToSocks s ds of Left e -> liftIO $ throwIO $ ProtocolExc e Right ss -> do mbX <- C.await case mbX of Nothing -> return () Just x -> liftIO (multiSend ss x []) >> ignoreStream ------------------------------------------------------------------------ -- | Pass n segments and ignore the remainder of the stream. -- The stream is terminated. ------------------------------------------------------------------------ passN :: Streamer -> [Identifier] -> Int -> Sink passN s ds n | n <= 0 = ignoreStream | otherwise = case idsToSocks s ds of Left e -> liftIO $ throwIO $ ProtocolExc e Right ss -> do mbX <- C.await case mbX of Nothing -> return () Just x -> go ss n x where go ss i x | i <= 0 = return () | i == 1 = do liftIO (multiSend ss x []) ignoreStream | otherwise = do mbY <- C.await case mbY of Nothing -> liftIO (multiSend ss x []) Just y -> do liftIO (multiSend ss x [Z.SndMore]) go ss (i-1) y ------------------------------------------------------------------------ -- | Pass while condition is true and ignore the remainder of the stream. -- The stream is terminated. ------------------------------------------------------------------------ passWhile :: Streamer -> [Identifier] -> (B.ByteString -> Bool) -> Sink passWhile s ds f = case idsToSocks s ds of Left e -> liftIO $ throwIO $ ProtocolExc e Right ss -> do mbX <- C.await case mbX of Nothing -> return () Just x | f x -> go ss x | otherwise -> return () where go ss x = do mbY <- C.await case mbY of Nothing -> liftIO (multiSend ss x []) Just y | f y -> do liftIO (multiSend ss x [Z.SndMore]) go ss y | otherwise -> do liftIO (multiSend ss x []) ignoreStream ------------------------------------------------------------------------ -- | Ignore an incoming stream ------------------------------------------------------------------------ ignoreStream :: Sink ignoreStream = do mb <- C.await case mb of Nothing -> return () Just _ -> ignoreStream {- $controller The controller is passed in to the control action of 'withStreams'. It allows the application to control the polling device. Through the controller, the device can be stopped, restarted, paused and resumed and it is possible to send and receive streams through the controler. To relay streams to the controller (/i.e./ directly to application code) the 'internal' stream, which is identified by the string \"_internal\" can be used. -} ------------------------------------------------------------------------ -- | The internal stream that represents the 'Controller'. -- StreamSinks can write to this stream, /e.g./: -- -- > passAll s [internal] -- -- And the streamer may also receive from this stream, /e.g./: -- -- > if getSource s == internal ------------------------------------------------------------------------ internal :: Identifier internal = "_internal" ------------------------------------------------------------------------ -- | Controller ------------------------------------------------------------------------ data Controller = Controller { ctrlCtx :: Context, ctrlCmd :: Z.Socket Z.XReq, ctrlOpts :: [Z.SocketOption]} ------------------------------------------------------------------------ -- | Control Action ------------------------------------------------------------------------ type Control a = Controller -> IO a ------------------------------------------------------------------------ -- Get the socket from a controller and send a string to it ------------------------------------------------------------------------ sndCmd :: String -> Controller -> IO () sndCmd cmd ctrl = let s = ctrlCmd ctrl in Z.send s (B.pack cmd) [] ------------------------------------------------------------------------ -- | Stop streams ------------------------------------------------------------------------ stop :: Controller -> IO () stop = sndCmd "stop" ------------------------------------------------------------------------ -- | Pause streams ------------------------------------------------------------------------ pause :: Controller -> IO () pause = sndCmd "pause" ------------------------------------------------------------------------ -- | Resume streams ------------------------------------------------------------------------ resume :: Controller -> IO () resume = sndCmd "resume" ------------------------------------------------------------------------ -- | Receive a stream through the controller -- that was sink\'d to the target 'internal'. ------------------------------------------------------------------------ receive :: Controller -> Timeout -> SinkR (Maybe a) -> IO (Maybe a) receive c tmo snk = do mb_ <- waitForRecv (ctrlCmd c) tmo case mb_ of Nothing -> return Nothing Just _ -> C.runResourceT $ recv (ctrlCmd c) $$ snk ------------------------------------------------------------------------ -- | Send a stream through the controller ------------------------------------------------------------------------ send :: Controller -> [Identifier] -> Source -> IO () send c is src = let s = ctrlCmd c in do Z.send s (B.pack "send") [Z.SndMore] sendIds s C.runResourceT $ src $$ relay s where sendIds s = do mapM_ (sendId s) is Z.send s B.empty [Z.SndMore] sendId s i = Z.send s (B.pack i) [Z.SndMore] ------------------------------------------------------------------------ -- | A poll entry describes how to access and identify a socket ------------------------------------------------------------------------ data PollEntry = Poll { -- | How to address this particular stream pollId :: Identifier, -- | The address to link to pollAdd :: String, -- | The zeromq socket type pollType :: AccessType, -- | How to link (bind or connect) pollLink :: LinkType, -- | List of 'Service' (or topics) -- for subscribers pollSub :: [Service], -- | zeromq socket options pollOs :: [Z.SocketOption] } deriving (Show, Read) instance Eq PollEntry where x == y = pollId x == pollId y ------------------------------------------------------------------------ -- | Defines the type of a 'PollEntry'; -- the names of the constructors are similar -- to the corresponding ZMQ socket types. ------------------------------------------------------------------------ data AccessType = -- | Represents a server and expects connections from clients; -- corresponds to ZMQ Socket Type 'Z.Rep' ServerT -- | Represents a client and connects to a server; -- corresponds to ZMQ Socket Type 'Z.Req' | ClientT -- | Represents a load balancer, -- expecting connections from clients; -- corresponds to ZMQ Socket Type 'Z.XRep' | RouterT -- | Represents a router -- expecting connections from servers; -- corresponds to ZMQ Socket Type 'Z.XReq' | DealerT -- | Represents a publisher; -- corresponds to ZMQ Socket Type 'Z.Pub' | PubT -- | Represents a subscriber; -- corresponds to ZMQ Socket Type 'Z.Sub' | SubT -- | Represents a Pipe; -- corresponds to ZMQ Socket Type 'Z.Push' | PipeT -- | Represents a Puller; -- corresponds to ZMQ Socket Type 'Z.Pull' | PullT -- | Represents a Peer; -- corresponds to ZMQ Socket Type 'Z.Pair' | PeerT deriving (Eq, Show, Read) ------------------------------------------------------------------------ -- | Safely read 'AccessType'; -- ignores the case of the input string -- (/e.g./ \"servert\" -> 'ServerT') ------------------------------------------------------------------------ parseAccess :: String -> Maybe AccessType parseAccess s = case map toLower s of "servert" -> Just ServerT "clientt" -> Just ClientT "routert" -> Just RouterT "dealert" -> Just DealerT "pubt" -> Just PubT "subt" -> Just SubT "pipet" -> Just PipeT "pullt" -> Just PullT "peert" -> Just PeerT _ -> Nothing ------------------------------------------------------------------------- -- | Safely read 'LinkType'; -- ignores the case of the input string -- and, besides \"bind\" and \"connect\", -- also accepts \"bin\", \"con\" and \"conn\"; -- intended for use with command line parameters ------------------------------------------------------------------------- parseLink :: String -> Maybe LinkType parseLink s = case map toLower s of "bind" -> Just Bind "bin" -> Just Bind "b" -> Just Bind "c" -> Just Connect "con" -> Just Connect "conn" -> Just Connect "connect" -> Just Connect _ -> Nothing ------------------------------------------------------------------------- -- | Binds or connects a socket to an address ------------------------------------------------------------------------- link :: LinkType -> Z.Socket a -> String -> [Z.SocketOption] -> IO () link t s add os = do setSockOs s os case t of Bind -> Z.bind s add Connect -> trycon s add 10 ------------------------------------------------------------------------ -- some helpers ------------------------------------------------------------------------ retries :: Int retries = 100 ------------------------------------------------------------------------ -- try n times to connect -- this is particularly useful for "inproc" sockets: -- the socket, in this case, must be bound before we can connect to it. ------------------------------------------------------------------------ trycon :: Z.Socket a -> String -> Int -> IO () trycon sock add i = catch (Z.connect sock add) (\e -> if i <= 0 then throwIO (e::SomeException) else do threadDelay 1000 trycon sock add (i-1)) ------------------------------------------------------------------------- -- Sets Socket Options ------------------------------------------------------------------------- setSockOs :: Z.Socket a -> [Z.SocketOption] -> IO () setSockOs s = mapM_ (Z.setOption s) {- $example The following code implements a ping pong communication using two streamers. The code is somewhat simplistic; it does not use timeout, ignores errors and does not provide means for clean shutdown. It focuses instead on demonstrating the core of the streamer functionality. For more examples on how to use streams, you may want to refer to the MDP Broker code in Network.Mom.Patterns.Broker.Broker. > import Control.Monad.Trans > import Control.Monad (forever) > import Control.Concurrent > import qualified Data.Conduit as C > import qualified Data.ByteString.Char8 as B > import Network.Mom.Patterns.Streams > import qualified System.ZMQ as Z > main :: IO () > main = Z.withContext 1 $ \ctx -> do > ready <- newEmptyMVar > _ <- forkIO (ping ctx ready) > _ <- forkIO (pong ctx ready) > forever $ threadDelay 100000 > ping :: Z.Context -> MVar () -> IO () > ping ctx ready = withStreams ctx "pong" (-1) > [Poll "ping" "inproc://ping" PeerT Bind [] []] > (\_ -> return ()) -- no timeout > (\_ _ _ -> return ()) -- ignore errors > pinger $ \c -> do > putMVar ready () -- ping is ready > putStrLn "starting game!" > send c ["ping"] startPing -- send through controller > -- to initialise ping pong > putStrLn "game started!" > forever $ threadDelay 100000 > where startPing = C.yield $ B.pack "ping" > pong :: Z.Context -> MVar () -> IO () > pong ctx ready = do > _ <- takeMVar ready -- wait for ping getting ready > withStreams ctx "ping" (-1) > [Poll "pong" "inproc://ping" PeerT Connect [] []] > (\_ -> return ()) > (\_ _ _ -> return ()) > pinger $ \_ -> forever $ threadDelay 100000 > pinger :: StreamSink > pinger s = C.awaitForever $ \i -> > let x = B.unpack i > in do liftIO $ putStrLn x > liftIO $ threadDelay 500000 > case x of > "ping" -> stream s ["pong"] [B.pack "pong"] > "pong" -> stream s ["ping"] [B.pack "ping"] > _ -> return () -}