-- | -- Copyright: (C) 2014 EURL Tweag -- -- 0MQ implemmentation of the network-transport API. -- -- The 0MQ implementation guarantees that only a single connection (socket) will -- be used between endpoints, provided that the addresses specified are -- canonical. If /A/ connects to /B/ and reports its address as -- @192.168.0.1:8080@ and /B/ subsequently connects tries to connect to /A/ as -- @client1.local:http-alt@ then the transport layer will not realize that the -- TCP connection can be reused. -- -- This module is intended to be imported qualified. {-# LANGUAGE BangPatterns #-} module Network.Transport.ZMQ ( -- * Main API createTransport , ZMQParameters(..) , SecurityMechanism(..) , defaultZMQParameters -- * ZeroMQ specific functionality -- $zeromqs , Hints(..) , defaultHints , apiNewEndPoint -- * Internals -- $internals , createTransportExposeInternals , breakConnectionEndPoint , breakConnection , unsafeConfigurePush -- $cleanup , registerCleanupAction , registerValidCleanupAction , applyCleanupAction -- * Design -- $design -- ** Multicast -- $multicast ) where import Prelude hiding (sequence_) import Network.Transport.ZMQ.Internal.Types import qualified Network.Transport.ZMQ.Internal as ZMQ import Control.Applicative import Control.Category ( (>>>) ) import Control.Concurrent ( yield , threadDelay ) import qualified Control.Concurrent.Async as Async import Control.Concurrent.MVar import Control.Concurrent.STM import Control.Concurrent.STM.TMChan import Control.Monad ( void , forever , unless , join , foldM , when , (<=<) , liftM2 ) import Control.Exception ( mapException , catches , throwIO , Handler(..) , evaluate ) import Control.Monad.Catch ( finally , try , Exception , MonadCatch , SomeException(..) 
, mask , mask_ , uninterruptibleMask_ , throwM ) import Data.Binary import Data.ByteString (ByteString) import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Char8 as B8 import Data.Foldable ( traverse_ , sequence_ ) import qualified Data.Foldable as Foldable import Data.IORef ( newIORef , modifyIORef , readIORef , writeIORef , atomicModifyIORef' ) import qualified Data.IntMap.Strict as IntMap import Data.List.NonEmpty import qualified Data.Map.Strict as Map import qualified Data.Set as Set import qualified Data.Traversable as Traversable import Data.Typeable import Data.Unique import GHC.Generics ( Generic ) import Network.Transport import System.IO ( fixIO , hPutStrLn , stderr ) import System.ZMQ4 ( Context ) import qualified System.ZMQ4 as ZMQ import Data.Accessor ((^.), (^=), (^:) ) import Text.Printf -------------------------------------------------------------------------------- --- Internal datatypes -- -------------------------------------------------------------------------------- -- $design -- -- In the zeromq backend we are using following address scheme: -- -- @ -- tcp:\/\/host:port\/ -- | | | -- | +---+---------------- can be configured by user, one host, -- | port pair per distributed process -- | instance -- +---------------------------- In future it will be possible to add other -- schemas. -- @ -- -- Transport specifies host that will be used, and port will be -- automatically generated by the transport for each endpoint. -- -- * Connections Reliability -- -- Currently only reliable connections are supported. In case 'Unreliable' was -- passed the connection will nonetheless be reliable, since it is not incorrect -- to do so. This may be changed in future versions. -- -- Network-transport-zeromq maintains one thread for each endpoint that is -- used to read incoming requests. All sends and connection requests are -- handled from the user thread and are mostly asynchronous. 
--
-- Each endpoint obtains one pull socket and one push socket for each remote
-- endpoint. All lightweight connections are multiplexed onto one heavyweight
-- connection.
--
-- Virtually, connections look like the following diagram:
--
-- @
-- +------------------+                               +--------------+
-- |  Endpoint        |                               |  Endpoint    |
-- |                  |                               |              |
-- |              +------+                        +------+           |
-- |              | pull |<-----------------------| push |           |
-- |              +------+                        +------+           |
-- |                  |    +--------------------+     |              |
-- |              +------+/~~~~ connection 1 ~~~~\\+------+           |
-- |              | push |~~~~~ connection 2 ~~~~~| pull |           |
-- |              +------+\\                      /+------+           |
-- |                  |    +--------------------+     |              |
-- +------------------+                               +--------------+
-- @
--
-- Physically, ZeroMQ may choose a better representation and mapping onto real
-- connections.
--
-- The ZeroMQ library takes care of reliability. It keeps messages in a queue
-- so that, if a connection breaks, it can be restarted with no message
-- loss. A heartbeating procedure should therefore be introduced on top of
-- network-transport, and 'breakConnection' should be used to notify about
-- connection death. However, if the High Water Mark is reached, the connection
-- to the endpoint is considered failed and the connection break procedure is
-- started automatically.

-- Naming conventions.
-- api*           -- functions that can be called by user
-- localEndPoint  -- internal functions that are used in endpoints
-- remoteEndPoint -- internal functions that are used in endpoints
--
-- Transport
--    |--- Valid    transport itself
--    |      |--- newEndpoint
--    |      |--- close
--    |--- Closed
--
-- LocalEndPoint
-- RemoteEndPoint
-- Connection

-- Connection closing procedure.
-- Once an endpoint is gracefully closed by the user or by a transport close,
-- the 'RemoteEndPoint' goes into the 'RemoteEndPointClosing' state and sends
-- a message to the remote endpoint if the link is known to be alive; in
-- 'RemoteEndPointClosing' all messages are ignored and 'EndPointClosed'
-- is returned.
-- Upon delivery of an 'EndPointClose' message the endpoint replies with an
-- 'EndPointCloseOk' message and starts the cleanup procedure (marking the link
-- as not alive). When an endpoint receives the 'EndPointCloseOk' message it
-- moves the 'RemoteEndPoint' to the closed state and removes all data
-- structures. Together with marking an endpoint as closing, an asynchronous
-- timeout is set, and if 'EndPointCloseOk' is not delivered within that time
-- period the 'EndPoint' is closed.
--
--   endpoint-1                                    endpoint-2
--  1. mark as closing
--  [RemoteEndPoint:Closing]  ----endPointClose->  [remoteEndPoint:Valid]
--                                                 2. mark as closed
--                            <-endPointCloseOk-   [remoteEndPoint:Closed]
--  3. mark as closed
--  [RemoteEndPoint:Closed]                        4. cleanup remote end point
--
-- An 'EndPoint' can be closed in two ways: normally and abnormally (in case
-- of an exception or an invariant violation on the remote side). If the
-- 'EndPoint' is closed normally, all opened connections will receive
-- 'ConnectionClosed' events; otherwise an 'EventConnectionLost' error event
-- is delivered.
--
-- Multicast group schema:
--
--   +--------------+
--   |   EndPoint   |<----------------+
--   +--------------+                 | events
--          |                         |
--          +------------------+      |
--                             |      |
--             +------------------------------+
--             |   Multicast Group (Server)   |
--             +--+     +--+     +--+     +---+
--                | Rep |  | Pub |  | Sub |
--                +-----+  +-----+  +-----+
--                   ^        |        ^
--           request |        |        | messages
--          +--------+        |
--          |                 |
--          |        +--------+
--          |        | messages
--       +-----+  +-----+
--       | Req |  | Sub |
--   +---+     +--+     +--+
--   |   Multicast Group   |
--   +---------------------+
--
-- Resolving a group automatically subscribes to the group in order to receive
-- events; however, unless 'subscribe' is called the endpoint will not receive
-- any messages. This is done in order to keep the ability to notify the
-- receiver about the multicast group being closed.
--
-- The current solution incorrectly keeps track of incoming connections. This
-- means that we can't really guarantee that a "close" message is delivered to
-- every recipient and that it is disconnected correctly.
-- $internals Internal functions provide additional ZeroMQ specific set of
-- configuration options. This functionality should be used with caution as it
-- may break required socket properties.

-- | Control messages exchanged between endpoints. The constructor order is
-- part of the wire format, because the 'Binary' instance is derived
-- generically.
data ZMQMessage
  = MessageConnect !EndPointAddress
    -- ^ Connection greeting
  | MessageInitConnection !EndPointAddress !ConnectionId !Reliability
  | MessageInitConnectionOk !EndPointAddress !ConnectionId !ConnectionId
  | MessageCloseConnection !ConnectionId
  | MessageData !ConnectionId
  | MessageEndPointClose !EndPointAddress !Bool
  | MessageEndPointCloseOk !EndPointAddress
  deriving (Generic)

instance Binary ZMQMessage

-- | Errors raised internally by the transport.
data ZMQError
  = InvariantViolation String
  | IncorrectState String
  | ConnectionFailed
  | DriverError ZMQ.ZMQError
  deriving (Typeable, Show)

instance Exception ZMQError

-- | Create 0MQ based transport.
--
-- This is 'createTransportExposeInternals' with the internal handle dropped.
createTransport
  :: ZMQParameters
  -> ByteString -- ^ Transport address (IP or hostname)
  -> IO Transport
createTransport params host = do
    (_, transport) <- createTransportExposeInternals params host
    return transport

-- | You should probably not use this function (used for unit testing only)
--
-- Creates a fresh ZeroMQ context, starts the authentication manager when a
-- security mechanism is configured, and returns both the internal transport
-- state and the public 'Transport' record built on top of it.
createTransportExposeInternals
  :: ZMQParameters -- ^ Configuration parameters for ZeroMQ
  -> ByteString    -- ^ Host name or IP address
  -> IO (TransportInternals, Transport)
createTransportExposeInternals params host = do
    ctx   <- ZMQ.context
    mtid  <- Traversable.traverse (startAuth ctx) (zmqSecurityMechanism params)
    state <- newMVar =<< mkTransportState ctx mtid
    let internals = TransportInternals (B.append "tcp://" host) state params
        transport = Transport
          { newEndPoint    = apiNewEndPoint defaultHints internals
          , closeTransport = apiTransportClose internals
          }
    return (internals, transport)
  where
    -- NOTE(review): the fields are bound here as (user, pass), whereas
    -- 'createOrGetRemoteEndPoint' binds the same constructor's fields as
    -- (password, user name) -- confirm the field order against the
    -- 'SecurityPlain' declaration.
    startAuth ctx (SecurityPlain user pass) = ZMQ.authManager ctx user pass

-- | Close Transport on the current node.
-- This operation is synchronous.
apiTransportClose :: TransportInternals -> IO ()
apiTransportClose transport = mask_ $ do
    -- Mark the transport closed first, so that concurrent users observe
    -- 'TransportClosed' while the teardown below is still running.
    old <- swapMVar (transportState transport) TransportClosed
    case old of
      TransportClosed -> return ()
      -- ZeroMQ errors raised during teardown are logged, not rethrown.
      TransportValid v -> either errorLog return <=< tryZMQ $ do
        -- Cancel the authentication thread (if any) and wait for it to finish.
        traverse_ (liftM2 (>>) Async.cancel Async.waitCatch) (v ^. transportAuth)
        traverse_ (apiCloseEndPoint transport) (v ^. transportEndPoints)
        -- Atomically empty the registered actions and run them all.
        sequence_ =<< atomicModifyIORef' (v ^. transportSockets) (\x -> (IntMap.empty, x))
        ZMQ.term (v ^. transportContext)

-- | Creates a new endpoint on the transport specified and applies all hints.
--
-- Returns 'NewEndPointFailed' if the transport is already closed or if a
-- 'ZMQError' is raised while the endpoint is being created.
apiNewEndPoint :: Hints -- ^ Hints to apply.
               -> TransportInternals -- ^ Internal transport state.
               -> IO (Either (TransportError NewEndPointErrorCode) EndPoint)
apiNewEndPoint hints transport = try $ mapZMQException (TransportError NewEndPointFailed . show) $
    modifyMVar (transportState transport) $ \case
      TransportClosed -> throwM $ TransportError NewEndPointFailed "Transport is closed."
      TransportValid i -> do
        (ep, chan) <- endPointCreate hints (transportParameters transport) (i ^. transportContext) (B8.unpack addr)
        let !cntx = i ^. transportContext
        -- The new transport state registers the endpoint under its address;
        -- the result is the public 'EndPoint' record.
        return $ ( TransportValid
                 . (transportEndPoints ^: (Map.insert (localEndPointAddress ep) ep))
                 $ i
                 , EndPoint
                     { receive = atomically $ do
                         mx <- readTMChan chan
                         case mx of
                           -- 'readTMChan' yields Nothing once the channel has
                           -- been closed and drained.
                           Nothing -> error "channel is closed"
                           Just x -> return x
                     , address = localEndPointAddress ep
                     , connect = apiConnect (transportParameters transport) cntx ep
                     , closeEndPoint = apiCloseEndPoint transport ep
                     , newMulticastGroup = apiNewMulticastGroup defaultHints transport ep
                     , resolveMulticastGroup = apiResolveMulticastGroup transport ep
                     }
                 )
  where
    addr = transportAddress transport

-- | Asynchronous operation, shutdown of the remote end point may take a while
--
-- Emits 'EndPointClosed', closes the event channel, cancels the receiver
-- thread and removes the endpoint from the transport state. ZeroMQ errors are
-- logged, not rethrown.
apiCloseEndPoint :: TransportInternals
                 -> LocalEndPoint
                 -> IO ()
apiCloseEndPoint transport lep = mask_ $ either errorLog return <=< tryZMQ $ do
    -- we don't close endpoint here because other threads,
    -- should be able to access local endpoint state
    old <- readMVar (localEndPointState lep)
    case old of
      LocalEndPointValid (ValidLocalEndPoint x _ _ threadId _ _) -> do
        -- close channel, no events will be received
        atomically $ do
          writeTMChan x EndPointClosed
          closeTMChan x
        -- Cancelling the receiver thread triggers its 'finally' handler
        -- (see 'endPointCreate'); wait until it has finished.
        Async.cancel threadId
        void $ Async.waitCatch threadId
      LocalEndPointClosed -> return ()
    void $ swapMVar (localEndPointState lep) LocalEndPointClosed
    modifyMVar_ (transportState transport) $ \case
      TransportClosed -> return TransportClosed
      TransportValid v -> return $ TransportValid
        . (transportEndPoints ^: (Map.delete (localEndPointAddress lep)))
        $ v

-- | Create a local endpoint: bind a pull socket on the given address (on the
-- hinted port, or on a random one), start the receiver thread and return the
-- endpoint together with the channel on which its events are delivered.
endPointCreate :: Hints
               -> ZMQParameters
               -> Context
               -> String
               -> IO (LocalEndPoint, TMChan Event)
endPointCreate hints params ctx addr = promoteZMQException $ do
    pull <- ZMQ.socket ctx ZMQ.Pull
    case zmqSecurityMechanism params of
      Nothing -> return ()
      Just SecurityPlain{} -> do
        ZMQ.setPlainServer True pull
    -- NOTE(review): this sets the *send* high-water mark on a pull socket --
    -- confirm whether the receive high-water mark was intended.
    ZMQ.setSendHighWM (ZMQ.restrict (zmqHighWaterMark params)) pull
    port <- case hintsPort hints of
      Nothing -> ZMQ.bindRandomPort pull addr
      Just i -> do
        ZMQ.bind pull (addr ++ ":" ++ show i)
        return i
    chOut <- newTMChanIO
    lep <- LocalEndPoint <$> pure (EndPointAddress $ B8.pack (addr ++ ":" ++ show port))
                         <*> newEmptyMVar
                         <*> pure port
    opened <- newIORef True
    -- The thread is started with exceptions masked so that the endpoint state
    -- is always published; only the receive loop itself runs unmasked.
    mask $ \restore -> do
      thread <- Async.async $ (restore (receiver pull lep chOut))
                  `finally` finalizeEndPoint lep port pull
      putMVar (localEndPointState lep) $ LocalEndPointValid
        (ValidLocalEndPoint chOut (Counter 0 Map.empty) Map.empty thread opened Map.empty)
      return (lep, chOut)
  where
    -- Receive loop used while the endpoint is shutting down: only
    -- 'MessageEndPointCloseOk' is acted upon, everything else is dropped.
    finalizer pull ourEp = forever $ do
      (cmd:_) <- ZMQ.receiveMulti pull
      mask_ $ case decode' cmd of
        MessageEndPointCloseOk theirAddress -> getRemoteEndPoint ourEp theirAddress >>=
          traverse_ (\rep -> do
            state <- swapMVar (remoteEndPointState rep) RemoteEndPointClosed
            closeRemoteEndPoint ourEp rep state)
        _ -> return () -- XXX: send exception

    -- Main receive loop: the first frame of every multipart message is the
    -- encoded 'ZMQMessage', the remaining frames are the payload.
    receiver :: ZMQ.Socket ZMQ.Pull
             -> LocalEndPoint
             -> TMChan Event
             -> IO ()
    receiver pull ourEp chan = forever $ mask_ $ do
      (cmd:msgs) <- ZMQ.receiveMulti pull
      case decode' cmd of
        MessageData idx -> atomically $ writeTMChan chan (Received idx msgs)
        MessageConnect theirAddress -> do
          void $ createOrGetRemoteEndPoint params ctx ourEp theirAddress
        -- The peer opens a lightweight connection to us: allocate the next
        -- local connection id, record the connection and acknowledge it,
        -- either immediately or once the remote endpoint leaves the pending
        -- state.
        MessageInitConnection theirAddress theirId rel -> do
            join $ do
              modifyMVar (localEndPointState ourEp) $ \case
                LocalEndPointValid v ->
                    case v ^. localEndPointRemoteAt theirAddress of
                      Nothing -> return (LocalEndPointValid v, throwM $ InvariantViolation "Remote endpoint should exist.")
                      Just rep -> modifyMVar (remoteEndPointState rep) $ \case
                        RemoteEndPointFailed -> throwM $ InvariantViolation "RemoteEndPoint should be valid."
                        RemoteEndPointClosed -> throwM $ InvariantViolation "RemoteEndPoint should be valid."
                        RemoteEndPointClosing{} -> throwM $ InvariantViolation "RemoteEndPoint should be valid."
                        w@RemoteEndPointValid{} -> do
                          conn <- ZMQConnection <$> pure ourEp
                                                <*> pure rep
                                                <*> pure rel
                                                <*> newMVar (ZMQConnectionValid $ ValidZMQConnection Nothing i)
                                                <*> newEmptyMVar
                          w' <- register (succ i) w
                          return ( w'
                                 , ( LocalEndPointValid (localEndPointConnections ^= Counter (succ i) (Map.insert (succ i) conn m) $ v)
                                   , return ())
                                 )
                        z@(RemoteEndPointPending w) -> do
                          conn <- ZMQConnection <$> pure ourEp
                                                <*> pure rep
                                                <*> pure rel
                                                <*> newMVar (ZMQConnectionValid $ ValidZMQConnection Nothing i)
                                                <*> newEmptyMVar
                          -- Defer the acknowledgement by queueing it on the
                          -- pending-actions list.
                          modifyIORef w (\xs -> (register (succ i)) : xs)
                          return ( z
                                 , ( LocalEndPointValid (localEndPointConnections ^= Counter (succ i) (Map.insert (succ i) conn m) $ v)
                                   , return ())
                                 )
                  where
                    (Counter i m) = v ^. localEndPointConnections
                _ -> throwM $ InvariantViolation "RemoteEndPoint should be valid."
          where
            -- Announce connection @i@ according to the state the remote
            -- endpoint is in when the action finally runs: if the remote is
            -- no longer usable the connection is reported as opened and
            -- immediately closed.
            register i RemoteEndPointFailed = do
              atomically $ do
                writeTMChan chan (ConnectionOpened i rel theirAddress)
                writeTMChan chan (ConnectionClosed i)
              return RemoteEndPointFailed
            register i RemoteEndPointClosed = do
              atomically $ do
                writeTMChan chan (ConnectionOpened i rel theirAddress)
                writeTMChan chan (ConnectionClosed i)
              return RemoteEndPointClosed
            register i (RemoteEndPointClosing x) = do
              atomically $ do
                writeTMChan chan (ConnectionOpened i rel theirAddress)
                writeTMChan chan (ConnectionClosed i)
              return $ RemoteEndPointClosing x
            register _ RemoteEndPointPending{} =
              throwM $ InvariantViolation "RemoteEndPoint can't be pending."
            register i (RemoteEndPointValid v@(ValidRemoteEndPoint sock _ s _)) = do
              ZMQ.send sock [] $ encode' (MessageInitConnectionOk ourAddr theirId i)
              atomically $ writeTMChan chan (ConnectionOpened i rel theirAddress)
              return $ RemoteEndPointValid v{_remoteEndPointIncommingConnections = Set.insert i s}
        -- The peer closed one of its connections to us: forget it locally and
        -- emit 'ConnectionClosed' if it was still valid.
        MessageCloseConnection idx -> join $ do
          modifyMVar (localEndPointState ourEp) $ \case
            LocalEndPointValid v ->
              case v ^. localEndPointConnectionAt idx of
                Nothing -> return (LocalEndPointValid v, return ())
                Just conn -> do
                  old <- swapMVar (connectionState conn) ZMQConnectionFailed
                  return ( LocalEndPointValid (localEndPointConnections ^: counterValues ^: (Map.delete idx) $ v)
                         , case old of
                             ZMQConnectionFailed -> return ()
                             ZMQConnectionInit -> return () -- throwM InvariantViolation
                             ZMQConnectionClosed -> return ()
                             ZMQConnectionValid (ValidZMQConnection _ _) -> do
                               atomically $ writeTMChan chan (ConnectionClosed idx)
                               connectionCleanup (connectionRemoteEndPoint conn) idx)
            LocalEndPointClosed -> return (LocalEndPointClosed, return ())
        -- The peer acknowledged a connection that we initiated: move it from
        -- the pending map to the valid state and wake up 'connectionReady'.
        MessageInitConnectionOk theirAddress ourId theirId -> do
          join $ withMVar (localEndPointState ourEp) $ \case
            LocalEndPointValid v ->
              case v ^. localEndPointRemoteAt theirAddress of
                Nothing -> return (return ()) -- XXX: send message to the host
                Just rep -> modifyMVar (remoteEndPointState rep) $ \case
                  RemoteEndPointFailed -> return (RemoteEndPointFailed, return ())
                  RemoteEndPointClosed -> throwM $ InvariantViolation "RemoteEndPoint should be valid or failed."
                  RemoteEndPointClosing{} -> throwM $ InvariantViolation "RemoteEndPoint should be valid or failed."
                  t@(RemoteEndPointValid (ValidRemoteEndPoint sock (Counter x m) s z)) -> return $
                    case ourId `Map.lookup` m of
                      Nothing -> (t, return ()) -- XXX: send message to the host
                      Just c ->
                        ( RemoteEndPointValid (ValidRemoteEndPoint sock (Counter x (ourId `Map.delete` m)) s (z+1))
                        , do modifyMVar_ (connectionState c) $ \case
                               ZMQConnectionFailed -> return ZMQConnectionFailed
                               ZMQConnectionInit -> return $ ZMQConnectionValid (ValidZMQConnection (Just sock) theirId)
                               -- The user closed the connection before it
                               -- was established: tell the peer now.
                               ZMQConnectionClosed -> do
                                 ZMQ.send sock [] $ encode' (MessageCloseConnection theirId)
                                 return ZMQConnectionClosed
                               ZMQConnectionValid _ -> throwM $ InvariantViolation "RemoteEndPoint should be closed"
                             void $ tryPutMVar (connectionReady c) ()
                        )
                  RemoteEndPointPending p -> return (RemoteEndPointPending p, throwM $ InvariantViolation "RemoteEndPoint should be closed")
            LocalEndPointClosed -> return $ return ()
        -- Graceful close requested by the peer: acknowledge it, then close our
        -- side silently (without sending our own close request).
        MessageEndPointClose theirAddress True -> getRemoteEndPoint ourEp theirAddress >>=
          traverse_ (\rep -> do
            onValidRemote rep $ \v ->
              ZMQ.send (remoteEndPointSocket v) [] (encode' $ MessageEndPointCloseOk $ localEndPointAddress ourEp)
            remoteEndPointClose True ourEp rep)
        -- Abnormal close reported by the peer: fail the remote endpoint and
        -- report the lost connection.
        MessageEndPointClose theirAddress False -> getRemoteEndPoint ourEp theirAddress >>=
          traverse_ (\rep -> do
            mst <- cleanupRemoteEndPoint ourEp rep Nothing
            case mst of
              Nothing -> return ()
              Just st -> do
                onValidEndPoint ourEp $ \v -> atomically $ writeTMChan (localEndPointChan v) $
                  ErrorEvent $ TransportError (EventConnectionLost theirAddress) "Exception on remote side"
                closeRemoteEndPoint ourEp rep st)
        -- The peer confirmed our close request.
        MessageEndPointCloseOk theirAddress -> getRemoteEndPoint ourEp theirAddress >>=
          traverse_ (\rep -> do
            state <- swapMVar (remoteEndPointState rep) RemoteEndPointClosed
            closeRemoteEndPoint ourEp rep state)
      where
        ourAddr = localEndPointAddress ourEp

    -- Runs when the receiver thread terminates: stop accepting new remote
    -- endpoints, close all known remotes concurrently while a temporary
    -- 'finalizer' thread keeps reading the pull socket, then close the pull
    -- socket and mark the endpoint closed.
    finalizeEndPoint ourEp _port pull = do
      join $ withMVar (localEndPointState ourEp) $ \case
        LocalEndPointClosed -> afterP ()
        LocalEndPointValid v -> do
          writeIORef (localEndPointOpened v) False
          return $ do
            tid <- Async.async $ finalizer pull ourEp
            void $ Async.mapConcurrently (remoteEndPointClose False ourEp) $ v ^. localEndPointRemotes
            Async.cancel tid
            void $ Async.waitCatch tid
      ZMQ.closeZeroLinger pull
      void $ swapMVar (localEndPointState ourEp) LocalEndPointClosed

-- | Send a multipart payload over a lightweight connection.
--
-- Exceptions raised while evaluating the payload, as well as 'ZMQError's,
-- are converted into 'SendFailed' after the remote endpoint has been cleaned
-- up; a 'TransportError' thrown by the send itself is returned as is.
apiSend :: ZMQConnection
        -> [ByteString]
        -> IO (Either (TransportError SendErrorCode) ())
apiSend (ZMQConnection l e _ s _) b = do
    eb <- try $ mapM_ evaluate b
    case eb of
      -- Tests in network-transport-tests require to convert exceptions
      -- during evaluation of the argument to returned error values.
      Left ex -> do
        cleanup
        return $ Left $ TransportError SendFailed (show (ex::SomeException))
      Right _ -> (fmap Right inner) `catches`
        [ Handler $ \ex -> -- TransportError - return, as all required
                           -- actions were performed
            return $ Left (ex :: TransportError SendErrorCode)
        , Handler $ \ex -> do -- a ZMQError was raised
            cleanup
            return $ Left $ TransportError SendFailed (show (ex::ZMQError))
        ]
  where
    -- Retries (yielding in between) while the remote endpoint or the
    -- connection is still being set up.
    inner :: IO ()
    inner = join $ withMVar (remoteEndPointState e) $ \x -> case x of
      RemoteEndPointPending{} -> return $ yield >> inner
      RemoteEndPointFailed -> throwIO $ TransportError SendFailed "Remote end point is failed."
      RemoteEndPointClosed -> throwIO $ TransportError SendFailed "Remote end point is closed."
      RemoteEndPointClosing{} -> throwIO $ TransportError SendFailed "Remote end point is closing."
      RemoteEndPointValid v -> withMVar s $ \case
        ZMQConnectionInit -> return $ yield >> inner -- readMVar (connectionReady c) >> inner
        ZMQConnectionClosed -> throwIO $ TransportError SendClosed "Connection is closed"
        ZMQConnectionFailed -> throwIO $ TransportError SendFailed "Connection is failed"
        ZMQConnectionValid (ValidZMQConnection _ idx) -> do
          evs <- ZMQ.events (remoteEndPointSocket v)
          if ZMQ.Out `elem` evs
            then do
              ZMQ.sendMulti (remoteEndPointSocket v) $ encode' (MessageData idx) :| b
              afterP ()
            -- The socket is not writable: treat the link as broken, report
            -- the lost connection and fail the send.
            else return $ do
              rep <- cleanupRemoteEndPoint l e Nothing
              traverse_ (\z -> do
                onValidEndPoint l $ \w -> atomically $ writeTMChan (localEndPointChan w) $
                  ErrorEvent $ TransportError (EventConnectionLost (remoteEndPointAddress e)) "Exception on remote side"
                closeRemoteEndPoint l e z) rep
              throwIO $ TransportError SendFailed "Connection broken."
    -- Fail the remote endpoint, notify the peer about the abnormal close and
    -- report the lost connection locally.
    cleanup = do
      void $ cleanupRemoteEndPoint l e
        (Just $ \v -> ZMQ.send (remoteEndPointSocket v) [] $ encode' (MessageEndPointClose (localEndPointAddress l) False))
      onValidEndPoint l $ \v -> atomically $ do
        writeTMChan (localEndPointChan v) $ ErrorEvent $ TransportError
          (EventConnectionLost (remoteEndPointAddress e)) "Exception on send."

-- 'apiClose' function is asynchronous, as the connection may not exist yet by
-- the time this function is called. In this case the function just marks the
-- connection as closed, so all subsequent calls from the user side will
-- "think" that the connection is closed; the remote side is notified only
-- once the connection is up.
apiClose :: ZMQConnection -> IO ()
apiClose (ZMQConnection _ e _ s _) = either errorLog return <=< tryZMQ $ uninterruptibleMask_ $ join $ do
    -- The connection is marked closed unconditionally; the peer is notified
    -- only if the connection was valid.
    modifyMVar s $ \case
      ZMQConnectionValid (ValidZMQConnection _ idx) -> do
        return (ZMQConnectionClosed, do
          modifyMVar_ (remoteEndPointState e) $ \case
            v@RemoteEndPointValid{} -> notify idx v
            -- Remote endpoint not connected yet: queue the notification.
            v@(RemoteEndPointPending p) -> modifyIORef p (\xs -> notify idx : xs) >> return v
            v -> return v
          )
      _ -> return (ZMQConnectionClosed, return ())
  where
    notify _ RemoteEndPointFailed = return RemoteEndPointFailed
    notify _ (RemoteEndPointClosing x) = return $ RemoteEndPointClosing x
    notify _ RemoteEndPointClosed = return RemoteEndPointClosed
    notify _ RemoteEndPointPending{} = throwM $ InvariantViolation "RemoteEndPoint can't be pending."
    notify idx w@(RemoteEndPointValid (ValidRemoteEndPoint sock _ _ _)) = do
      ZMQ.send sock [] $ encode' (MessageCloseConnection idx)
      return w

-- | Open a lightweight connection to a remote endpoint, creating the
-- heavyweight connection first if necessary. Blocks (yielding) until the
-- connection leaves the initial state.
--
-- NOTE(review): if the remote endpoint was pending and then fails to connect,
-- the queued 'go' action is never run and the connection appears to stay in
-- 'ZMQConnectionInit', so 'waitReady' would spin forever -- confirm.
apiConnect :: ZMQParameters
           -> Context
           -> LocalEndPoint
           -> EndPointAddress
           -> Reliability
           -> ConnectHints
           -> IO (Either (TransportError ConnectErrorCode) Connection)
apiConnect params ctx ourEp theirAddr reliability _hints = fmap (either Left id) $ try $
    mapZMQException (TransportError ConnectFailed . show) $ do
      eRep <- createOrGetRemoteEndPoint params ctx ourEp theirAddr
      case eRep of
        Left{} -> return $ Left $ TransportError ConnectFailed "LocalEndPoint is closed."
        Right rep -> do
          conn <- ZMQConnection <$> pure ourEp
                                <*> pure rep
                                <*> pure reliability
                                <*> newMVar ZMQConnectionInit
                                <*> newEmptyMVar
          let apiConn = Connection
                { send = apiSend conn
                , close = apiClose conn
                }
          join $ modifyMVar (remoteEndPointState rep) $ \w -> case w of
            RemoteEndPointClosed -> do
              return ( RemoteEndPointClosed
                     , return $ Left $ TransportError ConnectFailed "Transport is closed.")
            RemoteEndPointClosing x -> do
              return ( RemoteEndPointClosing x
                     , return $ Left $ TransportError ConnectFailed "RemoteEndPoint closed.")
            RemoteEndPointValid _ -> do
              s' <- go conn w
              return (s', waitReady conn apiConn)
            -- Heavyweight connection still being established: queue the
            -- connection request to run once it becomes valid.
            RemoteEndPointPending z -> do
              modifyIORef z (\zs -> go conn : zs)
              return ( RemoteEndPointPending z, waitReady conn apiConn)
            RemoteEndPointFailed ->
              return ( RemoteEndPointFailed
                     , return $ Left $ TransportError ConnectFailed "RemoteEndPoint failed.")
  where
    waitReady conn apiConn = join $ withMVar (connectionState conn) $ \case
      ZMQConnectionInit{} -> return $ yield >> waitReady conn apiConn
      ZMQConnectionValid{} -> afterP $ Right apiConn
      ZMQConnectionFailed{} -> afterP $ Left $ TransportError ConnectFailed "Connection failed."
      ZMQConnectionClosed{} -> throwM $ InvariantViolation "Connection closed."
    ourAddr = localEndPointAddress ourEp
    go conn s = inner conn s
    inner _ RemoteEndPointClosed = return RemoteEndPointClosed
    inner _ RemoteEndPointPending{} = throwM $ InvariantViolation "Connection pending."
    inner _ (RemoteEndPointClosing x) = return $ RemoteEndPointClosing x
    inner _ RemoteEndPointFailed = return RemoteEndPointFailed
    -- Allocate the next outgoing connection id, remember the connection as
    -- pending and ask the peer to open it.
    inner conn (RemoteEndPointValid (ValidRemoteEndPoint sock (Counter i m) s z)) = do
        ZMQ.send sock [] $ encode' (MessageInitConnection ourAddr i' reliability)
        return $ RemoteEndPointValid (ValidRemoteEndPoint sock (Counter i' (Map.insert i' conn m)) s z)
      where
        i' = succ i

-- | Look up a remote endpoint by address; 'Nothing' if it is unknown or the
-- local endpoint is closed.
getRemoteEndPoint :: LocalEndPoint -> EndPointAddress -> IO (Maybe RemoteEndPoint)
getRemoteEndPoint ourEp theirAddr = do
    withMVar (localEndPointState ourEp) $ \case
      LocalEndPointValid v -> return $ theirAddr `Map.lookup` (_localEndPointRemotes v)
      LocalEndPointClosed -> return Nothing

-- | Return the remote endpoint registered for the address, creating (and
-- connecting) a new one if none exists or the existing one has failed.
-- Returns 'IncorrectState' if the local endpoint is closing or closed.
createOrGetRemoteEndPoint :: ZMQParameters
                          -> Context
                          -> LocalEndPoint
                          -> EndPointAddress
                          -> IO (Either ZMQError RemoteEndPoint)
createOrGetRemoteEndPoint params ctx ourEp theirAddr = join $ do
    modifyMVar (localEndPointState ourEp) $ \case
      LocalEndPointValid v@(ValidLocalEndPoint _ _ _ _ o _) -> do
        opened <- readIORef o
        if opened
          then do
            case v ^. localEndPointRemoteAt theirAddr of
              Nothing -> create v
              Just rep -> do
                withMVar (remoteEndPointState rep) $ \case
                  RemoteEndPointFailed -> create v
                  _ -> return (LocalEndPointValid v, return $ Right rep)
          else return (LocalEndPointValid v, return $ Left $ IncorrectState "EndPointClosing")
      LocalEndPointClosed ->
        return ( LocalEndPointClosed
               , return $ Left $ IncorrectState "EndPoint is closed"
               )
  where
    -- Register a new pending remote endpoint; the connection attempt itself
    -- runs after the local endpoint lock has been released.
    create v = do
      push <- ZMQ.socket ctx ZMQ.Push
      case zmqSecurityMechanism params of
        Nothing -> return ()
        -- NOTE(review): the fields are bound here as (password, user name),
        -- whereas 'createTransportExposeInternals' binds them as (user, pass)
        -- -- confirm the field order against the 'SecurityPlain' declaration.
        Just (SecurityPlain p u) -> do
          ZMQ.setPlainPassword (ZMQ.restrict p) push
          ZMQ.setPlainUserName (ZMQ.restrict u) push
      state <- newMVar . RemoteEndPointPending =<< newIORef []
      opened <- newIORef False
      let rep = RemoteEndPoint theirAddr state opened
      return ( LocalEndPointValid
             . (localEndPointRemotes ^: (Map.insert theirAddr rep))
             $ v
             , initialize push rep >> return (Right rep))
    ourAddr = localEndPointAddress ourEp
    -- Connect the push socket and wait up to 100 ms (threadDelay 100000) for
    -- the monitor to report the connection. On timeout the remote endpoint is
    -- marked failed; otherwise the greeting is sent and the queued pending
    -- actions are run in the order in which they were added.
    initialize push rep = do
      lock <- newEmptyMVar
      x <- Async.async $ do
        -- Do not connect before the monitor has been installed.
        takeMVar lock
        ZMQ.connect push (B8.unpack $ endPointAddressToByteString theirAddr)
      monitor <- ZMQ.monitor [ZMQ.ConnectedEvent] ctx push
      putMVar lock ()
      r <- Async.race (threadDelay 100000) (monitor True)
      void $ monitor False
      case r of
        Left _ -> do
          _ <- swapMVar (remoteEndPointState rep) RemoteEndPointFailed
          Async.cancel x
          ZMQ.closeZeroLinger push
        Right _ -> do
          ZMQ.send push [] $ encode' $ MessageConnect ourAddr
          let v = ValidRemoteEndPoint push (Counter 0 Map.empty) Set.empty 0
          modifyMVar_ (remoteEndPointState rep) $ \case
            RemoteEndPointPending p -> do
              z <- foldM (\y f -> f y) (RemoteEndPointValid v) . Prelude.reverse =<< readIORef p
              writeIORef (remoteEndPointOpened rep) True
              return z
            RemoteEndPointValid _ -> throwM $ InvariantViolation "RemoteEndPoint valid."
            RemoteEndPointClosed -> return RemoteEndPointClosed
            RemoteEndPointClosing j -> return (RemoteEndPointClosing j)
            RemoteEndPointFailed -> return RemoteEndPointFailed

-- | Close all endpoint connections, return previous state in case
-- if it was alive, for further cleanup actions.
--
-- The remote endpoint is always switched to 'RemoteEndPointFailed' when the
-- local endpoint is valid; the optional action is run only if the previous
-- state was 'RemoteEndPointValid'.
cleanupRemoteEndPoint :: LocalEndPoint
                      -> RemoteEndPoint
                      -> (Maybe (ValidRemoteEndPoint -> IO ()))
                      -> IO (Maybe RemoteEndPointState)
cleanupRemoteEndPoint lep rep actions = modifyMVar (localEndPointState lep) $ \case
    LocalEndPointValid v -> do
      writeIORef (remoteEndPointOpened rep) False
      oldState <- swapMVar (remoteEndPointState rep) newState
      case oldState of
        RemoteEndPointValid w -> do
          -- Fail every outgoing connection that is still pending.
          let cn = w ^. (remoteEndPointPendingConnections >>> counterValues)
          traverse_ (\c -> void $ swapMVar (connectionState c) ZMQConnectionFailed) cn
          -- NOTE(review): incoming connection ids are looked up in the map of
          -- pending *outgoing* connections (cn) rather than in the local
          -- endpoint's connection map -- confirm this is intended.
          cn' <- foldM
               (\c' idx -> case idx `Map.lookup` cn of
                   Nothing -> return c'
                   Just c -> do
                     void $ swapMVar (connectionState c) ZMQConnectionFailed
                     return $ (counterValues ^: (Map.delete idx)) $ c'
               )
               (v ^. localEndPointConnections)
               (Set.toList (_remoteEndPointIncommingConnections w))
          case actions of
            Nothing -> return ()
            Just f -> f w
          return ( LocalEndPointValid (localEndPointConnections ^= cn' $ v)
                 , Just oldState)
        _ -> return (LocalEndPointValid v, Nothing)
    c -> return (c, Nothing)
  where
    newState = RemoteEndPointFailed

-- | Close, all network connections.
--
-- Removes the remote endpoint from the local endpoint and closes the socket
-- held by the given (previous) state, if any.
closeRemoteEndPoint :: LocalEndPoint
                    -> RemoteEndPoint
                    -> RemoteEndPointState
                    -> IO ()
closeRemoteEndPoint lep rep state = step1 >> step2 state
  where
    step1 = modifyMVar_ (localEndPointState lep) $ \case
      LocalEndPointValid v -> return $ LocalEndPointValid
        . (localEndPointRemotes ^: (Map.delete (remoteEndPointAddress rep)))
        $ v
      c -> return c
    step2 (RemoteEndPointValid v) = do
      ZMQ.closeZeroLinger (remoteEndPointSocket v)
    step2 (RemoteEndPointClosing (ClosingRemoteEndPoint sock rd)) = do
      -- Wait until the closing procedure has released its lock.
      _ <- readMVar rd
      ZMQ.closeZeroLinger sock
    step2 _ = return ()

-- | Gracefully close a remote endpoint. Unless @silent@ is set, the peer is
-- sent a close request and given up to one second to acknowledge it.
remoteEndPointClose :: Bool -> LocalEndPoint -> RemoteEndPoint -> IO ()
remoteEndPointClose silent lep rep = do
    writeIORef (remoteEndPointOpened rep) False
    join $ modifyMVar (remoteEndPointState rep) $ \o -> case o of
      RemoteEndPointFailed -> return (o, return ())
      RemoteEndPointClosed -> return (o, return ())
      -- Somebody else is already closing: wait for them to finish.
      RemoteEndPointClosing (ClosingRemoteEndPoint _ l) -> return (o, void $ readMVar l)
      RemoteEndPointPending _ -> closing (error "Pending actions should not be executed") o -- XXX: store socket, or delay
      RemoteEndPointValid v -> closing (remoteEndPointSocket v) o
  where
    closing sock old = do
      lock <- newEmptyMVar
      return (RemoteEndPointClosing (ClosingRemoteEndPoint sock lock), go lock old)
    go lock old@(RemoteEndPointValid (ValidRemoteEndPoint c _ s i)) = do
      -- close all connections
      void $ cleanupRemoteEndPoint lep rep Nothing
      withMVar (localEndPointState lep) $ \case
        LocalEndPointClosed -> return ()
        LocalEndPointValid v -> do
          -- notify about all connections close (?) do we really want it?
          traverse_ (atomically . writeTMChan (localEndPointChan v) . ConnectionClosed) (Set.toList s)
          -- if we have outgoing connections, then we have connection error
          when (i > 0) $ atomically $ writeTMChan (localEndPointChan v)
                       $ ErrorEvent $ TransportError (EventConnectionLost (remoteEndPointAddress rep)) "Remote end point closed."
      -- notify other side about closing connection
      unless silent $ do
        ZMQ.send c [] (encode' (MessageEndPointClose (localEndPointAddress lep) True))
        yield
        void $ Async.race (readMVar lock) (threadDelay 1000000)
      -- Release everybody waiting on the lock, then close the socket.
      void $ tryPutMVar lock ()
      closeRemoteEndPoint lep rep old
      return ()
    go _ _ = return ()

-- | Remove an incoming connection id from a valid remote endpoint.
connectionCleanup :: RemoteEndPoint -> ConnectionId -> IO ()
connectionCleanup rep cid = modifyMVar_ (remoteEndPointState rep) $ \case
    RemoteEndPointValid v -> return $
      RemoteEndPointValid v{_remoteEndPointIncommingConnections = Set.delete cid (_remoteEndPointIncommingConnections v)}
    c -> return c

-- | Encode a value into a strict 'ByteString'.
encode' :: Binary a => a -> ByteString
encode' = B.concat . BL.toChunks . encode

-- | Decode a value from a strict 'ByteString'.
decode' :: Binary a => ByteString -> a
decode' s = decode . BL.fromChunks $ [s]

-- | Run the action while holding the local endpoint lock, if the endpoint is
-- valid; do nothing otherwise.
onValidEndPoint :: LocalEndPoint -> (ValidLocalEndPoint -> IO ()) -> IO ()
onValidEndPoint lep f = withMVar (localEndPointState lep) $ \case
    LocalEndPointValid v -> f v
    _ -> return ()

-- | Run the action while holding the remote endpoint lock, if the remote
-- endpoint is valid; do nothing otherwise.
onValidRemote :: RemoteEndPoint -> (ValidRemoteEndPoint -> IO ()) -> IO ()
onValidRemote rep f = withMVar (remoteEndPointState rep) $ \case
    RemoteEndPointValid v -> f v
    _ -> return ()

-- | Wrap a value as a "nothing left to do afterwards" action, for use with the
-- @join $ withMVar ...@ pattern.
afterP :: a -> IO (IO a)
afterP = return . return

-- | Run an action, returning any 'ZMQError' instead of throwing it.
tryZMQ :: (MonadCatch m) => m a -> m (Either ZMQError a)
tryZMQ = try . promoteZMQException

-- | Map a 'ZMQError' into another exception type.
mapZMQException :: (Exception e) => (ZMQError -> e) -> a -> a
mapZMQException = mapException

-- | Break endpoint connection, all endpoints that will be affected.
--
-- Every remote endpoint (of every local endpoint of this transport) whose
-- address lies on the same host as @to@ is failed, and an
-- 'EventConnectionLost' event is emitted for it. The first address argument
-- is ignored.
breakConnection :: TransportInternals
                -> EndPointAddress
                -> EndPointAddress
                -> IO ()
breakConnection zmqt _from to = Foldable.sequence_ <=< withMVar (transportState zmqt) $ \case
    TransportValid v -> Traversable.forM (v ^. transportEndPoints) $ \x ->
      withMVar (localEndPointState x) $ \case
        -- The cleanup itself is returned as an action, so that it runs only
        -- after the locks taken here have been released.
        LocalEndPointValid w -> return $ Foldable.sequence_ $ flip Map.mapWithKey (w ^. localEndPointRemotes) $ \key rep ->
          if onDeadHost key
            then do
              mz <- cleanupRemoteEndPoint x rep Nothing
              flip traverse_ mz $ \z -> do
                atomically $ writeTMChan (localEndPointChan w) $ ErrorEvent $ TransportError (EventConnectionLost key) "Manual connection break"
                closeRemoteEndPoint x rep z
            else return ()
        LocalEndPointClosed -> afterP ()
    TransportClosed -> return Map.empty
  where
    target = endPointAddressToByteString to
    -- Everything up to and including the last ':' (i.e. "scheme://host:").
    -- The colon is kept so that "host1" does not prefix-match "host10".
    hostPrefix = fst $ B8.spanEnd (/= ':') target
    onDeadHost key
      -- Address without a port separator: fall back to exact comparison.
      | B8.null hostPrefix = endPointAddressToByteString key == target
      | otherwise = hostPrefix `B8.isPrefixOf` endPointAddressToByteString key

-- | Break connection between two endpoints, other endpoints on the same
-- remote host will not be affected.
--
-- The link is broken in both directions, and each side is told that the
-- connection to the *other* side was lost.
breakConnectionEndPoint :: TransportInternals
                        -> EndPointAddress
                        -> EndPointAddress
                        -> IO ()
breakConnectionEndPoint zmqt from to = one from to >> one to from
  where
    -- Fail the remote endpoint @t@ as seen from the local endpoint @f@.
    one f t = join $ withMVar (transportState zmqt) $ \case
      TransportValid v -> case v ^. transportEndPointAt f of
        Nothing -> afterP ()
        Just x -> withMVar (localEndPointState x) $ \case
          LocalEndPointValid w -> case w ^. localEndPointRemoteAt t of
            Nothing -> afterP ()
            Just y -> return $ do
              mz <- cleanupRemoteEndPoint x y Nothing
              case mz of
                Nothing -> return ()
                Just z -> do
                  -- Report the peer of *this* side (t), not always 'to'.
                  onValidEndPoint x $ \j -> atomically $ writeTMChan (localEndPointChan j) $ ErrorEvent $ TransportError
                    (EventConnectionLost t) "Exception on remote side"
                  closeRemoteEndPoint x y z
          LocalEndPointClosed -> afterP ()
      TransportClosed -> afterP ()

-- | Configure socket after creation
--
-- Applies the action to the push socket that the local endpoint @from@ uses
-- for the remote endpoint @to@, if both exist and are valid.
unsafeConfigurePush :: TransportInternals
                    -> EndPointAddress
                    -> EndPointAddress
                    -> (ZMQ.Socket ZMQ.Push -> IO ())
                    -> IO ()
unsafeConfigurePush zmqt from to f = withMVar (transportState zmqt) $ \case
    TransportValid v -> Foldable.traverse_
      (\x -> withMVar (localEndPointState x) $ \case
         LocalEndPointValid w -> case w ^. localEndPointRemoteAt to of
           Nothing -> return ()
           Just y -> onValidRemote y $ f . remoteEndPointSocket
         LocalEndPointClosed -> return ()
      ) (v ^. transportEndPointAt from)
    TransportClosed -> return ()

-- | Create a new multicast group associated with an EndPoint.
--
-- For multicast addresses zeromq uses two sockets. Is uses Pub-Sub sockets for multicast
-- delivery and one Req-Rep socket for sending messages to the EndPoint. That
-- endpoint will retransmit messages to all subscribers.
--
-- If Hints are used to specify ports then the address will have the form:
--
-- > host:Port:ControlPort
--
-- where Port is 'hintsPort' and ControlPort is 'hintsControlPort'. If the hint port
-- is not specified then random ports will be used.
apiNewMulticastGroup :: Hints -- ^ Multicast group hints.
                     -> TransportInternals -- ^ Internal transport state.
                     -> LocalEndPoint -- ^ EndPoint that is associated with the multicast group.
                     -> IO ( Either (TransportError NewMulticastGroupErrorCode) MulticastGroup)
apiNewMulticastGroup hints zmq lep = withMVar (transportState zmq) $ \case
  TransportClosed -> return $ Left $ TransportError NewMulticastGroupFailed "Transport is closed."
TransportValid vt -> modifyMVar (localEndPointState lep) $ \case LocalEndPointClosed -> return (LocalEndPointClosed, Left $ TransportError NewMulticastGroupFailed "Transport is closed.") LocalEndPointValid vl -> mask_ $ do (pub, portPub, rep, portRep, wrkThread) <- mkPublisher vt let addr = MulticastAddress $ B8.concat [transportAddress zmq,":",B8.pack (show portPub), ":",B8.pack (show portRep)] subAddr = extractPubAddress addr repAddr = extractRepAddress addr -- subscriber api sub <- ZMQ.socket (vt ^. transportContext) ZMQ.Sub ZMQ.connect sub (B8.unpack subAddr) ZMQ.subscribe sub "" subscribed <- newIORef False (subThread, mgroup) <- fixIO $ \ ~(tid,_) -> do v <- newMVar (MulticastGroupValid (ValidMulticastGroup subscribed)) let mgroup = ZMQMulticastGroup v (apiDeleteMulticastGroupLocal v lep addr rep repAddr pub sub subAddr (Just tid) wrkThread) tid' <- Async.async $ do repeatWhile $ do (ctrl: msg) <- ZMQ.receiveMulti sub if B8.null ctrl then do opened <- readIORef subscribed when opened $ atomically $ writeTMChan (localEndPointChan vl) $ ReceivedMulticast addr msg return True else return False return (tid', mgroup) return ( LocalEndPointValid vl , Right $ MulticastGroup { multicastAddress = addr , deleteMulticastGroup = apiDeleteMulticastGroupLocal (multicastGroupState mgroup) lep addr rep repAddr pub sub subAddr (Just subThread) wrkThread , maxMsgSize = Nothing , multicastSend = apiMulticastSendLocal mgroup pub , multicastSubscribe = apiMulticastSubscribe mgroup , multicastUnsubscribe = apiMulticastUnsubscribe mgroup , multicastClose = apiMulticastClose } ) where mkPublisher vt = do pub <- ZMQ.socket (vt^.transportContext) ZMQ.Pub portPub <- case hintsPort hints of Nothing -> ZMQ.bindRandomPort pub (B8.unpack $ transportAddress zmq) Just i -> do ZMQ.bind pub (B8.unpack (transportAddress zmq) ++ ":" ++ show i) return i rep <- ZMQ.socket (vt^.transportContext) ZMQ.Rep portRep <- case hintsControlPort hints of Nothing -> ZMQ.bindRandomPort rep (B8.unpack $ 
transportAddress zmq) Just i -> do ZMQ.bind rep (B8.unpack (transportAddress zmq) ++ ":" ++ show i) return i wrkThread <- Async.async $ forever $ do msg <- ZMQ.receiveMulti rep ZMQ.sendMulti pub $ "" :| msg ZMQ.send rep [] "OK" return (pub,portPub,rep,portRep, wrkThread) apiResolveMulticastGroup :: TransportInternals -> LocalEndPoint -> MulticastAddress -> IO (Either (TransportError ResolveMulticastGroupErrorCode) MulticastGroup) apiResolveMulticastGroup zmq lep addr = withMVar (transportState zmq) $ \case TransportClosed -> return $ Left $ TransportError ResolveMulticastGroupFailed "Transport is closed." TransportValid vt -> modifyMVar (localEndPointState lep) $ \case LocalEndPointClosed -> return (LocalEndPointClosed, Left $ TransportError ResolveMulticastGroupFailed "Transport is closed.") LocalEndPointValid vl -> mask_ $ do -- socket allocation req <- ZMQ.socket (vt^.transportContext) ZMQ.Req ZMQ.connect req (B8.unpack reqAddr) sub <- ZMQ.socket (vt^.transportContext) ZMQ.Sub ZMQ.connect sub (B8.unpack subAddr) ZMQ.subscribe sub "" subscribed <- newIORef False (subThread, mgroup) <- fixIO $ \ ~(tid,_) -> do v <- newMVar (MulticastGroupValid (ValidMulticastGroup subscribed)) let mgroup = ZMQMulticastGroup v (apiDeleteMulticastGroupRemote v lep addr req reqAddr sub subAddr (Just tid)) tid' <- Async.async $ repeatWhile $ do (ctrl: msg) <- ZMQ.receiveMulti sub if B8.null ctrl then do opened <- readIORef subscribed when opened $ atomically $ writeTMChan (localEndPointChan vl) $ ReceivedMulticast addr msg return True else do apiDeleteMulticastGroupRemote v lep addr req reqAddr sub subAddr Nothing return False return (tid', mgroup) return ( LocalEndPointValid vl{_localEndPointMulticastGroups = Map.insert addr mgroup (_localEndPointMulticastGroups vl)} , Right $ MulticastGroup { multicastAddress = addr , deleteMulticastGroup = apiDeleteMulticastGroupRemote (multicastGroupState mgroup) lep addr req reqAddr sub subAddr (Just subThread) , maxMsgSize = Nothing , 
multicastSend = apiMulticastSendRemote mgroup req , multicastSubscribe = apiMulticastSubscribe mgroup , multicastUnsubscribe = apiMulticastUnsubscribe mgroup , multicastClose = apiMulticastClose } ) where reqAddr = extractRepAddress addr subAddr = extractPubAddress addr apiDeleteMulticastGroupRemote :: MVar MulticastGroupState -> LocalEndPoint -> MulticastAddress -> ZMQ.Socket ZMQ.Req -> ByteString -> ZMQ.Socket ZMQ.Sub -> ByteString -> Maybe (Async.Async ()) -> IO () apiDeleteMulticastGroupRemote mstate lep addr req _reqAddr sub _subAddr mtid = mask_ $ do Foldable.traverse_ (\tid -> Async.cancel tid >> void (Async.waitCatch tid)) mtid modifyMVar_ mstate $ \case MulticastGroupClosed -> return MulticastGroupClosed MulticastGroupValid v -> do modifyIORef (_multicastGroupSubscribed v) (const False) ZMQ.closeZeroLinger req ZMQ.unsubscribe sub "" ZMQ.closeZeroLinger sub return MulticastGroupClosed modifyMVar_ (localEndPointState lep) $ \case LocalEndPointClosed -> return LocalEndPointClosed LocalEndPointValid v -> return $ LocalEndPointValid v{_localEndPointMulticastGroups = Map.delete addr (_localEndPointMulticastGroups v)} apiDeleteMulticastGroupLocal :: MVar MulticastGroupState -> LocalEndPoint -> MulticastAddress -> ZMQ.Socket ZMQ.Rep -> ByteString -> ZMQ.Socket ZMQ.Pub -> ZMQ.Socket ZMQ.Sub -> ByteString -> Maybe (Async.Async ()) -> Async.Async () -> IO () apiDeleteMulticastGroupLocal mstate lep addr rep _repAddr pub sub _pubAddr mtid wrk = mask_ $ do Foldable.traverse_ (\tid -> Async.cancel tid >> void (Async.waitCatch tid)) mtid void $ Async.cancel wrk >> Async.waitCatch wrk modifyMVar_ mstate $ \case MulticastGroupClosed -> return MulticastGroupClosed MulticastGroupValid v -> do modifyIORef (_multicastGroupSubscribed v) (const False) ZMQ.sendMulti pub $ "C" :| [] threadDelay 50000 ZMQ.closeZeroLinger rep ZMQ.unsubscribe sub "" ZMQ.closeZeroLinger sub ZMQ.closeZeroLinger pub return MulticastGroupClosed modifyMVar_ (localEndPointState lep) $ \case 
LocalEndPointClosed -> return LocalEndPointClosed LocalEndPointValid v -> return $ LocalEndPointValid v{_localEndPointMulticastGroups = Map.delete addr (_localEndPointMulticastGroups v)} apiMulticastSendLocal :: (ZMQ.Sender a) => ZMQMulticastGroup -> ZMQ.Socket a -> [ByteString] -> IO () apiMulticastSendLocal mgroup sock msg = withMVar (multicastGroupState mgroup) $ \case MulticastGroupClosed -> return () MulticastGroupValid _ -> do ZMQ.sendMulti sock $ "" :| msg apiMulticastSendRemote :: (ZMQ.Sender a, ZMQ.Receiver a) => ZMQMulticastGroup -> ZMQ.Socket a -> [ByteString] -> IO () apiMulticastSendRemote _ _ [] = return () apiMulticastSendRemote mgroup sock (m:msg) = withMVar (multicastGroupState mgroup) $ \case MulticastGroupClosed -> return () MulticastGroupValid _ -> do ZMQ.sendMulti sock $ m :| msg "OK" <- ZMQ.receive sock -- XXX: timeout return () apiMulticastSubscribe :: ZMQMulticastGroup -> IO () apiMulticastSubscribe mgroup = withMVar (multicastGroupState mgroup) $ \case MulticastGroupClosed -> return () MulticastGroupValid v -> do modifyIORef (_multicastGroupSubscribed v) (const True) apiMulticastUnsubscribe :: ZMQMulticastGroup -> IO () apiMulticastUnsubscribe mgroup = withMVar (multicastGroupState mgroup) $ \case MulticastGroupClosed -> return () MulticastGroupValid v -> modifyIORef (_multicastGroupSubscribed v) (const False) apiMulticastClose :: IO () apiMulticastClose = return () -- $cleanup -- Cleanup API is prepared to store cleanup actions for example socket -- close for objects that uses network-transport-zeromq resources. Theese -- actions will be fired on transport close. -- If you need to clean resource beforehead use 'applyCleanupAction'. -- | Register action that will be fired when transport will be closed. 
registerCleanupAction :: TransportInternals -> IO () -> IO (Maybe Unique)
-- Takes the transport lock. The result is 'Nothing' when the transport is
-- already closed; otherwise it is the key returned by
-- 'registerValidCleanupAction'.
registerCleanupAction zmq fn = withMVar (transportState zmq) $ \case
  TransportValid v -> registerValidCleanupAction v fn
  TransportClosed -> return Nothing

-- | Register action on a locked transport.
--
-- The action is stored in the transport's cleanup table under the hash of a
-- fresh 'Unique', and that 'Unique' is returned. The result is always
-- 'Just'.
registerValidCleanupAction :: ValidTransportState -> IO () -> IO (Maybe Unique)
registerValidCleanupAction v fn = Just <$> do
  u <- newUnique
  atomicModifyIORef' (v ^. transportSockets) (\m -> (IntMap.insert (hashUnique u) fn m, u))

-- | Call cleanup action before transport close.
--
-- The action registered under the given key is removed from the cleanup
-- table and then run with asynchronous exceptions masked. Nothing happens
-- if the key is unknown or the transport is closed.
applyCleanupAction :: TransportInternals -> Unique -> IO ()
applyCleanupAction zmq u = withMVar (transportState zmq) $ \case
  TransportValid v -> mask_ $
    -- One atomic step: drop the entry and hand back what was stored there.
    sequence_ =<< atomicModifyIORef' (v ^. transportSockets)
                    (liftA2 (,) (IntMap.delete (hashUnique u)) (IntMap.lookup (hashUnique u)))
  TransportClosed -> return ()

-- | Address of the control (Req-Rep) socket of a multicast group: for
-- @host:Port:ControlPort@ this is @host:ControlPort@.
--
-- NOTE(review): partial -- 'B8.init' fails on an address with fewer than two
-- @\':\'@ separators; confirm that callers only pass well-formed addresses.
extractRepAddress :: MulticastAddress -> ByteString
extractRepAddress (MulticastAddress bs) = B8.concat [a,":",p]
  where
    -- x: everything up to and including the last ':', p: the control port.
    (x,p) = B8.spanEnd (/=':') bs
    -- Drop the trailing ':' and then the ":Port" component.
    a = B8.init . fst . B8.spanEnd (/=':') . B8.init $ x

-- | Address of the publish (Pub-Sub) socket of a multicast group: for
-- @host:Port:ControlPort@ this is @host:Port@.
--
-- NOTE(review): partial -- 'B8.init' fails on an address without @\':\'@.
extractPubAddress :: MulticastAddress -> ByteString
extractPubAddress (MulticastAddress bs) = B8.init . fst $ B8.spanEnd (/=':') bs

-- | Run the action repeatedly for as long as it returns 'True'.
repeatWhile :: Monad m => (m Bool) -> m ()
repeatWhile f = f >>= \x -> if x then repeatWhile f else return ()

-- | Map ZMQ.ZMQError to network-transport ZMQError
promoteZMQException :: a -> a
promoteZMQException = mapException DriverError

-- | Print an error to standard error. This function should be used for
-- errors that could not be handled in a normal way.
errorLog :: Show a => a -> IO ()
errorLog s = hPutStrLn stderr (printf "[network-transport-zeromq] Unhandled error: %s" $ show s)

-- $zeromqs
-- network-transport ZeroMQ has a large number of additional options that can be used for socket
-- configuration and that are not exposed in the network-transport abstraction layer. In order
-- to use specific options of network-transport-zeromq, the function 'apiNewEndPoint' was introduced.
-- This function uses 'TransportInternals' and a notion of 'Hints', a special data type that
-- contains the possible configuration options.
--
-- __Example: Bootstrapping problem.__
--
-- Due to the network-transport-zeromq design, a new endpoint is bound to a new socket address,
-- making it impossible to bootstrap systems without an additional communication mechanism.
-- This can be avoided if the new function is used to create an endpoint on a specified port:
--
-- > (internals, transport) <- createTransportExposeInternals defaultZMQParameters "127.0.0.1"
-- > ep <- apiNewEndPoint defaultHints{hintsPort = Just 8888} internals
--
-- This allows creating an endpoint on a specified port and, as a result, bootstrapping the system.