{-# LANGUAGE GADTs #-} -- | -- Module : System.ZMQ3 -- Copyright : (c) 2010-2013 Toralf Wittner -- License : MIT -- Maintainer : Toralf Wittner -- Stability : experimental -- Portability : non-portable -- -- 0MQ haskell binding. The API closely follows the C-API of 0MQ with -- the main difference that sockets are typed. -- The documentation of the individual socket types is copied from -- 0MQ's man pages authored by Martin Sustrik. For details please -- refer to -- -- Differences to zeromq-haskell 2.x -- -- /Socket Types/ -- -- * 'System.ZMQ.Up' and 'System.ZMQ.Down' no longer exist. -- -- * 'XReq' is renamed to 'Dealer' and 'XRep' is renamed to 'Router' -- (in accordance with libzmq). 'XReq' and 'XRep' are available as -- deprecated aliases. -- -- * Renamed type-classes: -- @'SType' -\> 'SocketType'@, @'SubsType' -\> 'Subscriber'@. -- -- * New type-classes: -- 'Sender', 'Receiver' -- -- /Socket Options/ -- -- Instead of a single 'SocketOption' data-type, getter and setter -- functions are provided, e.g. one would write: @'affinity' sock@ instead of -- @getOption sock (Affinity 0)@ -- -- /Restrictions/ -- -- Many option setters use a 'Restriction' to further constrain the -- range of possible values of their integral types. For example -- the maximum message size can be given as -1, which means no limit -- or by greater values, which denote the message size in bytes. The -- type of 'setMaxMessageSize' is therefore: -- -- @setMaxMessageSize :: 'Integral' i => 'Restricted' 'Nneg1' 'Int64' i -> 'Socket' a -> 'IO' ()@ -- -- which means any integral value in the range of @-1@ to -- (@'maxBound' :: 'Int64'@) can be given. To create a restricted -- value from plain value, use 'toRestricted' or 'restrict'. -- -- /Devices/ -- -- Devices are no longer present in 0MQ 3.x and consequently have been -- removed form this binding as well. -- -- /Error Handling/ -- -- The type 'ZMQError' is introduced, together with inspection functions 'errno', -- 'source' and 'message'. @zmq_strerror@ is used underneath to retrieve the -- correct error message. ZMQError will be thrown when native 0MQ procedures return -- an error status and it can be 'catch'ed as an 'Exception'. module System.ZMQ3 ( -- * Type Definitions Size , Context , Socket , Flag (SendMore) , Switch (..) , Timeout , Event (..) , EventType (..) , EventMsg (..) , Poll (..) -- ** Type Classes , SocketType , Sender , Receiver , Subscriber , SocketLike -- ** Socket Types , Pair(..) , Pub(..) , Sub(..) , XPub(..) , XSub(..) , Req(..) , Rep(..) , Dealer(..) , Router(..) , XReq , XRep , Pull(..) , Push(..) -- * General Operations , withContext , withSocket , bind , unbind , connect , send , send' , sendMulti , receive , receiveMulti , version , monitor , poll , System.ZMQ3.subscribe , System.ZMQ3.unsubscribe -- * Context Options (Read) , ioThreads , maxSockets -- * Context Options (Write) , setIoThreads , setMaxSockets -- * Socket Options (Read) , System.ZMQ3.affinity , System.ZMQ3.backlog , System.ZMQ3.delayAttachOnConnect , System.ZMQ3.events , System.ZMQ3.fileDescriptor , System.ZMQ3.identity , System.ZMQ3.ipv4Only , System.ZMQ3.lastEndpoint , System.ZMQ3.linger , System.ZMQ3.maxMessageSize , System.ZMQ3.mcastHops , System.ZMQ3.moreToReceive , System.ZMQ3.rate , System.ZMQ3.receiveBuffer , System.ZMQ3.receiveHighWM , System.ZMQ3.receiveTimeout , System.ZMQ3.reconnectInterval , System.ZMQ3.reconnectIntervalMax , System.ZMQ3.recoveryInterval , System.ZMQ3.sendBuffer , System.ZMQ3.sendHighWM , System.ZMQ3.sendTimeout , System.ZMQ3.tcpKeepAlive , System.ZMQ3.tcpKeepAliveCount , System.ZMQ3.tcpKeepAliveIdle , System.ZMQ3.tcpKeepAliveInterval -- * Socket Options (Write) , setAffinity , setBacklog , setDelayAttachOnConnect , setIdentity , setIpv4Only , setLinger , setMaxMessageSize , setMcastHops , setRate , setReceiveBuffer , setReceiveHighWM , setReceiveTimeout , setReconnectInterval , setReconnectIntervalMax , setRecoveryInterval , setRouterMandatory , setSendBuffer , setSendHighWM , setSendTimeout , setTcpAcceptFilter , setTcpKeepAlive , setTcpKeepAliveCount , setTcpKeepAliveIdle , setTcpKeepAliveInterval , setXPubVerbose -- * Restrictions , Data.Restricted.restrict , Data.Restricted.toRestricted -- * Error Handling , ZMQError , errno , source , message -- * Low-level Functions , init , term , context , destroy , socket , close , waitRead , waitWrite -- * Utils , proxy ) where import Prelude hiding (init) import Control.Applicative import Control.Exception import Control.Monad (unless) import Control.Monad.IO.Class import Data.List (intersect, foldl') import Data.List.NonEmpty (NonEmpty) import Data.Restricted import Data.Traversable (forM) import Foreign hiding (throwIf, throwIf_, throwIfNull, void) import Foreign.C.String import Foreign.C.Types (CInt, CShort) import System.Posix.Types (Fd(..)) import System.ZMQ3.Base import System.ZMQ3.Internal import System.ZMQ3.Error import qualified Data.ByteString as SB import qualified Data.ByteString.Lazy as LB import qualified Data.List.NonEmpty as S import qualified Prelude as P import qualified System.ZMQ3.Base as B import GHC.Conc (threadWaitRead, threadWaitWrite) -- | Socket to communicate with a single peer. Allows for only a -- single connect or a single bind. There's no message routing -- or message filtering involved. /Compatible peer sockets/: 'Pair'. data Pair = Pair -- | Socket to distribute data. 'receive' function is not -- implemented for this socket type. Messages are distributed in -- fanout fashion to all the peers. /Compatible peer sockets/: 'Sub'. data Pub = Pub -- | Socket to subscribe for data. 'send' function is not implemented -- for this socket type. Initially, socket is subscribed for no -- messages. Use 'subscribe' to specify which messages to subscribe for. -- /Compatible peer sockets/: 'Pub'. data Sub = Sub -- | Same as 'Pub' except that you can receive subscriptions from the -- peers in form of incoming messages. Subscription message is a byte 1 -- (for subscriptions) or byte 0 (for unsubscriptions) followed by the -- subscription body. -- /Compatible peer sockets/: 'Sub', 'XSub'. data XPub = XPub -- | Same as 'Sub' except that you subscribe by sending subscription -- messages to the socket. Subscription message is a byte 1 (for subscriptions) -- or byte 0 (for unsubscriptions) followed by the subscription body. -- /Compatible peer sockets/: 'Pub', 'XPub'. data XSub = XSub -- | Socket to send requests and receive replies. Requests are -- load-balanced among all the peers. This socket type allows only an -- alternated sequence of send's and recv's. -- /Compatible peer sockets/: 'Rep', 'Router'. data Req = Req -- | Socket to receive requests and send replies. This socket type -- allows only an alternated sequence of receive's and send's. Each -- send is routed to the peer that issued the last received request. -- /Compatible peer sockets/: 'Req', 'Dealer'. data Rep = Rep -- | Each message sent is round-robined among all connected peers, -- and each message received is fair-queued from all connected peers. -- /Compatible peer sockets/: 'Router', 'Req', 'Rep'. data Dealer = Dealer -- | /Deprecated Alias/ type XReq = Dealer {-# DEPRECATED XReq "Use Dealer" #-} -- | When receiving messages a Router socket shall prepend a message -- part containing the identity of the originating peer to -- the message before passing it to the application. Messages -- received are fair-queued from among all connected peers. When -- sending messages a Router socket shall remove the first part of -- the message and use it to determine the identity of the peer the -- message shall be routed to. If the peer does not exist anymore -- the message shall be silently discarded. -- /Compatible peer sockets/: 'Dealer', 'Req', 'Rep'. data Router = Router -- | /Deprecated Alias/ type XRep = Router {-# DEPRECATED XRep "Use Router" #-} -- | A socket of type Pull is used by a pipeline node to receive -- messages from upstream pipeline nodes. Messages are fair-queued from -- among all connected upstream nodes. The zmq_send() function is not -- implemented for this socket type. data Pull = Pull -- | A socket of type Push is used by a pipeline node to send messages -- to downstream pipeline nodes. Messages are load-balanced to all connected -- downstream nodes. The zmq_recv() function is not implemented for this -- socket type. -- -- When a Push socket enters an exceptional state due to having reached -- the high water mark for all downstream nodes, or if there are no -- downstream nodes at all, then any zmq_send(3) operations on the socket -- shall block until the exceptional state ends or at least one downstream -- node becomes available for sending; messages are not discarded. data Push = Push -- | Sockets which can 'subscribe'. class Subscriber a -- | Sockets which can 'send'. class Sender a -- | Sockets which can 'receive'. class Receiver a instance SocketType Pair where zmqSocketType = const pair instance Sender Pair instance Receiver Pair instance SocketType Pub where zmqSocketType = const pub instance Sender Pub instance SocketType Sub where zmqSocketType = const sub instance Subscriber Sub instance Receiver Sub instance SocketType XPub where zmqSocketType = const xpub instance Sender XPub instance Receiver XPub instance SocketType XSub where zmqSocketType = const xsub instance Sender XSub instance Receiver XSub instance SocketType Req where zmqSocketType = const request instance Sender Req instance Receiver Req instance SocketType Rep where zmqSocketType = const response instance Sender Rep instance Receiver Rep instance SocketType Dealer where zmqSocketType = const dealer instance Sender Dealer instance Receiver Dealer instance SocketType Router where zmqSocketType = const router instance Sender Router instance Receiver Router instance SocketType Pull where zmqSocketType = const pull instance Receiver Pull instance SocketType Push where zmqSocketType = const push instance Sender Push -- | Socket events. data Event = In -- ^ ZMQ_POLLIN (incoming messages) | Out -- ^ ZMQ_POLLOUT (outgoing messages, i.e. at least 1 byte can be written) | Err -- ^ ZMQ_POLLERR deriving (Eq, Ord, Read, Show) -- | A 'Poll' value contains the object to poll (a 0MQ socket or a file -- descriptor), the set of 'Event's which are of interest and--optionally-- -- a callback-function which is invoked iff the set of interested events -- overlaps with the actual events. data Poll s m where Sock :: s t -> [Event] -> Maybe ([Event] -> m ()) -> Poll s m File :: Fd -> [Event] -> Maybe ([Event] -> m ()) -> Poll s m -- | Return the runtime version of the underlying 0MQ library as a -- (major, minor, patch) triple. version :: IO (Int, Int, Int) version = with 0 $ \major_ptr -> with 0 $ \minor_ptr -> with 0 $ \patch_ptr -> c_zmq_version major_ptr minor_ptr patch_ptr >> tupleUp <$> peek major_ptr <*> peek minor_ptr <*> peek patch_ptr where tupleUp a b c = (fromIntegral a, fromIntegral b, fromIntegral c) init :: Size -> IO Context init n = do c <- context setIoThreads n c return c {-# DEPRECATED init "Use context" #-} -- | Initialize a 0MQ context (cf. zmq_ctx_new for details). You should -- normally prefer to use 'withContext' instead. context :: IO Context context = Context <$> throwIfNull "init" c_zmq_ctx_new term :: Context -> IO () term = destroy {-# DEPRECATED term "Use destroy" #-} -- | Terminate a 0MQ context (cf. zmq_ctx_destroy). You should normally -- prefer to use 'withContext' instead. destroy :: Context -> IO () destroy c = throwIfMinus1Retry_ "term" . c_zmq_ctx_destroy . _ctx $ c -- | Run an action with a 0MQ context. The 'Context' supplied to your -- action will /not/ be valid after the action either returns or -- throws an exception. withContext :: (Context -> IO a) -> IO a withContext act = bracket (throwIfNull "withContext (new)" $ c_zmq_ctx_new) (throwIfMinus1Retry_ "withContext (destroy)" . c_zmq_ctx_destroy) (act . Context) -- | Run an action with a 0MQ socket. The socket will be closed after running -- the supplied action even if an error occurs. The socket supplied to your -- action will /not/ be valid after the action terminates. withSocket :: SocketType a => Context -> a -> (Socket a -> IO b) -> IO b withSocket c t = bracket (socket c t) close -- | Create a new 0MQ socket within the given context. 'withSocket' provides -- automatic socket closing and may be safer to use. socket :: SocketType a => Context -> a -> IO (Socket a) socket c t = Socket <$> mkSocketRepr t c -- | Close a 0MQ socket. 'withSocket' provides automatic socket closing and may -- be safer to use. close :: Socket a -> IO () close = closeSock . _socketRepr -- | Subscribe Socket to given subscription. subscribe :: Subscriber a => Socket a -> SB.ByteString -> IO () subscribe s = setByteStringOpt s B.subscribe -- | Unsubscribe Socket from given subscription. unsubscribe :: Subscriber a => Socket a -> SB.ByteString -> IO () unsubscribe s = setByteStringOpt s B.unsubscribe -- Read Only -- | Cf. @zmq_getsockopt ZMQ_EVENTS@ events :: Socket a -> IO [Event] events s = toEvents <$> getIntOpt s B.events 0 -- | Cf. @zmq_getsockopt ZMQ_FD@ fileDescriptor :: Socket a -> IO Fd fileDescriptor s = Fd . fromIntegral <$> getInt32Option B.filedesc s -- | Cf. @zmq_getsockopt ZMQ_RCVMORE@ moreToReceive :: Socket a -> IO Bool moreToReceive s = (== 1) <$> getInt32Option B.receiveMore s -- Read -- | Cf. @zmq_ctx_get ZMQ_IO_THREADS@ ioThreads :: Context -> IO Word ioThreads = ctxIntOption "ioThreads" _ioThreads -- | Cf. @zmq_ctx_get ZMQ_MAX_SOCKETS@ maxSockets :: Context -> IO Word maxSockets = ctxIntOption "maxSockets" _maxSockets -- | Cf. @zmq_getsockopt ZMQ_IDENTITY@ identity :: Socket a -> IO SB.ByteString identity s = getByteStringOpt s B.identity -- | Cf. @zmq_getsockopt ZMQ_AFFINITY@ affinity :: Socket a -> IO Word64 affinity s = getIntOpt s B.affinity 0 -- | Cf. @zmq_getsockopt ZMQ_MAXMSGSIZE@ maxMessageSize :: Socket a -> IO Int64 maxMessageSize s = getIntOpt s B.maxMessageSize 0 -- | Cf. @zmq_getsockopt ZMQ_IPV4ONLY@ ipv4Only :: Socket a -> IO Bool ipv4Only s = (== 1) <$> getInt32Option B.ipv4Only s -- | Cf. @zmq_getsockopt ZMQ_BACKLOG@ backlog :: Socket a -> IO Int backlog = getInt32Option B.backlog -- | Cf. @zmq_getsockopt ZMQ_DELAY_ATTACH_ON_CONNECT@ delayAttachOnConnect :: Socket a -> IO Bool delayAttachOnConnect s = (== 1) <$> getInt32Option B.delayAttachOnConnect s -- | Cf. @zmq_getsockopt ZMQ_LINGER@ linger :: Socket a -> IO Int linger = getInt32Option B.linger -- | Cf. @zmq_getsockopt ZMQ_LAST_ENDPOINT@ lastEndpoint :: Socket a -> IO String lastEndpoint s = getStrOpt s B.lastEndpoint -- | Cf. @zmq_getsockopt ZMQ_RATE@ rate :: Socket a -> IO Int rate = getInt32Option B.rate -- | Cf. @zmq_getsockopt ZMQ_RCVBUF@ receiveBuffer :: Socket a -> IO Int receiveBuffer = getInt32Option B.receiveBuf -- | Cf. @zmq_getsockopt ZMQ_RECONNECT_IVL@ reconnectInterval :: Socket a -> IO Int reconnectInterval = getInt32Option B.reconnectIVL -- | Cf. @zmq_getsockopt ZMQ_RECONNECT_IVL_MAX@ reconnectIntervalMax :: Socket a -> IO Int reconnectIntervalMax = getInt32Option B.reconnectIVLMax -- | Cf. @zmq_getsockopt ZMQ_RECOVERY_IVL@ recoveryInterval :: Socket a -> IO Int recoveryInterval = getInt32Option B.recoveryIVL -- | Cf. @zmq_getsockopt ZMQ_SNDBUF@ sendBuffer :: Socket a -> IO Int sendBuffer = getInt32Option B.sendBuf -- | Cf. @zmq_getsockopt ZMQ_MULTICAST_HOPS@ mcastHops :: Socket a -> IO Int mcastHops = getInt32Option B.mcastHops -- | Cf. @zmq_getsockopt ZMQ_RCVHWM@ receiveHighWM :: Socket a -> IO Int receiveHighWM = getInt32Option B.receiveHighWM -- | Cf. @zmq_getsockopt ZMQ_RCVTIMEO@ receiveTimeout :: Socket a -> IO Int receiveTimeout = getInt32Option B.receiveTimeout -- | Cf. @zmq_getsockopt ZMQ_SNDTIMEO@ sendTimeout :: Socket a -> IO Int sendTimeout = getInt32Option B.sendTimeout -- | Cf. @zmq_getsockopt ZMQ_SNDHWM@ sendHighWM :: Socket a -> IO Int sendHighWM = getInt32Option B.sendHighWM -- | Cf. @zmq_getsockopt ZMQ_TCP_KEEPALIVE@ tcpKeepAlive :: Socket a -> IO Switch tcpKeepAlive s = getInt32Option B.tcpKeepAlive s >>= convert . toSwitch where convert Nothing = throwError "Invalid value for ZMQ_TCP_KEEPALIVE" convert (Just i) = return i -- | Cf. @zmq_getsockopt ZMQ_TCP_KEEPALIVE_CNT@ tcpKeepAliveCount :: Socket a -> IO Int tcpKeepAliveCount = getInt32Option B.tcpKeepAliveCount -- | Cf. @zmq_getsockopt ZMQ_TCP_KEEPALIVE_IDLE@ tcpKeepAliveIdle :: Socket a -> IO Int tcpKeepAliveIdle = getInt32Option B.tcpKeepAliveIdle -- | Cf. @zmq_getsockopt ZMQ_TCP_KEEPALIVE_INTVL@ tcpKeepAliveInterval :: Socket a -> IO Int tcpKeepAliveInterval = getInt32Option B.tcpKeepAliveInterval -- Write -- | Cf. @zmq_ctx_set ZMQ_IO_THREADS@ setIoThreads :: Word -> Context -> IO () setIoThreads n = setCtxIntOption "ioThreads" _ioThreads n -- | Cf. @zmq_ctx_set ZMQ_MAX_SOCKETS@ setMaxSockets :: Word -> Context -> IO () setMaxSockets n = setCtxIntOption "maxSockets" _maxSockets n -- | Cf. @zmq_setsockopt ZMQ_IDENTITY@ setIdentity :: Restricted N1 N254 SB.ByteString -> Socket a -> IO () setIdentity x s = setByteStringOpt s B.identity (rvalue x) -- | Cf. @zmq_setsockopt ZMQ_AFFINITY@ setAffinity :: Word64 -> Socket a -> IO () setAffinity x s = setIntOpt s B.affinity x -- | Cf. @zmq_setsockopt ZMQ_DELAY_ATTACH_ON_CONNECT@ setDelayAttachOnConnect :: Bool -> Socket a -> IO () setDelayAttachOnConnect x s = setIntOpt s B.delayAttachOnConnect (bool2cint x) -- | Cf. @zmq_setsockopt ZMQ_MAXMSGSIZE@ setMaxMessageSize :: Integral i => Restricted Nneg1 Int64 i -> Socket a -> IO () setMaxMessageSize x s = setIntOpt s B.maxMessageSize ((fromIntegral . rvalue $ x) :: Int64) -- | Cf. @zmq_setsockopt ZMQ_IPV4ONLY@ setIpv4Only :: Bool -> Socket a -> IO () setIpv4Only x s = setIntOpt s B.ipv4Only (bool2cint x) -- | Cf. @zmq_setsockopt ZMQ_LINGER@ setLinger :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO () setLinger = setInt32OptFromRestricted B.linger -- | Cf. @zmq_setsockopt ZMQ_RCVTIMEO@ setReceiveTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO () setReceiveTimeout = setInt32OptFromRestricted B.receiveTimeout -- | Cf. @zmq_setsockopt ZMQ_ROUTER_MANDATORY@ setRouterMandatory :: Bool -> Socket Router -> IO () setRouterMandatory x s = setIntOpt s B.routerMandatory (bool2cint x) -- | Cf. @zmq_setsockopt ZMQ_SNDTIMEO@ setSendTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO () setSendTimeout = setInt32OptFromRestricted B.sendTimeout -- | Cf. @zmq_setsockopt ZMQ_RATE@ setRate :: Integral i => Restricted N1 Int32 i -> Socket a -> IO () setRate = setInt32OptFromRestricted B.rate -- | Cf. @zmq_setsockopt ZMQ_MULTICAST_HOPS@ setMcastHops :: Integral i => Restricted N1 Int32 i -> Socket a -> IO () setMcastHops = setInt32OptFromRestricted B.mcastHops -- | Cf. @zmq_setsockopt ZMQ_BACKLOG@ setBacklog :: Integral i => Restricted N0 Int32 i -> Socket a -> IO () setBacklog = setInt32OptFromRestricted B.backlog -- | Cf. @zmq_setsockopt ZMQ_RCVBUF@ setReceiveBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO () setReceiveBuffer = setInt32OptFromRestricted B.receiveBuf -- | Cf. @zmq_setsockopt ZMQ_RECONNECT_IVL@ setReconnectInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO () setReconnectInterval = setInt32OptFromRestricted B.reconnectIVL -- | Cf. @zmq_setsockopt ZMQ_RECONNECT_IVL_MAX@ setReconnectIntervalMax :: Integral i => Restricted N0 Int32 i -> Socket a -> IO () setReconnectIntervalMax = setInt32OptFromRestricted B.reconnectIVLMax -- | Cf. @zmq_setsockopt ZMQ_SNDBUF@ setSendBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO () setSendBuffer = setInt32OptFromRestricted B.sendBuf -- | Cf. @zmq_setsockopt ZMQ_RECOVERY_IVL@ setRecoveryInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO () setRecoveryInterval = setInt32OptFromRestricted B.recoveryIVL -- | Cf. @zmq_setsockopt ZMQ_RCVHWM@ setReceiveHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO () setReceiveHighWM = setInt32OptFromRestricted B.receiveHighWM -- | Cf. @zmq_setsockopt ZMQ_SNDHWM@ setSendHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO () setSendHighWM = setInt32OptFromRestricted B.sendHighWM -- | Cf. @zmq_setsockopt ZMQ_TCP_ACCEPT_FILTER@ setTcpAcceptFilter :: Maybe SB.ByteString -> Socket a -> IO () setTcpAcceptFilter Nothing sock = onSocket "setTcpAcceptFilter" sock $ \s -> throwIfMinus1Retry_ "setStrOpt" $ c_zmq_setsockopt s (optVal tcpAcceptFilter) nullPtr 0 setTcpAcceptFilter (Just dat) sock = setByteStringOpt sock tcpAcceptFilter dat -- | Cf. @zmq_setsockopt ZMQ_TCP_KEEPALIVE@ setTcpKeepAlive :: Switch -> Socket a -> IO () setTcpKeepAlive x s = setIntOpt s B.tcpKeepAlive (fromSwitch x :: CInt) -- | Cf. @zmq_setsockopt ZMQ_TCP_KEEPALIVE_CNT@ setTcpKeepAliveCount :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO () setTcpKeepAliveCount = setInt32OptFromRestricted B.tcpKeepAliveCount -- | Cf. @zmq_setsockopt ZMQ_TCP_KEEPALIVE_IDLE@ setTcpKeepAliveIdle :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO () setTcpKeepAliveIdle = setInt32OptFromRestricted B.tcpKeepAliveIdle -- | Cf. @zmq_setsockopt ZMQ_TCP_KEEPALIVE_INTVL@ setTcpKeepAliveInterval :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO () setTcpKeepAliveInterval = setInt32OptFromRestricted B.tcpKeepAliveInterval -- | Cf. @zmq_setsockopt ZMQ_XPUB_VERBOSE@ setXPubVerbose :: Bool -> Socket XPub -> IO () setXPubVerbose x s = setIntOpt s B.xpubVerbose (bool2cint x) -- | Bind the socket to the given address (cf. zmq_bind) bind :: Socket a -> String -> IO () bind sock str = onSocket "bind" sock $ throwIfMinus1Retry_ "bind" . withCString str . c_zmq_bind -- | Unbind the socket from the given address (cf. zmq_unbind) unbind :: Socket a -> String -> IO () unbind sock str = onSocket "unbind" sock $ throwIfMinus1Retry_ "unbind" . withCString str . c_zmq_unbind -- | Connect the socket to the given address (cf. zmq_connect). connect :: Socket a -> String -> IO () connect sock str = onSocket "connect" sock $ throwIfMinus1Retry_ "connect" . withCString str . c_zmq_connect -- | Send the given 'SB.ByteString' over the socket (cf. zmq_sendmsg). -- -- /Note/: This function always calls @zmq_sendmsg@ in a non-blocking way, -- i.e. there is no need to provide the @ZMQ_DONTWAIT@ flag as this is used -- by default. Still 'send' is blocking the thread as long as the message -- can not be queued on the socket using GHC's 'threadWaitWrite'. send :: Sender a => Socket a -> [Flag] -> SB.ByteString -> IO () send sock fls val = bracket (messageOf val) messageClose $ \m -> onSocket "send" sock $ \s -> retry "send" (waitWrite sock) $ c_zmq_sendmsg s (msgPtr m) (combineFlags (DontWait : fls)) -- | Send the given 'LB.ByteString' over the socket (cf. zmq_sendmsg). -- This is operationally identical to @send socket (Strict.concat -- (Lazy.toChunks lbs)) flags@ but may be more efficient. -- -- /Note/: This function always calls @zmq_sendmsg@ in a non-blocking way, -- i.e. there is no need to provide the @ZMQ_DONTWAIT@ flag as this is used -- by default. Still 'send'' is blocking the thread as long as the message -- can not be queued on the socket using GHC's 'threadWaitWrite'. send' :: Sender a => Socket a -> [Flag] -> LB.ByteString -> IO () send' sock fls val = bracket (messageOfLazy val) messageClose $ \m -> onSocket "send'" sock $ \s -> retry "send'" (waitWrite sock) $ c_zmq_sendmsg s (msgPtr m) (combineFlags (DontWait : fls)) -- | Send a multi-part message. -- This function applies the 'SendMore' 'Flag' between all message parts. -- 0MQ guarantees atomic delivery of a multi-part message -- (cf. zmq_sendmsg for details). sendMulti :: Sender a => Socket a -> NonEmpty SB.ByteString -> IO () sendMulti sock msgs = do mapM_ (send sock [SendMore]) (S.init msgs) send sock [] (S.last msgs) -- | Receive a 'ByteString' from socket (cf. zmq_recvmsg). -- -- /Note/: This function always calls @zmq_recvmsg@ in a non-blocking way, -- i.e. there is no need to provide the @ZMQ_DONTWAIT@ flag as this is used -- by default. Still 'receive' is blocking the thread as long as no data -- is available using GHC's 'threadWaitRead'. receive :: Receiver a => Socket a -> IO (SB.ByteString) receive sock = bracket messageInit messageClose $ \m -> onSocket "receive" sock $ \s -> do retry "receive" (waitRead sock) $ c_zmq_recvmsg s (msgPtr m) (flagVal dontWait) data_ptr <- c_zmq_msg_data (msgPtr m) size <- c_zmq_msg_size (msgPtr m) SB.packCStringLen (data_ptr, fromIntegral size) -- | Receive a multi-part message. -- This function collects all message parts send via 'sendMulti'. receiveMulti :: Receiver a => Socket a -> IO [SB.ByteString] receiveMulti sock = recvall [] where recvall acc = do msg <- receive sock moreToReceive sock >>= next (msg:acc) next acc True = recvall acc next acc False = return (reverse acc) -- | Setup socket monitoring, i.e. a 'Pair' socket which -- sends monitoring events about the given 'Socket' to the -- given address. socketMonitor :: [EventType] -> String -> Socket a -> IO () socketMonitor es addr soc = onSocket "socketMonitor" soc $ \s -> withCString addr $ \a -> throwIfMinus1_ "zmq_socket_monitor" $ c_zmq_socket_monitor s a (events2cint es) -- | Monitor socket events. -- This function returns a function which can be invoked to retrieve -- the next socket event, potentially blocking until the next one becomes -- available. When applied to 'False', monitoring will terminate, i.e. -- internal monitoring resources will be disposed. Consequently after -- 'monitor' has been invoked, the returned function must be applied -- /once/ to 'False'. monitor :: [EventType] -> Context -> Socket a -> IO (Bool -> IO (Maybe EventMsg)) monitor es ctx sock = do let addr = "inproc://" ++ show (_socket . _socketRepr $ sock) s <- socket ctx Pair socketMonitor es addr sock connect s addr next s <$> messageInit where next soc m False = messageClose m `finally` close soc >> return Nothing next soc m True = onSocket "recv" soc $ \s -> do retry "recv" (waitRead soc) $ c_zmq_recvmsg s (msgPtr m) (flagVal dontWait) ptr <- c_zmq_msg_data (msgPtr m) str <- peekByteOff ptr zmqEventAddrOffset >>= SB.packCString dat <- peekByteOff ptr zmqEventDataOffset :: IO CInt tag <- peek ptr :: IO CInt return . Just $ eventMessage str dat (ZMQEventType tag) -- | Polls for events on the given 'Poll' descriptors. Returns a list of -- events per descriptor which have occured. poll :: (SocketLike s, MonadIO m) => Timeout -> [Poll s m] -> m [[Event]] poll _ [] = return [] poll to desc = do let len = length desc let ps = map toZMQPoll desc ps' <- liftIO $ withArray ps $ \ptr -> do throwIfMinus1Retry_ "poll" $ c_zmq_poll ptr (fromIntegral len) (fromIntegral to) peekArray len ptr mapM fromZMQPoll (zip desc ps') where toZMQPoll :: (SocketLike s, MonadIO m) => Poll s m -> ZMQPoll toZMQPoll (Sock s e _) = ZMQPoll (_socket . _socketRepr . toSocket $ s) 0 (combine (map fromEvent e)) 0 toZMQPoll (File (Fd s) e _) = ZMQPoll nullPtr (fromIntegral s) (combine (map fromEvent e)) 0 fromZMQPoll :: (SocketLike s, MonadIO m) => (Poll s m, ZMQPoll) -> m [Event] fromZMQPoll (p, zp) = do let e = toEvents . fromIntegral . pRevents $ zp let (e', f) = case p of (Sock _ x g) -> (x, g) (File _ x g) -> (x, g) forM f (unless (null (e `intersect` e')) . ($ e)) >> return e fromEvent :: Event -> CShort fromEvent In = fromIntegral . pollVal $ pollIn fromEvent Out = fromIntegral . pollVal $ pollOut fromEvent Err = fromIntegral . pollVal $ pollerr -- Convert bit-masked word into Event list. toEvents :: Word32 -> [Event] toEvents e = foldl' (\es f -> f e es) [] tests where tests = [ \i xs -> if i .&. (fromIntegral . pollVal $ pollIn) /= 0 then In:xs else xs , \i xs -> if i .&. (fromIntegral . pollVal $ pollOut) /= 0 then Out:xs else xs , \i xs -> if i .&. (fromIntegral . pollVal $ pollerr) /= 0 then Err:xs else xs ] retry :: String -> IO () -> IO CInt -> IO () retry msg wait act = throwIfMinus1RetryMayBlock_ msg act wait wait' :: (Fd -> IO ()) -> ZMQPollEvent -> Socket a -> IO () wait' w f s = do fd <- getIntOpt s B.filedesc 0 w (Fd fd) evs <- getInt32Option B.events s unless (testev evs) $ wait' w f s where testev e = e .&. fromIntegral (pollVal f) /= 0 -- | Wait until data is available for reading from the given Socket. -- After this function returns, a call to 'receive' will essentially be -- non-blocking. waitRead :: Socket a -> IO () waitRead = wait' threadWaitRead pollIn -- | Wait until data can be written to the given Socket. -- After this function returns, a call to 'send' will essentially be -- non-blocking. waitWrite :: Socket a -> IO () waitWrite = wait' threadWaitWrite pollOut -- | Starts built-in 0MQ proxy. -- -- Proxy connects front to back socket -- -- Before calling proxy all sockets should be binded -- -- If the capture socket is not Nothing, the proxy shall send all -- messages, received on both frontend and backend, to the capture socket. proxy :: Socket a -> Socket b -> Maybe (Socket c) -> IO () proxy front back capture = onSocket "proxy-front" front $ \f -> onSocket "proxy-back" back $ \b -> throwIfMinus1Retry_ "c_zmq_proxy" $ c_zmq_proxy f b c where c = maybe nullPtr (_socket . _socketRepr) capture