-- | -- Module: Network.Transport.ZMQ -- Copyright: (C) 2014, EURL Tweag -- License: BSD-3 {-# LANGUAGE DeriveGeneric, StandaloneDeriving, OverloadedStrings, DeriveDataTypeable, CPP #-} module Network.Transport.ZMQ ( -- * Main API createTransport -- :: ZMQParameters -> ByteString -> IO (Either ZMQError Transport) , ZMQParameters(..) , ZMQAuthType(..) , defaultZMQParameters -- :: ZMQParameters -- * Internals -- $internals , createTransportEx , breakConnectionEndPoint -- :: ZMQTransport -> EndPointAddress -> EndPointAddress -> IO () , breakConnection -- :: ZMQTransport -> EndPointAddress -> EndPointAddress -> IO () , unsafeConfigurePush -- * Design -- $design -- ** Multicast -- $multicast ) where import Network.Transport.ZMQ.Types import Control.Applicative import Control.Concurrent ( yield , threadDelay ) import qualified Control.Concurrent.Async as Async import Control.Concurrent.MVar import Control.Concurrent.STM import Control.Concurrent.STM.TMChan import Control.Monad ( void , forever , unless , join , forM_ , foldM , when , (<=<) ) import Control.Exception ( AsyncException ) import Control.Monad.Catch ( finally , try , throwM , Exception , SomeException , fromException , mask , mask_ , uninterruptibleMask_ , onException ) import Data.Binary import Data.ByteString (ByteString) import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Char8 as B8 import Data.Foldable ( traverse_ ) import qualified Data.Foldable as Foldable import Data.IORef ( newIORef , modifyIORef , readIORef , writeIORef ) import Data.List.NonEmpty import qualified Data.Map.Strict as Map -- import Data.Set (Set) import qualified Data.Set as Set import qualified Data.Traversable as Traversable import Data.Typeable import Data.Void import GHC.Generics ( Generic ) import Network.Transport import Network.Transport.ZMQ.Types import System.IO ( fixIO ) import System.ZMQ4 ( Context ) import qualified System.ZMQ4 as ZMQ import qualified System.ZMQ4.Utils as ZMQ import Text.Printf -------------------------------------------------------------------------------- --- Internal datatypes -- -------------------------------------------------------------------------------- -- $design -- -- In the zeromq backend we are using following address scheme: -- -- @ -- tcp:\/\/host:port\/ -- | | | -- | +---+---------------- can be configured by user, one host, -- | port pair per distributed process -- | instance -- +---------------------------- In feature it will be possible to add another schemas. -- @ -- -- Transport specifies host that will be used, and port will be -- automatically generated by the transport for each endpoint. -- -- * Connections Reliability -- -- Currently only reliable connections are supportred. In case if -- Unreliable type was passed reliable connection will be created. -- This may be changed in future versions. -- -- Network-transport-zeromq maintains one thread for each endpoint that is -- used to read incomming requests, all sends and connection requests are -- handled from the user thread and mostly asynchronous. -- -- Each endpoint obtains one pull socket and push socket for each remote -- end point, all lightweight threads abrigged into one heavyweight -- connection. -- -- Virually connections looks like the following plot: -- -- @ -- +------------------+ +--------------+ -- | Endpoint | | Endpoint | -- | | | | -- | +------+ +------+ | -- | | pull |<-----------------------| push | | -- + +------+ +------+ | -- | | +--------------------+ | | -- + +------+/~~~~ connection 1 ~~~~\\+------+ | -- | | push |~~~~~ connection 2 ~~~~~| pull | | -- | +------+\\ /+------+ | -- | | +--------------------+ | | -- +------------------+ +--------------+ -- @ -- -- Physically 0mq may choose better representations and mapping on -- a real connections. -- -- ZeroMQ connection takes care of reliability thus for correct lifeness, -- it stores messages so in case of connecdtion break in may be restarted -- with no message loss. So heartbeating procedure should be introduced on -- the top on network-transport, and 'breakConnection' should be used to notify -- connection death. However if High Water Mark will be reached connection -- to endpoint considered failed and connection break procedure started -- automatically. -- -- Naming conventions. -- api* -- functions that can be called by user -- localEndPoint -- internal functions that are used in endpoints -- remoteEndPoint -- internal functions that are used in endpoints -- -- Transport -- |--- Valid transport itself -- | |--- newEndpoint -- | |--- close -- |--- Closed -- -- LocaEndPoint -- RemoteEndPoint -- Connection -- Connection closing procedure. -- Once endpoint is gracefully closed by user or by transport close -- RemoteEndPoint goes into 'RemoteEndPointClosing' state and sends -- a message to remote endpoint if link is known as alive, in -- RemoteEndPointClosing all messages are ignored and EndPointClosed -- is returned. Uppon a EndPointClose delivery endpoint replies with -- EndPointClose message, and starts cleanup procedure (marking link -- as not alive). When endpoint receives EndPointCloseOk message it -- moved RemoteEndPoint to close state and removes all data structures. -- Together with marking a endpoint as closed asynchronous timeout is -- set, and if EndPointCloseOk is not delivered withing that timeperiod -- EndPoint is closed. -- -- endpoint-1 endpoint-2 -- 1. mark as closing -- [RemoteEndPoint:Closing] ----endPointClose-> [remoteEndPoint:Valid] -- 2. mark as closed -- <-endPointCloseOk- [remoteEndPoint:Closed] -- 2. mark as closed -- [RemoteEndPoint:Closed] 4. cleanup remote end point -- -- EndPoint can be closed in a two ways: normally and abnormally (in case -- of exception or invariant vionation on a remove side). If EndPoint is -- closed normally all opened connections will recevice ConnectionClosed -- events, otherwise. -- -- Multicast group schema: -- -- +--------------+ -- | EndPoint |<----------------+ -- +--------------+ | events -- | | -- +------------------+ | -- | | -- +------------------------------+ -- | Multicast Group (Server) | -- +--+ +--+ +--+ +---+ -- | Rep | | Pub | | Sub | -- +-----+ +-----+ +-----+ -- ^ | ^ -- request | | | -- +------------+ +--------+ messages -- | | -- | +-----------+ -- | | messages -- +-----+ +-----+ -- | Req | | Sub | -- +---+ +--+ +--+ -- | Multicast Group | -- +---------------------+ -- -- Resolve Group automatically subscribes to the group to receive events, -- however unless 'subscribe' will be called endpoint will not reveive any -- message. This is done in order to keep ability to notify receiver about -- multcast group close. -- -- Current solution incorrectly keeps track of incomming connections, this -- means that we can't really guarantee that close message is delivered to -- every recipient and it disconnected correctly. -- -- $internals -- Internal function provides additional 0mq specific set of configuration -- options, this functionality should be used with caution as it may break -- required socket properties. -- | Messages data ZMQMessage = MessageConnect !EndPointAddress -- ^ Connection greeting | MessageInitConnection !EndPointAddress !ConnectionId !Reliability | MessageInitConnectionOk !EndPointAddress !ConnectionId !ConnectionId | MessageCloseConnection !ConnectionId | MessageData !ConnectionId | MessageEndPointClose !EndPointAddress !Bool | MessageEndPointCloseOk !EndPointAddress deriving (Generic) {- instance Binary ZMQMessage where put (MessageConnect ep) = putWord64le 0 >> put ep put (MessageInitConnection ep cid rel) = putWord64le 2 >> put ep >> put cid >> put rel put (MessageInitConnectionOk ep cid rid) = putWord64le 3 >> put ep >> put cid >> put rid put (MessageCloseConnection cid) = putWord64le 4 >> put cid put (MessageEndPointClose ep) = putWord64le 5 >> put ep put (MessageData cid) = putWord64le cid get = do x <- getWord64be case x of 0 -> MessageConnect <$> get 2 -> MessageInitConnection <$> get <*> get <*> get 3 -> MessageInitConnectionOk <$> get <*> get <*> get 4 -> MessageCloseConnection <$> get 5 -> MessageEndPointClose <$> get 6 -> MessageData <$> pure x reservedConnectionId = 7 -} instance Binary ZMQMessage data ZMQError = InvariantViolation String | IncorrectState String | ConnectionFailed deriving (Typeable, Show) instance Exception ZMQError -- | Create 0MQ based transport. createTransport :: ZMQParameters -> ByteString -- ^ Transport address (IP or hostname) -> IO (Either (TransportError Void) Transport) createTransport z b = fmap (fmap snd) (createTransportEx z b) -- | Create 'Transport' and export internal transport state that may be -- used in API. createTransportEx :: ZMQParameters -- ^ Transport features. -> ByteString -- ^ Host name or IP address -> IO (Either (TransportError Void) (ZMQTransport, Transport)) createTransportEx params host = do ctx <- ZMQ.context mtid <- case authorizationType params of ZMQAuthPlain user pass -> Just <$> ZMQ.authManager ctx user pass _ -> return Nothing transport <- ZMQTransport <$> pure addr <*> newMVar (TransportValid $ ValidTransportState ctx Map.empty mtid) return $ Right (transport, Transport { newEndPoint = apiNewEndPoint params transport , closeTransport = apiTransportClose transport }) where addr = B.concat ["tcp://", host] -- Synchronous apiTransportClose :: ZMQTransport -> IO () apiTransportClose transport = mask_ $ do old <- swapMVar (_transportState transport) TransportClosed case old of TransportClosed -> return () TransportValid (ValidTransportState ctx m mtid) -> do case mtid of Nothing -> return () Just tid -> Async.cancel tid forM_ (Map.elems m) $ apiCloseEndPoint transport ZMQ.term ctx apiNewEndPoint :: ZMQParameters -> ZMQTransport -> IO (Either (TransportError NewEndPointErrorCode) EndPoint) apiNewEndPoint params transport = do -- printf "[transport] endpoint create\n" elep <- modifyMVar (_transportState transport) $ \case TransportClosed -> return (TransportClosed, Left $ TransportError NewEndPointFailed "Transport is closed.") v@(TransportValid i@(ValidTransportState ctx _ _)) -> do eEndPoint <- endPointCreate params ctx (B8.unpack addr) case eEndPoint of Right (_port, ep, chan) -> return ( TransportValid i{_transportEndPoints = Map.insert (localEndPointAddress ep) ep (_transportEndPoints i)} , Right (ep, ctx, chan)) Left _ -> return (v, Left $ TransportError NewEndPointFailed "Failed to create new endpoint.") case elep of Right (ep,ctx, chOut) -> return $ Right $ EndPoint { receive = atomically $ do mx <- readTMChan chOut case mx of Nothing -> error "channel is closed" Just x -> return x , address = localEndPointAddress ep , connect = apiConnect params ctx ep , closeEndPoint = apiCloseEndPoint transport ep , newMulticastGroup = apiNewMulticastGroup params transport ep , resolveMulticastGroup = apiResolveMulticastGroup transport ep } Left x -> return $ Left x where addr = transportAddress transport -- | Asynchronous operation, shutdown of the remote end point may take a while apiCloseEndPoint :: ZMQTransport -> LocalEndPoint -> IO () apiCloseEndPoint transport lep = mask_ $ do -- printf "[%s][go] close endpoint\n" -- (B8.unpack $ endPointAddressToByteString $ _localEndPointAddress lep) old <- readMVar (localEndPointState lep) case old of LocalEndPointValid (ValidLocalEndPoint x _ _ threadId _ _) -> do -- close channel, no events will be received atomically $ do writeTMChan x EndPointClosed closeTMChan x Async.cancel threadId void $ Async.waitCatch threadId LocalEndPointClosed -> return () modifyMVar_ (_transportState transport) $ \case TransportClosed -> return TransportClosed TransportValid v -> return $ TransportValid v{_transportEndPoints = Map.delete (localEndPointAddress lep) (_transportEndPoints v)} endPointCreate :: ZMQParameters -> Context -> String -> IO (Either (TransportError NewEndPointErrorCode) (Int,LocalEndPoint, TMChan Event)) endPointCreate params ctx addr = do em <- try $ do pull <- ZMQ.socket ctx ZMQ.Pull case authorizationType params of ZMQNoAuth -> return () ZMQAuthPlain{} -> do ZMQ.setPlainServer True pull ZMQ.setSendHighWM (ZMQ.restrict (highWaterMark params)) pull port <- ZMQ.bindFromRangeRandom pull addr (minPort params) (maxPort params) (maxTries params) return (port, pull) case em of Right (port, pull) -> (do chOut <- newTMChanIO lep <- LocalEndPoint <$> pure (EndPointAddress $ B8.pack (addr ++ ":" ++ show port)) <*> newEmptyMVar <*> pure port opened <- newIORef True mask $ \restore -> do thread <- Async.async $ (restore (receiver pull lep chOut)) `finally` finalizeEndPoint lep port pull putMVar (localEndPointState lep) $ LocalEndPointValid (ValidLocalEndPoint chOut (Counter 0 Map.empty) Map.empty thread opened Map.empty) return $ Right (port, lep, chOut)) `onException` (ZMQ.close pull) Left (_e::SomeException) -> do return $ Left $ TransportError NewEndPointInsufficientResources "no free sockets" where finalizer pull ourEp = forever $ do (cmd:_) <- ZMQ.receiveMulti pull case decode' cmd of MessageEndPointCloseOk theirAddress -> getRemoteEndPoint ourEp theirAddress >>= \case Nothing -> return () Just rep -> do state <- swapMVar (remoteEndPointState rep) RemoteEndPointClosed closeRemoteEndPoint ourEp rep state _ -> return () -- XXX: send exception receiver :: ZMQ.Socket ZMQ.Pull -> LocalEndPoint -> TMChan Event -> IO () receiver pull ourEp chan = forever $ do (cmd:msgs) <- ZMQ.receiveMulti pull case decode' cmd of MessageData idx -> atomically $ writeTMChan chan (Received idx msgs) MessageConnect theirAddress -> do -- printf "[%s] message connect from %s\n" -- (B8.unpack $ endPointAddressToByteString ourAddr) -- (B8.unpack $ endPointAddressToByteString theirAddress) void $ createOrGetRemoteEndPoint params ctx ourEp theirAddress MessageInitConnection theirAddress theirId rel -> do -- printf "[%s] message init connection from %s\n" -- (B8.unpack $ endPointAddressToByteString ourAddr) -- (B8.unpack $ endPointAddressToByteString theirAddress) join $ do modifyMVar (localEndPointState ourEp) $ \case LocalEndPointValid v -> case theirAddress `Map.lookup` r of Nothing -> return (LocalEndPointValid v, throwM $ InvariantViolation "Remote endpoint should exist.") Just rep -> modifyMVar (remoteEndPointState rep) $ \case RemoteEndPointFailed -> throwM $ InvariantViolation "RemoteEndPoint should be valid." RemoteEndPointClosed -> throwM $ InvariantViolation "RemoteEndPoint should be valid." RemoteEndPointClosing{} -> throwM $ InvariantViolation "RemoteEndPoint should be valid." w@RemoteEndPointValid{} -> do conn <- ZMQConnection <$> pure ourEp <*> pure rep <*> pure rel <*> newMVar (ZMQConnectionValid $ ValidZMQConnection Nothing i) <*> newEmptyMVar w' <- register (succ i) w return ( w' , ( LocalEndPointValid v{ _localEndPointConnections = Counter (succ i) (Map.insert (succ i) conn m) } , return ()) ) z@(RemoteEndPointPending w) -> do conn <- ZMQConnection <$> pure ourEp <*> pure rep <*> pure rel <*> newMVar (ZMQConnectionValid $ ValidZMQConnection Nothing i) <*> newEmptyMVar modifyIORef w (\xs -> (register (succ i)) : xs) return ( z , ( LocalEndPointValid v{ _localEndPointConnections = Counter (succ i) (Map.insert (succ i) conn m) } , return ()) ) where r = _localEndPointRemotes v (Counter i m) = _localEndPointConnections v _ -> throwM $ InvariantViolation "RemoteEndPoint should be valid." where register i RemoteEndPointFailed = do atomically $ do writeTMChan chan (ConnectionOpened i rel theirAddress) writeTMChan chan (ConnectionClosed i) return RemoteEndPointFailed register i RemoteEndPointClosed = do atomically $ do writeTMChan chan (ConnectionOpened i rel theirAddress) writeTMChan chan (ConnectionClosed i) return RemoteEndPointClosed register i (RemoteEndPointClosing x) = do atomically $ do writeTMChan chan (ConnectionOpened i rel theirAddress) writeTMChan chan (ConnectionClosed i) return $ RemoteEndPointClosing x register _ RemoteEndPointPending{} = throwM $ InvariantViolation "RemoteEndPoint can't be pending." register i (RemoteEndPointValid v@(ValidRemoteEndPoint sock _ s _)) = do ZMQ.send sock [] $ encode' (MessageInitConnectionOk ourAddr theirId i) atomically $ writeTMChan chan (ConnectionOpened i rel theirAddress) return $ RemoteEndPointValid v{_remoteEndPointIncommingConnections = Set.insert i s} MessageCloseConnection idx -> join $ do -- printf "[%s] message close connection: %i\n" -- (B8.unpack $ endPointAddressToByteString ourAddr) -- idx modifyMVar (localEndPointState ourEp) $ \case LocalEndPointValid v -> case idx `Map.lookup` m of Nothing -> return (LocalEndPointValid v, return ()) Just conn -> do old <- swapMVar (connectionState conn) ZMQConnectionFailed return ( LocalEndPointValid v{ _localEndPointConnections = Counter i (idx `Map.delete` m)} , case old of ZMQConnectionFailed -> return () ZMQConnectionInit -> return () -- throwM InvariantViolation ZMQConnectionClosed -> return () ZMQConnectionValid (ValidZMQConnection _ _) -> do atomically $ writeTMChan chan (ConnectionClosed idx) connectionCleanup (connectionRemoteEndPoint conn) idx) where (Counter i m) = _localEndPointConnections v LocalEndPointClosed -> return (LocalEndPointClosed, return ()) MessageInitConnectionOk theirAddress ourId theirId -> do -- printf "[%s] message init connection ok: %i -> %i\n" -- (B8.unpack $ endPointAddressToByteString ourAddr) -- ourId -- theirId join $ withMVar (localEndPointState ourEp) $ \case LocalEndPointValid v -> case theirAddress `Map.lookup` r of Nothing -> return (return ()) -- XXX: send message to the host Just rep -> modifyMVar (remoteEndPointState rep) $ \case RemoteEndPointFailed -> return (RemoteEndPointFailed, return ()) RemoteEndPointClosed -> throwM $ InvariantViolation "RemoteEndPoint should be valid or failed." RemoteEndPointClosing{} -> throwM $ InvariantViolation "RemoteEndPoint should be valid or failed." t@(RemoteEndPointValid (ValidRemoteEndPoint sock (Counter x m) s z)) -> do case ourId `Map.lookup` m of Nothing -> return (t, return ()) -- XXX: send message to the hostv Just c -> mask_ $ do return (RemoteEndPointValid (ValidRemoteEndPoint sock (Counter x (ourId `Map.delete` m)) s (z+1)) , do modifyMVar_ (connectionState c) $ \case ZMQConnectionFailed -> return ZMQConnectionFailed ZMQConnectionInit -> return $ ZMQConnectionValid (ValidZMQConnection (Just sock) theirId) ZMQConnectionClosed -> do ZMQ.send sock [] $ encode' (MessageCloseConnection theirId) return ZMQConnectionClosed ZMQConnectionValid _ -> throwM $ InvariantViolation "RemoteEndPoint should be closed" void $ tryPutMVar (connectionReady c) () ) RemoteEndPointPending p -> return (RemoteEndPointPending p, undefined) where r = _localEndPointRemotes v LocalEndPointClosed -> return $ return () -- printf "[%s] message init connection ok [ok]\n" -- (B8.unpack $ endPointAddressToByteString ourAddr) MessageEndPointClose theirAddress True -> getRemoteEndPoint ourEp theirAddress >>= \case Nothing -> return () Just rep -> do onValidRemote rep $ \v -> ZMQ.send (_remoteEndPointChan v) [] (encode' $ MessageEndPointCloseOk $ localEndPointAddress ourEp) remoteEndPointClose True ourEp rep MessageEndPointClose theirAddress False -> getRemoteEndPoint ourEp theirAddress >>= \case Nothing -> return () Just rep -> do mst <- cleanupRemoteEndPoint ourEp rep Nothing case mst of Nothing -> return () Just st -> do onValidEndPoint ourEp $ \v -> atomically $ writeTMChan (_localEndPointChan v) $ ErrorEvent $ TransportError (EventConnectionLost theirAddress) "Exception on remote side" closeRemoteEndPoint ourEp rep st MessageEndPointCloseOk theirAddress -> getRemoteEndPoint ourEp theirAddress >>= \case Nothing -> return () Just rep -> do state <- swapMVar (remoteEndPointState rep) RemoteEndPointClosed closeRemoteEndPoint ourEp rep state where ourAddr = localEndPointAddress ourEp finalizeEndPoint ourEp port pull = do -- printf "[%s] finalize-end-point\n" -- (B8.unpack $ endPointAddressToByteString $ _localEndPointAddress ourEp) join $ withMVar (localEndPointState ourEp) $ \case LocalEndPointClosed -> afterP () LocalEndPointValid v -> do writeIORef (_localEndPointOpened v) False return $ do tid <- Async.async $ finalizer pull ourEp void $ Async.mapConcurrently (remoteEndPointClose False ourEp) $ _localEndPointRemotes v Async.cancel tid ZMQ.unbind pull (addr ++ ":" ++ show port) ZMQ.close pull void $ swapMVar (localEndPointState ourEp) LocalEndPointClosed apiSend :: ZMQConnection -> [ByteString] -> IO (Either (TransportError SendErrorCode) ()) #ifdef UNSAFE_SEND apiSend c@(ZMQConnection l e _ s _) b = mask_ $ join $ withMVar s $ \case ZMQConnectionInit -> return $ yield >> apiSend c b ZMQConnectionClosed -> afterP $ Left $ TransportError SendClosed "Connection is closed" ZMQConnectionFailed -> afterP $ Left $ TransportError SendFailed "Connection is failed" ZMQConnectionValid (ValidZMQConnection (Just ch) idx) -> do o <- readIORef (remoteEndPointOpened e) if o then do evs <- ZMQ.events ch if ZMQ.Out `elem` evs then do ZMQ.sendMulti ch $ encode' (MessageData idx) :| b afterP $ Right () else return $ do mz <- cleanupRemoteEndPoint l e Nothing case mz of Nothing -> return () Just z -> do onValidEndPoint l $ \v -> atomically $ writeTMChan (_localEndPointChan v) $ ErrorEvent $ TransportError (EventConnectionLost (remoteEndPointAddress e)) "Exception on remote side" closeRemoteEndPoint l e z return $ Left $ TransportError SendFailed "Connection broken." else afterP $ Left $ TransportError SendFailed "Connection broken." _ -> afterP $ Left $ TransportError SendFailed "Incorrect channel." #else apiSend (ZMQConnection l e _ s _) b = mask_ $ do result <- trySome inner case result of Left ex -> do cleanup return $ Left $ TransportError SendFailed (show ex) Right x -> return x where inner = join $ withMVar (remoteEndPointState e) $ \x -> case x of RemoteEndPointFailed -> afterP $ Left $ TransportError SendFailed "Remote end point is failed." RemoteEndPointClosed -> afterP $ Left $ TransportError SendFailed "Remote end point is closed." RemoteEndPointClosing{} -> afterP $ Left $ TransportError SendFailed "Remote end point is closing." RemoteEndPointPending{} -> return $ yield >> inner RemoteEndPointValid v -> withMVar s $ \case ZMQConnectionInit -> return $ yield >> inner -- readMVar (connectionReady c) >> inner ZMQConnectionClosed -> afterP $ Left $ TransportError SendClosed "Connection is closed" ZMQConnectionFailed -> afterP $ Left $ TransportError SendFailed "Connection is failed" ZMQConnectionValid (ValidZMQConnection _ idx) -> do evs <- ZMQ.events (_remoteEndPointChan v) if ZMQ.Out `elem` evs then do ZMQ.sendMulti (_remoteEndPointChan v) $ encode' (MessageData idx) :| b afterP $ Right () else return $ do mz <- cleanupRemoteEndPoint l e Nothing case mz of Nothing -> return () Just z -> do onValidEndPoint l $ \w -> atomically $ writeTMChan (_localEndPointChan w) $ ErrorEvent $ TransportError (EventConnectionLost (remoteEndPointAddress e)) "Exception on remote side" closeRemoteEndPoint l e z return $ Left $ TransportError SendFailed "Connection broken." cleanup = do void $ cleanupRemoteEndPoint l e (Just $ \v -> ZMQ.send (_remoteEndPointChan v) [] $ encode' (MessageEndPointClose (localEndPointAddress l) False)) onValidEndPoint l $ \v -> atomically $ do writeTMChan (_localEndPointChan v) $ ErrorEvent $ TransportError (EventConnectionLost (remoteEndPointAddress e)) "Exception on send." #endif -- 'apiClose' function is asynchronous, as connection may not exists by the -- time of the calling to this function. In this case function just marks -- connection as closed, so all subsequent calls from the user side will -- "think" that the connection is closed, and remote side will be contified -- only after connection will be up. apiClose :: ZMQConnection -> IO () apiClose (ZMQConnection _ e _ s _) = uninterruptibleMask_ $ join $ do modifyMVar s $ \case ZMQConnectionInit -> return (ZMQConnectionClosed, return ()) ZMQConnectionClosed -> return (ZMQConnectionClosed, return ()) ZMQConnectionFailed -> return (ZMQConnectionClosed, return ()) ZMQConnectionValid (ValidZMQConnection _ idx) -> do return (ZMQConnectionClosed, do modifyMVar_ (remoteEndPointState e) $ \case v@RemoteEndPointClosed -> return v v@RemoteEndPointClosing{} -> return v v@RemoteEndPointFailed -> return v v@RemoteEndPointValid{} -> notify idx v v@(RemoteEndPointPending p) -> modifyIORef p (\xs -> notify idx : xs) >> return v ) where notify _ RemoteEndPointFailed = return RemoteEndPointFailed notify _ (RemoteEndPointClosing x) = return $ RemoteEndPointClosing x notify _ RemoteEndPointClosed = return RemoteEndPointClosed notify _ RemoteEndPointPending{} = throwM $ InvariantViolation "RemoteEndPoint can't be pending." notify idx w@(RemoteEndPointValid (ValidRemoteEndPoint sock _ _ _)) = do ZMQ.send sock [] $ encode' (MessageCloseConnection idx) return w apiConnect :: ZMQParameters -> Context -> LocalEndPoint -> EndPointAddress -> Reliability -> ConnectHints -> IO (Either (TransportError ConnectErrorCode) Connection) apiConnect params ctx ourEp theirAddr reliability _hints = do -- printf "[%s] apiConnect to %s\n" -- (B8.unpack $ endPointAddressToByteString $ _localEndPointAddress ourEp) -- (B8.unpack $ endPointAddressToByteString theirAddr) eRep <- createOrGetRemoteEndPoint params ctx ourEp theirAddr case eRep of Left{} -> return $ Left $ TransportError ConnectFailed "LocalEndPoint is closed." Right rep -> do conn <- ZMQConnection <$> pure ourEp <*> pure rep <*> pure reliability <*> newMVar ZMQConnectionInit <*> newEmptyMVar let apiConn = Connection { send = apiSend conn , close = apiClose conn } join $ modifyMVar (remoteEndPointState rep) $ \w -> case w of RemoteEndPointClosed -> do return ( RemoteEndPointClosed , return $ Left $ TransportError ConnectFailed "Transport is closed.") RemoteEndPointClosing x -> do return ( RemoteEndPointClosing x , return $ Left $ TransportError ConnectFailed "RemoteEndPoint closed.") RemoteEndPointValid _ -> do s' <- go conn w return (s', waitReady conn apiConn) RemoteEndPointPending z -> do modifyIORef z (\zs -> go conn : zs) return ( RemoteEndPointPending z, waitReady conn apiConn) RemoteEndPointFailed -> return ( RemoteEndPointFailed , return $ Left $ TransportError ConnectFailed "RemoteEndPoint failed.") where waitReady conn apiConn = join $ withMVar (connectionState conn) $ \case ZMQConnectionInit{} -> return $ yield {-readMVar (connectionReady conn)-} >> waitReady conn apiConn ZMQConnectionValid{} -> afterP $ Right apiConn ZMQConnectionFailed{} -> afterP $ Left $ TransportError ConnectFailed "Connection failed." ZMQConnectionClosed{} -> throwM $ InvariantViolation "Connection closed." ourAddr = localEndPointAddress ourEp go conn s = inner conn s inner _ RemoteEndPointClosed = return RemoteEndPointClosed inner _ RemoteEndPointPending{} = throwM $ InvariantViolation "Connection pending." inner _ (RemoteEndPointClosing x) = return $ RemoteEndPointClosing x inner _ RemoteEndPointFailed = return RemoteEndPointFailed inner conn (RemoteEndPointValid (ValidRemoteEndPoint sock (Counter i m) s z)) = do ZMQ.send sock [] $ encode' (MessageInitConnection ourAddr i' reliability) return $ RemoteEndPointValid (ValidRemoteEndPoint sock (Counter i' (Map.insert i' conn m)) s z) where i' = succ i getRemoteEndPoint :: LocalEndPoint -> EndPointAddress -> IO (Maybe RemoteEndPoint) getRemoteEndPoint ourEp theirAddr = do withMVar (localEndPointState ourEp) $ \case LocalEndPointValid v -> return $ theirAddr `Map.lookup` (_localEndPointRemotes v) LocalEndPointClosed -> return Nothing createOrGetRemoteEndPoint :: ZMQParameters -> Context -> LocalEndPoint -> EndPointAddress -> IO (Either ZMQError RemoteEndPoint) createOrGetRemoteEndPoint params ctx ourEp theirAddr = join $ do -- printf "[%s] apiConnect to %s\n" -- saddr -- (B8.unpack $ endPointAddressToByteString theirAddr) modifyMVar (localEndPointState ourEp) $ \case LocalEndPointValid v@(ValidLocalEndPoint _ _ m _ o _) -> do opened <- readIORef o if opened then do case theirAddr `Map.lookup` m of Nothing -> create v m Just rep -> do withMVar (remoteEndPointState rep) $ \case RemoteEndPointFailed -> create v m _ -> return (LocalEndPointValid v, return $ Right rep) else return (LocalEndPointValid v, return $ Left $ IncorrectState "EndPointClosing") LocalEndPointClosed -> return ( LocalEndPointClosed , return $ Left $ IncorrectState "EndPoint is closed" ) where create v m = do -- printf "[%s] apiConnect: remoteEndPoint not found, creating %s\n" -- (B8.unpack $ endPointAddressToByteString $ _localEndPointAddress ourEp) -- (B8.unpack $ endPointAddressToByteString theirAddr) push <- ZMQ.socket ctx ZMQ.Push case authorizationType params of ZMQNoAuth -> return () ZMQAuthPlain p u -> do ZMQ.setPlainPassword (ZMQ.restrict p) push ZMQ.setPlainUserName (ZMQ.restrict u) push state <- newMVar . RemoteEndPointPending =<< newIORef [] opened <- newIORef False let rep = RemoteEndPoint theirAddr state opened return ( LocalEndPointValid v{ _localEndPointRemotes = Map.insert theirAddr rep m} , initialize push rep >> return (Right rep)) ourAddr = localEndPointAddress ourEp initialize push rep = do lock <- newEmptyMVar x <- Async.async $ do takeMVar lock ZMQ.connect push (B8.unpack $ endPointAddressToByteString theirAddr) monitor <- ZMQ.monitor [ZMQ.ConnectedEvent] ctx push putMVar lock () r <- Async.race (threadDelay 100000) (monitor True) void $ monitor False case r of Left _ -> do _ <- swapMVar (remoteEndPointState rep) RemoteEndPointFailed Async.cancel x ZMQ.closeZeroLinger push Right _ -> do ZMQ.send push [] $ encode' $ MessageConnect ourAddr let v = ValidRemoteEndPoint push (Counter 0 Map.empty) Set.empty 0 modifyMVar_ (remoteEndPointState rep) $ \case RemoteEndPointPending p -> do z <- foldM (\y f -> f y) (RemoteEndPointValid v) . Prelude.reverse =<< readIORef p modifyIORef (remoteEndPointOpened rep) (const True) return z RemoteEndPointValid _ -> throwM $ InvariantViolation "RemoteEndPoint valid." RemoteEndPointClosed -> return RemoteEndPointClosed RemoteEndPointClosing j -> return (RemoteEndPointClosing j) RemoteEndPointFailed -> return RemoteEndPointFailed -- | Close all endpoint connections, return previous state in case -- if it was alive, for further cleanup actions. cleanupRemoteEndPoint :: LocalEndPoint -> RemoteEndPoint -> (Maybe (ValidRemoteEndPoint -> IO ())) -> IO (Maybe RemoteEndPointState) cleanupRemoteEndPoint lep rep actions = modifyMVar (localEndPointState lep) $ \case LocalEndPointValid v -> do modifyIORef (remoteEndPointOpened rep) (const False) oldState <- swapMVar (remoteEndPointState rep) newState case oldState of RemoteEndPointValid w -> do let (Counter _ cn) = _remoteEndPointPendingConnections w traverse_ (\c -> void $ swapMVar (connectionState c) ZMQConnectionFailed) cn cn' <- foldM (\(Counter i' cn') idx -> case idx `Map.lookup` cn of Nothing -> return (Counter i' cn') Just c -> do void $ swapMVar (connectionState c) ZMQConnectionFailed return $ Counter i' (Map.delete idx cn') ) (_localEndPointConnections v) (Set.toList (_remoteEndPointIncommingConnections w)) case actions of Nothing -> return () Just f -> f w return ( LocalEndPointValid v { _localEndPointConnections=cn' } , Just oldState) _ -> return (LocalEndPointValid v, Nothing) c -> return (c, Nothing) where newState = RemoteEndPointFailed -- | Close, all network connections. closeRemoteEndPoint :: LocalEndPoint -> RemoteEndPoint -> RemoteEndPointState -> IO () closeRemoteEndPoint lep rep state = step1 >> step2 state where step1 = modifyMVar_ (localEndPointState lep) $ \case LocalEndPointValid v -> return $ LocalEndPointValid v{_localEndPointRemotes=Map.delete (remoteEndPointAddress rep) (_localEndPointRemotes v)} c -> return c step2 (RemoteEndPointValid v) = do ZMQ.closeZeroLinger (_remoteEndPointChan v) step2 (RemoteEndPointClosing (ClosingRemoteEndPoint sock rd)) = do _ <- readMVar rd ZMQ.closeZeroLinger sock step2 _ = return () remoteEndPointClose :: Bool -> LocalEndPoint -> RemoteEndPoint -> IO () remoteEndPointClose silent lep rep = do -- printf "[???] remoteEndPointClose %s\n" -- (B8.unpack $ endPointAddressToByteString $ lAddr) -- (B8.unpack $ endPointAddressToByteString $ remoteEndPointAddress rep) modifyIORef (remoteEndPointOpened rep) (const False) join $ modifyMVar (remoteEndPointState rep) $ \o -> case o of RemoteEndPointFailed -> return (o, return ()) RemoteEndPointClosed -> return (o, return ()) RemoteEndPointClosing (ClosingRemoteEndPoint _ l) -> return (o, void $ readMVar l) RemoteEndPointPending _ -> closing undefined o -- XXX: store socket, or delay RemoteEndPointValid v -> closing (_remoteEndPointChan v) o where closing sock old = do lock <- newEmptyMVar return (RemoteEndPointClosing (ClosingRemoteEndPoint sock lock), go lock old) go lock old@(RemoteEndPointValid (ValidRemoteEndPoint c _ s i)) = do -- close all connections void $ cleanupRemoteEndPoint lep rep Nothing withMVar (localEndPointState lep) $ \case LocalEndPointClosed -> return () LocalEndPointValid v -> do -- notify about all connections close (?) do we really want it? traverse_ (atomically . writeTMChan (_localEndPointChan v) . ConnectionClosed) (Set.toList s) -- if we have outgoing connections, then we have connection error when (i > 0) $ atomically $ writeTMChan (_localEndPointChan v) $ ErrorEvent $ TransportError (EventConnectionLost (remoteEndPointAddress rep)) "Remote end point closed." -- notify other side about closing connection unless silent $ do ZMQ.send c [] (encode' (MessageEndPointClose (localEndPointAddress lep) True)) yield void $ Async.race (readMVar lock) (threadDelay 1000000) void $ tryPutMVar lock () closeRemoteEndPoint lep rep old return () go _ _ = return () connectionCleanup :: RemoteEndPoint -> ConnectionId -> IO () connectionCleanup rep cid = modifyMVar_ (remoteEndPointState rep) $ \case RemoteEndPointValid v -> return $ RemoteEndPointValid v{_remoteEndPointIncommingConnections = Set.delete cid (_remoteEndPointIncommingConnections v)} c -> return c encode' :: Binary a => a -> ByteString encode' = B.concat . BL.toChunks . encode decode' :: Binary a => ByteString -> a decode' s = decode . BL.fromChunks $ [s] onValidEndPoint :: LocalEndPoint -> (ValidLocalEndPoint -> IO ()) -> IO () onValidEndPoint lep f = withMVar (localEndPointState lep) $ \case LocalEndPointValid v -> f v _ -> return () onValidRemote :: RemoteEndPoint -> (ValidRemoteEndPoint -> IO ()) -> IO () onValidRemote rep f = withMVar (remoteEndPointState rep) $ \case RemoteEndPointValid v -> f v _ -> return () afterP :: a -> IO (IO a) afterP = return . return trySome :: IO a -> IO (Either SomeException a) trySome f = try f >>= \case Left e -> case (fromException e) of Just m -> throwM (m::AsyncException) Nothing -> return $ Left e Right x -> return $ Right x -- | Break endpoint connection, all endpoints that will be affected. breakConnection :: ZMQTransport -> EndPointAddress -> EndPointAddress -> IO () breakConnection zmqt from to = Foldable.sequence_ <=< withMVar (_transportState zmqt) $ \case TransportValid v -> Traversable.forM (_transportEndPoints v) $ \x -> withMVar (localEndPointState x) $ \case LocalEndPointValid w -> return $ Foldable.sequence_ $ flip Map.mapWithKey (_localEndPointRemotes w) $ \key rep -> if onDeadHost key then do mz <- cleanupRemoteEndPoint x rep Nothing flip traverse_ mz $ \z -> do atomically $ writeTMChan (_localEndPointChan w) $ ErrorEvent $ TransportError (EventConnectionLost key) "Manual connection break" closeRemoteEndPoint x rep z else return () LocalEndPointClosed -> afterP () TransportClosed -> return Map.empty where dh = B8.init . fst $ B8.spanEnd (/=':') (endPointAddressToByteString to) onDeadHost = B8.isPrefixOf dh . endPointAddressToByteString -- | Break connection between two endpoints, other endpoints on the same -- remove will not be affected. breakConnectionEndPoint :: ZMQTransport -> EndPointAddress -> EndPointAddress -> IO () breakConnectionEndPoint zmqt from to = one from to >> one to from where one f t = join $ withMVar (_transportState zmqt) $ \case TransportValid v -> case f `Map.lookup` _transportEndPoints v of Nothing -> afterP () Just x -> withMVar (localEndPointState x) $ \case LocalEndPointValid w -> case t `Map.lookup` _localEndPointRemotes w of Nothing -> afterP () Just y -> return $ do mz <- cleanupRemoteEndPoint x y Nothing case mz of Nothing -> return () Just z -> do onValidEndPoint x $ \j -> atomically $ writeTMChan (_localEndPointChan j) $ ErrorEvent $ TransportError (EventConnectionLost to) "Exception on remote side" closeRemoteEndPoint x y z LocalEndPointClosed -> afterP () TransportClosed -> afterP () -- | Configure socket after creation unsafeConfigurePush :: ZMQTransport -> EndPointAddress -> EndPointAddress -> (ZMQ.Socket ZMQ.Push -> IO ()) -> IO () unsafeConfigurePush zmqt from to f = withMVar (_transportState zmqt) $ \case TransportValid v -> case from `Map.lookup` _transportEndPoints v of Nothing -> return () Just x -> withMVar (localEndPointState x) $ \case LocalEndPointValid w -> case to `Map.lookup` _localEndPointRemotes w of Nothing -> return () Just y -> onValidRemote y $ f . _remoteEndPointChan LocalEndPointClosed -> return () TransportClosed -> return () apiNewMulticastGroup :: ZMQParameters -> ZMQTransport -> LocalEndPoint -> IO ( Either (TransportError NewMulticastGroupErrorCode) MulticastGroup) apiNewMulticastGroup params zmq lep = withMVar (_transportState zmq) $ \case TransportClosed -> return $ Left $ TransportError NewMulticastGroupFailed "Transport is closed." TransportValid vt -> modifyMVar (localEndPointState lep) $ \case LocalEndPointClosed -> return (LocalEndPointClosed, Left $ TransportError NewMulticastGroupFailed "Transport is closed.") LocalEndPointValid vl -> mask_ $ do (pub, portPub, rep, portRep, wrkThread) <- mkPublisher vt let addr = MulticastAddress $ B8.concat [transportAddress zmq,":",B8.pack (show portPub), ":",B8.pack (show portRep)] subAddr = extractPubAddress addr repAddr = extractRepAddress addr -- printf "new multicastaddress: %s\n" (show addr) -- printf "[%s] subscribe address: %s\n" (show addr) (B8.unpack subAddr) -- printf "[%s] send address: %s\n" (show addr) (B8.unpack repAddr) -- subscriber api sub <- ZMQ.socket (_transportContext vt) ZMQ.Sub ZMQ.connect sub (B8.unpack subAddr) ZMQ.subscribe sub "" subscribed <- newIORef False (subThread, mgroup) <- fixIO $ \ ~(tid,_) -> do v <- newMVar (MulticastGroupValid (ValidMulticastGroup subscribed)) let mgroup = ZMQMulticastGroup v (apiDeleteMulticastGroupLocal v lep addr rep repAddr pub sub subAddr (Just tid) wrkThread) tid' <- Async.async $ do repeatWhile $ do (ctrl: msg) <- ZMQ.receiveMulti sub if B8.null ctrl then do opened <- readIORef subscribed when opened $ atomically $ writeTMChan (_localEndPointChan vl) $ ReceivedMulticast addr msg return True else return False return (tid', mgroup) return ( LocalEndPointValid vl , Right $ MulticastGroup { multicastAddress = addr , deleteMulticastGroup = apiDeleteMulticastGroupLocal (multicastGroupState mgroup) lep addr rep repAddr pub sub subAddr (Just subThread) wrkThread , maxMsgSize = Nothing , multicastSend = apiMulticastSendLocal mgroup pub , multicastSubscribe = apiMulticastSubscribe mgroup , multicastUnsubscribe = apiMulticastUnsubscribe mgroup , multicastClose = apiMulticastClose } ) where mkPublisher vt = do pub <- ZMQ.socket (_transportContext vt) ZMQ.Pub portPub <- ZMQ.bindFromRangeRandom pub (B8.unpack $ transportAddress zmq) (minPort params) (maxPort params) (maxTries params) rep <- ZMQ.socket (_transportContext vt) ZMQ.Rep portRep <- ZMQ.bindFromRangeRandom rep (B8.unpack $ transportAddress zmq) (minPort params) (maxPort params) (maxTries params) wrkThread <- Async.async $ forever $ do msg <- ZMQ.receiveMulti rep ZMQ.sendMulti pub $ "" :| msg ZMQ.send rep [] "OK" return (pub,portPub,rep,portRep, wrkThread) apiResolveMulticastGroup :: ZMQTransport -> LocalEndPoint -> MulticastAddress -> IO (Either (TransportError ResolveMulticastGroupErrorCode) MulticastGroup) apiResolveMulticastGroup zmq lep addr = withMVar (_transportState zmq) $ \case TransportClosed -> return $ Left $ TransportError ResolveMulticastGroupFailed "Transport is closed." TransportValid vt -> modifyMVar (localEndPointState lep) $ \case LocalEndPointClosed -> return (LocalEndPointClosed, Left $ TransportError ResolveMulticastGroupFailed "Transport is closed.") LocalEndPointValid vl -> mask_ $ do -- socket allocation req <- ZMQ.socket (_transportContext vt) ZMQ.Req ZMQ.connect req (B8.unpack reqAddr) sub <- ZMQ.socket (_transportContext vt) ZMQ.Sub ZMQ.connect sub (B8.unpack subAddr) ZMQ.subscribe sub "" subscribed <- newIORef False (subThread, mgroup) <- fixIO $ \ ~(tid,_) -> do v <- newMVar (MulticastGroupValid (ValidMulticastGroup subscribed)) let mgroup = ZMQMulticastGroup v (apiDeleteMulticastGroupRemote v lep addr req reqAddr sub subAddr (Just tid)) tid' <- Async.async $ repeatWhile $ do (ctrl: msg) <- ZMQ.receiveMulti sub if B8.null ctrl then do opened <- readIORef subscribed when opened $ atomically $ writeTMChan (_localEndPointChan vl) $ ReceivedMulticast addr msg return True else do apiDeleteMulticastGroupRemote v lep addr req reqAddr sub subAddr Nothing return False return (tid', mgroup) return ( LocalEndPointValid vl{_localEndPointMulticastGroups = Map.insert addr mgroup (_localEndPointMulticastGroups vl)} , Right $ MulticastGroup { multicastAddress = addr , deleteMulticastGroup = apiDeleteMulticastGroupRemote (multicastGroupState mgroup) lep addr req reqAddr sub subAddr (Just subThread) , maxMsgSize = Nothing , multicastSend = apiMulticastSendRemote mgroup req , multicastSubscribe = apiMulticastSubscribe mgroup , multicastUnsubscribe = apiMulticastUnsubscribe mgroup , multicastClose = apiMulticastClose } ) where reqAddr = extractRepAddress addr subAddr = extractPubAddress addr apiDeleteMulticastGroupRemote :: MVar MulticastGroupState -> LocalEndPoint -> MulticastAddress -> ZMQ.Socket ZMQ.Req -> ByteString -> ZMQ.Socket ZMQ.Sub -> ByteString -> Maybe (Async.Async ()) -> IO () apiDeleteMulticastGroupRemote mstate lep addr req reqAddr sub subAddr mtid = mask_ $ do Foldable.traverse_ (\tid -> Async.cancel tid >> void (Async.waitCatch tid)) mtid modifyMVar_ mstate $ \case MulticastGroupClosed -> return MulticastGroupClosed MulticastGroupValid v -> do modifyIORef (_multicastGroupSubscribed v) (const False) -- ZMQ.disconnect req (B8.unpack reqAddr) ZMQ.close req ZMQ.unsubscribe sub "" -- ZMQ.disconnect sub (B8.unpack subAddr) ZMQ.close sub return MulticastGroupClosed modifyMVar_ (localEndPointState lep) $ \case LocalEndPointClosed -> return LocalEndPointClosed LocalEndPointValid v -> return $ LocalEndPointValid v{_localEndPointMulticastGroups = Map.delete addr (_localEndPointMulticastGroups v)} apiDeleteMulticastGroupLocal :: MVar MulticastGroupState -> LocalEndPoint -> MulticastAddress -> ZMQ.Socket ZMQ.Rep -> ByteString -> ZMQ.Socket ZMQ.Pub -> ZMQ.Socket ZMQ.Sub -> ByteString -> Maybe (Async.Async ()) -> Async.Async () -> IO () apiDeleteMulticastGroupLocal mstate lep addr rep repAddr pub sub pubAddr mtid wrk = mask_ $ do Foldable.traverse_ (\tid -> Async.cancel tid >> void (Async.waitCatch tid)) mtid void $ Async.cancel wrk >> Async.waitCatch wrk modifyMVar_ mstate $ \case MulticastGroupClosed -> return MulticastGroupClosed MulticastGroupValid v -> do modifyIORef (_multicastGroupSubscribed v) (const False) ZMQ.sendMulti pub $ "C" :| [] threadDelay 50000 ZMQ.unbind rep (B8.unpack repAddr) ZMQ.close rep ZMQ.unsubscribe sub "" -- ZMQ.disconnect sub (B8.unpack pubAddr) ZMQ.close sub ZMQ.unbind pub (B8.unpack pubAddr) ZMQ.close pub return MulticastGroupClosed modifyMVar_ (localEndPointState lep) $ \case LocalEndPointClosed -> return LocalEndPointClosed LocalEndPointValid v -> return $ LocalEndPointValid v{_localEndPointMulticastGroups = Map.delete addr (_localEndPointMulticastGroups v)} apiMulticastSendLocal :: (ZMQ.Sender a) => ZMQMulticastGroup -> ZMQ.Socket a -> [ByteString] -> IO () apiMulticastSendLocal mgroup sock msg = withMVar (multicastGroupState mgroup) $ \case MulticastGroupClosed -> return () MulticastGroupValid _ -> do ZMQ.sendMulti sock $ "" :| msg apiMulticastSendRemote :: (ZMQ.Sender a, ZMQ.Receiver a) => ZMQMulticastGroup -> ZMQ.Socket a -> [ByteString] -> IO () apiMulticastSendRemote _ _ [] = return () apiMulticastSendRemote mgroup sock (m:msg) = withMVar (multicastGroupState mgroup) $ \case MulticastGroupClosed -> return () MulticastGroupValid _ -> do ZMQ.sendMulti sock $ m :| msg "OK" <- ZMQ.receive sock -- XXX: timeout return () apiMulticastSubscribe :: ZMQMulticastGroup -> IO () apiMulticastSubscribe mgroup = withMVar (multicastGroupState mgroup) $ \case MulticastGroupClosed -> return () MulticastGroupValid v -> do modifyIORef (_multicastGroupSubscribed v) (const True) apiMulticastUnsubscribe :: ZMQMulticastGroup -> IO () apiMulticastUnsubscribe mgroup = withMVar (multicastGroupState mgroup) $ \case MulticastGroupClosed -> return () MulticastGroupValid v -> modifyIORef (_multicastGroupSubscribed v) (const False) apiMulticastClose :: IO () apiMulticastClose = return () extractRepAddress :: MulticastAddress -> ByteString extractRepAddress (MulticastAddress bs) = B8.concat [a,":",p] where (x,p) = B8.spanEnd (/=':') bs a = B8.init . fst . B8.spanEnd (/=':') . B8.init $ x extractPubAddress :: MulticastAddress -> ByteString extractPubAddress (MulticastAddress bs) = B8.init . fst $ B8.spanEnd (/=':') bs repeatWhile :: Monad m => (m Bool) -> m () repeatWhile f = f >>= \x -> if x then repeatWhile f else return ()