module System.ZMQ4
(
Pair (..)
, Pub (..)
, Sub (..)
, XPub (..)
, XSub (..)
, Req (..)
, Rep (..)
, Dealer (..)
, Router (..)
, XReq
, XRep
, Pull (..)
, Push (..)
, Stream (..)
, SocketType
, Sender
, Receiver
, Subscriber
, SocketLike
, Conflatable
, SendProbe
, Size
, Context
, Socket
, Flag (..)
, Switch (..)
, Timeout
, Event (..)
, EventType (..)
, EventMsg (..)
, Poll (..)
, KeyFormat (..)
, SecurityMechanism (..)
, withContext
, withSocket
, bind
, unbind
, connect
, disconnect
, send
, send'
, sendMulti
, receive
, receiveMulti
, version
, monitor
, socketMonitor
, poll
, System.ZMQ4.subscribe
, System.ZMQ4.unsubscribe
, ioThreads
, maxSockets
, setIoThreads
, setMaxSockets
, System.ZMQ4.affinity
, System.ZMQ4.backlog
, System.ZMQ4.conflate
, System.ZMQ4.curvePublicKey
, System.ZMQ4.curveSecretKey
, System.ZMQ4.curveServerKey
, System.ZMQ4.delayAttachOnConnect
, System.ZMQ4.events
, System.ZMQ4.fileDescriptor
, System.ZMQ4.identity
, System.ZMQ4.immediate
, System.ZMQ4.ipv4Only
, System.ZMQ4.ipv6
, System.ZMQ4.lastEndpoint
, System.ZMQ4.linger
, System.ZMQ4.maxMessageSize
, System.ZMQ4.mcastHops
, System.ZMQ4.mechanism
, System.ZMQ4.moreToReceive
, System.ZMQ4.plainServer
, System.ZMQ4.plainPassword
, System.ZMQ4.plainUserName
, System.ZMQ4.rate
, System.ZMQ4.receiveBuffer
, System.ZMQ4.receiveHighWM
, System.ZMQ4.receiveTimeout
, System.ZMQ4.reconnectInterval
, System.ZMQ4.reconnectIntervalMax
, System.ZMQ4.recoveryInterval
, System.ZMQ4.sendBuffer
, System.ZMQ4.sendHighWM
, System.ZMQ4.sendTimeout
, System.ZMQ4.tcpKeepAlive
, System.ZMQ4.tcpKeepAliveCount
, System.ZMQ4.tcpKeepAliveIdle
, System.ZMQ4.tcpKeepAliveInterval
, System.ZMQ4.zapDomain
, setAffinity
, setBacklog
, setConflate
, setCurveServer
, setCurvePublicKey
, setCurveSecretKey
, setCurveServerKey
, setDelayAttachOnConnect
, setIdentity
, setImmediate
, setIpv4Only
, setIpv6
, setLinger
, setMaxMessageSize
, setMcastHops
, setPlainServer
, setPlainPassword
, setPlainUserName
, setProbeRouter
, setRate
, setReceiveBuffer
, setReceiveHighWM
, setReceiveTimeout
, setReconnectInterval
, setReconnectIntervalMax
, setRecoveryInterval
, setReqCorrelate
, setReqRelaxed
, setRouterMandatory
, setSendBuffer
, setSendHighWM
, setSendTimeout
, setTcpAcceptFilter
, setTcpKeepAlive
, setTcpKeepAliveCount
, setTcpKeepAliveIdle
, setTcpKeepAliveInterval
, setXPubVerbose
, Data.Restricted.restrict
, Data.Restricted.toRestricted
, ZMQError
, errno
, source
, message
, init
, term
, shutdown
, context
, socket
, close
, waitRead
, waitWrite
, z85Encode
, z85Decode
, proxy
, curveKeyPair
) where
import Prelude hiding (init)
import Control.Applicative
import Control.Exception
import Control.Monad (unless)
import Control.Monad.IO.Class
import Data.List (intersect, foldl')
import Data.List.NonEmpty (NonEmpty)
import Data.Restricted
import Data.Traversable (forM)
import Data.Typeable
import Foreign hiding (throwIf, throwIf_, throwIfNull, void)
import Foreign.C.String
import Foreign.C.Types (CInt, CShort)
import System.Posix.Types (Fd(..))
import System.ZMQ4.Base
import System.ZMQ4.Internal
import System.ZMQ4.Error
import qualified Data.ByteString as SB
import qualified Data.ByteString.Lazy as LB
import qualified Data.List.NonEmpty as S
import qualified Prelude as P
import qualified System.ZMQ4.Base as B
import GHC.Conc (threadWaitRead)
import GHC.Generics(Generic)
data Pair = Pair deriving (Eq, Typeable, Generic)
data Pub = Pub deriving (Eq, Typeable, Generic)
data Sub = Sub deriving (Eq, Typeable, Generic)
data XPub = XPub deriving (Eq, Typeable, Generic)
data XSub = XSub deriving (Eq, Typeable, Generic)
data Req = Req deriving (Eq, Typeable, Generic)
data Rep = Rep deriving (Eq, Typeable, Generic)
data Dealer = Dealer deriving (Eq, Typeable, Generic)
data Router = Router deriving (Eq, Typeable, Generic)
data Pull = Pull deriving (Eq, Typeable, Generic)
data Push = Push deriving (Eq, Typeable, Generic)
data Stream = Stream deriving (Eq, Typeable, Generic)
type XReq = Dealer
type XRep = Router
class Subscriber a
class Sender a
class Receiver a
class Conflatable a
class SendProbe a
instance SocketType Pair where zmqSocketType = const pair
instance Sender Pair
instance Receiver Pair
instance SocketType Pub where zmqSocketType = const pub
instance Sender Pub
instance Conflatable Pub
instance SocketType Sub where zmqSocketType = const sub
instance Subscriber Sub
instance Receiver Sub
instance Conflatable Sub
instance SocketType XPub where zmqSocketType = const xpub
instance Sender XPub
instance Receiver XPub
instance SocketType XSub where zmqSocketType = const xsub
instance Sender XSub
instance Receiver XSub
instance SocketType Req where zmqSocketType = const request
instance Sender Req
instance Receiver Req
instance SendProbe Req
instance SocketType Rep where zmqSocketType = const response
instance Sender Rep
instance Receiver Rep
instance SocketType Dealer where zmqSocketType = const dealer
instance Sender Dealer
instance Receiver Dealer
instance Conflatable Dealer
instance SendProbe Dealer
instance SocketType Router where zmqSocketType = const router
instance Sender Router
instance Receiver Router
instance SendProbe Router
instance SocketType Pull where zmqSocketType = const pull
instance Receiver Pull
instance Conflatable Pull
instance SocketType Push where zmqSocketType = const push
instance Sender Push
instance Conflatable Push
instance SocketType Stream where zmqSocketType = const stream
instance Sender Stream
instance Receiver Stream
data Event =
In
| Out
| Err
deriving (Eq, Ord, Read, Show)
data Poll s m where
Sock :: s t -> [Event] -> Maybe ([Event] -> m ()) -> Poll s m
File :: Fd -> [Event] -> Maybe ([Event] -> m ()) -> Poll s m
version :: IO (Int, Int, Int)
version =
with 0 $ \major_ptr ->
with 0 $ \minor_ptr ->
with 0 $ \patch_ptr ->
c_zmq_version major_ptr minor_ptr patch_ptr >>
tupleUp <$> peek major_ptr <*> peek minor_ptr <*> peek patch_ptr
where
tupleUp a b c = (fromIntegral a, fromIntegral b, fromIntegral c)
init :: Size -> IO Context
init n = do
c <- context
setIoThreads n c
return c
context :: IO Context
context = Context <$> throwIfNull "init" c_zmq_ctx_new
term :: Context -> IO ()
term c = throwIfMinus1Retry_ "term" . c_zmq_ctx_term . _ctx $ c
shutdown :: Context -> IO ()
shutdown = throwIfMinus1_ "shutdown" . c_zmq_ctx_shutdown . _ctx
withContext :: (Context -> IO a) -> IO a
withContext act =
bracket (throwIfNull "withContext (new)" $ c_zmq_ctx_new)
(throwIfMinus1Retry_ "withContext (term)" . c_zmq_ctx_term)
(act . Context)
withSocket :: SocketType a => Context -> a -> (Socket a -> IO b) -> IO b
withSocket c t = bracket (socket c t) close
socket :: SocketType a => Context -> a -> IO (Socket a)
socket c t = Socket <$> mkSocketRepr t c
close :: Socket a -> IO ()
close = closeSock . _socketRepr
subscribe :: Subscriber a => Socket a -> SB.ByteString -> IO ()
subscribe s = setByteStringOpt s B.subscribe
unsubscribe :: Subscriber a => Socket a -> SB.ByteString -> IO ()
unsubscribe s = setByteStringOpt s B.unsubscribe
events :: Socket a -> IO [Event]
events s = toEvents <$> getIntOpt s B.events 0
fileDescriptor :: Socket a -> IO Fd
fileDescriptor s = Fd . fromIntegral <$> getInt32Option B.filedesc s
moreToReceive :: Socket a -> IO Bool
moreToReceive s = (== 1) <$> getInt32Option B.receiveMore s
ioThreads :: Context -> IO Word
ioThreads = ctxIntOption "ioThreads" _ioThreads
maxSockets :: Context -> IO Word
maxSockets = ctxIntOption "maxSockets" _maxSockets
conflate :: Conflatable a => Socket a -> IO Bool
conflate s = (== 1) <$> getInt32Option B.conflate s
immediate :: Socket a -> IO Bool
immediate s = (== 1) <$> getInt32Option B.immediate s
identity :: Socket a -> IO SB.ByteString
identity s = getByteStringOpt s B.identity
affinity :: Socket a -> IO Word64
affinity s = getIntOpt s B.affinity 0
maxMessageSize :: Socket a -> IO Int64
maxMessageSize s = getIntOpt s B.maxMessageSize 0
ipv4Only :: Socket a -> IO Bool
ipv4Only s = (== 1) <$> getInt32Option B.ipv4Only s
ipv6 :: Socket a -> IO Bool
ipv6 s = (== 1) <$> getInt32Option B.ipv6 s
backlog :: Socket a -> IO Int
backlog = getInt32Option B.backlog
delayAttachOnConnect :: Socket a -> IO Bool
delayAttachOnConnect s = (== 1) <$> getInt32Option B.delayAttachOnConnect s
linger :: Socket a -> IO Int
linger = getInt32Option B.linger
lastEndpoint :: Socket a -> IO String
lastEndpoint s = getStrOpt s B.lastEndpoint
rate :: Socket a -> IO Int
rate = getInt32Option B.rate
receiveBuffer :: Socket a -> IO Int
receiveBuffer = getInt32Option B.receiveBuf
reconnectInterval :: Socket a -> IO Int
reconnectInterval = getInt32Option B.reconnectIVL
reconnectIntervalMax :: Socket a -> IO Int
reconnectIntervalMax = getInt32Option B.reconnectIVLMax
recoveryInterval :: Socket a -> IO Int
recoveryInterval = getInt32Option B.recoveryIVL
sendBuffer :: Socket a -> IO Int
sendBuffer = getInt32Option B.sendBuf
mcastHops :: Socket a -> IO Int
mcastHops = getInt32Option B.mcastHops
receiveHighWM :: Socket a -> IO Int
receiveHighWM = getInt32Option B.receiveHighWM
receiveTimeout :: Socket a -> IO Int
receiveTimeout = getInt32Option B.receiveTimeout
sendTimeout :: Socket a -> IO Int
sendTimeout = getInt32Option B.sendTimeout
sendHighWM :: Socket a -> IO Int
sendHighWM = getInt32Option B.sendHighWM
tcpKeepAlive :: Socket a -> IO Switch
tcpKeepAlive = fmap (toSwitch "Invalid ZMQ_TCP_KEEPALIVE")
. getInt32Option B.tcpKeepAlive
tcpKeepAliveCount :: Socket a -> IO Int
tcpKeepAliveCount = getInt32Option B.tcpKeepAliveCount
tcpKeepAliveIdle :: Socket a -> IO Int
tcpKeepAliveIdle = getInt32Option B.tcpKeepAliveIdle
tcpKeepAliveInterval :: Socket a -> IO Int
tcpKeepAliveInterval = getInt32Option B.tcpKeepAliveInterval
mechanism :: Socket a -> IO SecurityMechanism
mechanism = fmap (fromMechanism "Invalid ZMQ_MECHANISM")
. getInt32Option B.mechanism
plainServer :: Socket a -> IO Bool
plainServer = fmap (== 1) . getInt32Option B.plainServer
plainUserName :: Socket a -> IO SB.ByteString
plainUserName s = getByteStringOpt s B.plainUserName
plainPassword :: Socket a -> IO SB.ByteString
plainPassword s = getByteStringOpt s B.plainPassword
zapDomain :: Socket a -> IO SB.ByteString
zapDomain s = getByteStringOpt s B.zapDomain
curvePublicKey :: KeyFormat f -> Socket a -> IO SB.ByteString
curvePublicKey f s = getKey f s B.curvePublicKey
curveServerKey :: KeyFormat f -> Socket a -> IO SB.ByteString
curveServerKey f s = getKey f s B.curveServerKey
curveSecretKey :: KeyFormat f -> Socket a -> IO SB.ByteString
curveSecretKey f s = getKey f s B.curveSecretKey
setIoThreads :: Word -> Context -> IO ()
setIoThreads n = setCtxIntOption "ioThreads" _ioThreads n
setMaxSockets :: Word -> Context -> IO ()
setMaxSockets n = setCtxIntOption "maxSockets" _maxSockets n
setConflate :: Conflatable a => Bool -> Socket a -> IO ()
setConflate x s = setIntOpt s B.conflate (bool2cint x)
setImmediate :: Bool -> Socket a -> IO ()
setImmediate x s = setIntOpt s B.immediate (bool2cint x)
setIdentity :: Restricted (N1, N254) SB.ByteString -> Socket a -> IO ()
setIdentity x s = setByteStringOpt s B.identity (rvalue x)
setAffinity :: Word64 -> Socket a -> IO ()
setAffinity x s = setIntOpt s B.affinity x
setDelayAttachOnConnect :: Bool -> Socket a -> IO ()
setDelayAttachOnConnect x s = setIntOpt s B.delayAttachOnConnect (bool2cint x)
setMaxMessageSize :: Integral i => Restricted (Nneg1, Int64) i -> Socket a -> IO ()
setMaxMessageSize x s = setIntOpt s B.maxMessageSize ((fromIntegral . rvalue $ x) :: Int64)
setIpv4Only :: Bool -> Socket a -> IO ()
setIpv4Only x s = setIntOpt s B.ipv4Only (bool2cint x)
setIpv6 :: Bool -> Socket a -> IO ()
setIpv6 x s = setIntOpt s B.ipv6 (bool2cint x)
setPlainServer :: Bool -> Socket a -> IO ()
setPlainServer x s = setIntOpt s B.plainServer (bool2cint x)
setCurveServer :: Bool -> Socket a -> IO ()
setCurveServer x s = setIntOpt s B.curveServer (bool2cint x)
setPlainUserName :: Restricted (N1, N254) SB.ByteString -> Socket a -> IO ()
setPlainUserName x s = setByteStringOpt s B.plainUserName (rvalue x)
setPlainPassword :: Restricted (N1, N254) SB.ByteString -> Socket a -> IO ()
setPlainPassword x s = setByteStringOpt s B.plainPassword (rvalue x)
setLinger :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
setLinger = setInt32OptFromRestricted B.linger
setReceiveTimeout :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
setReceiveTimeout = setInt32OptFromRestricted B.receiveTimeout
setRouterMandatory :: Bool -> Socket Router -> IO ()
setRouterMandatory x s = setIntOpt s B.routerMandatory (bool2cint x)
setSendTimeout :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
setSendTimeout = setInt32OptFromRestricted B.sendTimeout
setRate :: Integral i => Restricted (N1, Int32) i -> Socket a -> IO ()
setRate = setInt32OptFromRestricted B.rate
setMcastHops :: Integral i => Restricted (N1, Int32) i -> Socket a -> IO ()
setMcastHops = setInt32OptFromRestricted B.mcastHops
setBacklog :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
setBacklog = setInt32OptFromRestricted B.backlog
setCurvePublicKey :: KeyFormat f -> Restricted f SB.ByteString -> Socket a -> IO ()
setCurvePublicKey _ k s = setByteStringOpt s B.curvePublicKey (rvalue k)
setCurveSecretKey :: KeyFormat f -> Restricted f SB.ByteString -> Socket a -> IO ()
setCurveSecretKey _ k s = setByteStringOpt s B.curveSecretKey (rvalue k)
setCurveServerKey :: KeyFormat f -> Restricted f SB.ByteString -> Socket a -> IO ()
setCurveServerKey _ k s = setByteStringOpt s B.curveServerKey (rvalue k)
setProbeRouter :: SendProbe a => Bool -> Socket a -> IO ()
setProbeRouter x s = setIntOpt s B.probeRouter (bool2cint x)
setReceiveBuffer :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
setReceiveBuffer = setInt32OptFromRestricted B.receiveBuf
setReconnectInterval :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
setReconnectInterval = setInt32OptFromRestricted B.reconnectIVL
setReconnectIntervalMax :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
setReconnectIntervalMax = setInt32OptFromRestricted B.reconnectIVLMax
setReqCorrelate :: Bool -> Socket Req -> IO ()
setReqCorrelate x s = setIntOpt s B.reqCorrelate (bool2cint x)
setReqRelaxed :: Bool -> Socket Req -> IO ()
setReqRelaxed x s = setIntOpt s B.reqRelaxed (bool2cint x)
setSendBuffer :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
setSendBuffer = setInt32OptFromRestricted B.sendBuf
setRecoveryInterval :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
setRecoveryInterval = setInt32OptFromRestricted B.recoveryIVL
setReceiveHighWM :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
setReceiveHighWM = setInt32OptFromRestricted B.receiveHighWM
setSendHighWM :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
setSendHighWM = setInt32OptFromRestricted B.sendHighWM
setTcpAcceptFilter :: Maybe SB.ByteString -> Socket a -> IO ()
setTcpAcceptFilter Nothing sock = onSocket "setTcpAcceptFilter" sock $ \s ->
throwIfMinus1Retry_ "setStrOpt" $
c_zmq_setsockopt s (optVal tcpAcceptFilter) nullPtr 0
setTcpAcceptFilter (Just dat) sock = setByteStringOpt sock tcpAcceptFilter dat
setTcpKeepAlive :: Switch -> Socket a -> IO ()
setTcpKeepAlive x s = setIntOpt s B.tcpKeepAlive (fromSwitch x :: CInt)
setTcpKeepAliveCount :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
setTcpKeepAliveCount = setInt32OptFromRestricted B.tcpKeepAliveCount
setTcpKeepAliveIdle :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
setTcpKeepAliveIdle = setInt32OptFromRestricted B.tcpKeepAliveIdle
setTcpKeepAliveInterval :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
setTcpKeepAliveInterval = setInt32OptFromRestricted B.tcpKeepAliveInterval
setXPubVerbose :: Bool -> Socket XPub -> IO ()
setXPubVerbose x s = setIntOpt s B.xpubVerbose (bool2cint x)
bind :: Socket a -> String -> IO ()
bind sock str = onSocket "bind" sock $
throwIfMinus1Retry_ "bind" . withCString str . c_zmq_bind
unbind :: Socket a -> String -> IO ()
unbind sock str = onSocket "unbind" sock $
throwIfMinus1Retry_ "unbind" . withCString str . c_zmq_unbind
connect :: Socket a -> String -> IO ()
connect sock str = onSocket "connect" sock $
throwIfMinus1Retry_ "connect" . withCString str . c_zmq_connect
disconnect :: Socket a -> String -> IO ()
disconnect sock str = onSocket "disconnect" sock $
throwIfMinus1Retry_ "disconnect" . withCString str . c_zmq_disconnect
send :: Sender a => Socket a -> [Flag] -> SB.ByteString -> IO ()
send sock fls val = bracketOnError (messageOf val) messageClose $ \m -> do
onSocket "send" sock $ \s ->
retry "send" (waitWrite sock) $
#ifdef mingw32_HOST_OS
c_zmq_sendmsg s (msgPtr m) (combineFlags fls)
#else
c_zmq_sendmsg s (msgPtr m) (combineFlags (DontWait : fls))
#endif
messageFree m
send' :: Sender a => Socket a -> [Flag] -> LB.ByteString -> IO ()
send' sock fls val = bracketOnError (messageOfLazy val) messageClose $ \m -> do
onSocket "send'" sock $ \s ->
retry "send'" (waitWrite sock) $
#ifdef mingw32_HOST_OS
c_zmq_sendmsg s (msgPtr m) (combineFlags fls)
#else
c_zmq_sendmsg s (msgPtr m) (combineFlags (DontWait : fls))
#endif
messageFree m
sendMulti :: Sender a => Socket a -> NonEmpty SB.ByteString -> IO ()
sendMulti sock msgs = do
mapM_ (send sock [SendMore]) (S.init msgs)
send sock [] (S.last msgs)
receive :: Receiver a => Socket a -> IO (SB.ByteString)
receive sock = bracket messageInit messageClose $ \m ->
onSocket "receive" sock $ \s -> do
retry "receive" (waitRead sock) $
#ifdef mingw32_HOST_OS
c_zmq_recvmsg s (msgPtr m) 0
#else
c_zmq_recvmsg s (msgPtr m) (flagVal dontWait)
#endif
data_ptr <- c_zmq_msg_data (msgPtr m)
size <- c_zmq_msg_size (msgPtr m)
SB.packCStringLen (data_ptr, fromIntegral size)
receiveMulti :: Receiver a => Socket a -> IO [SB.ByteString]
receiveMulti sock = recvall []
where
recvall acc = do
msg <- receive sock
moreToReceive sock >>= next (msg:acc)
next acc True = recvall acc
next acc False = return (reverse acc)
socketMonitor :: [EventType] -> String -> Socket a -> IO ()
socketMonitor es addr soc = onSocket "socketMonitor" soc $ \s ->
withCString addr $ \a ->
throwIfMinus1_ "zmq_socket_monitor" $
c_zmq_socket_monitor s a (events2cint es)
monitor :: [EventType] -> Context -> Socket a -> IO (Bool -> IO (Maybe EventMsg))
monitor es ctx sock = do
let addr = "inproc://" ++ show (_socket . _socketRepr $ sock)
s <- socket ctx Pair
socketMonitor es addr sock
connect s addr
next s <$> messageInit
where
next soc m False = messageClose m `finally` close soc >> return Nothing
next soc m True = onSocket "recv" soc $ \s -> do
retry "recv" (waitRead soc) $
#ifdef mingw32_HOST_OS
c_zmq_recvmsg s (msgPtr m) 0
#else
c_zmq_recvmsg s (msgPtr m) (flagVal dontWait)
#endif
ptr <- c_zmq_msg_data (msgPtr m)
evt <- peek ptr
str <- receive soc
return . Just $ eventMessage str evt
poll :: (SocketLike s, MonadIO m) => Timeout -> [Poll s m] -> m [[Event]]
poll _ [] = return []
poll to desc = do
let len = length desc
let ps = map toZMQPoll desc
ps' <- liftIO $ withArray ps $ \ptr -> do
throwIfMinus1Retry_ "poll" $
c_zmq_poll ptr (fromIntegral len) (fromIntegral to)
peekArray len ptr
mapM fromZMQPoll (zip desc ps')
where
toZMQPoll :: (SocketLike s, MonadIO m) => Poll s m -> ZMQPoll
toZMQPoll (Sock s e _) =
ZMQPoll (_socket . _socketRepr . toSocket $ s) 0 (combine (map fromEvent e)) 0
toZMQPoll (File (Fd s) e _) =
ZMQPoll nullPtr (fromIntegral s) (combine (map fromEvent e)) 0
fromZMQPoll :: (SocketLike s, MonadIO m) => (Poll s m, ZMQPoll) -> m [Event]
fromZMQPoll (p, zp) = do
let e = toEvents . fromIntegral . pRevents $ zp
let (e', f) = case p of
(Sock _ x g) -> (x, g)
(File _ x g) -> (x, g)
forM f (unless (P.null (e `intersect` e')) . ($ e)) >> return e
fromEvent :: Event -> CShort
fromEvent In = fromIntegral . pollVal $ pollIn
fromEvent Out = fromIntegral . pollVal $ pollOut
fromEvent Err = fromIntegral . pollVal $ pollerr
toEvents :: Word32 -> [Event]
toEvents e = foldl' (\es f -> f e es) [] tests
where
tests =
[ \i xs -> if i .&. (fromIntegral . pollVal $ pollIn) /= 0 then In:xs else xs
, \i xs -> if i .&. (fromIntegral . pollVal $ pollOut) /= 0 then Out:xs else xs
, \i xs -> if i .&. (fromIntegral . pollVal $ pollerr) /= 0 then Err:xs else xs
]
retry :: String -> IO () -> IO CInt -> IO ()
retry msg wait act = throwIfMinus1RetryMayBlock_ msg act wait
wait' :: ZMQPollEvent -> Socket a -> IO ()
#ifdef mingw32_HOST_OS
wait' _ _ = return ()
#else
wait' p s = do
e <- getInt32Option B.events s
unless (testev e) $ do
fd <- getIntOpt s B.filedesc 0
threadWaitRead (Fd fd)
wait' p s
where
testev e = e .&. fromIntegral (pollVal p) /= 0
#endif
waitRead :: Socket a -> IO ()
waitRead = wait' pollIn
waitWrite :: Socket a -> IO ()
waitWrite = wait' pollOut
proxy :: Socket a -> Socket b -> Maybe (Socket c) -> IO ()
proxy front back capture =
onSocket "proxy-front" front $ \f ->
onSocket "proxy-back" back $ \b ->
throwIfMinus1Retry_ "c_zmq_proxy" $ c_zmq_proxy f b c
where
c = maybe nullPtr (_socket . _socketRepr) capture
curveKeyPair :: MonadIO m => m (Restricted Div5 SB.ByteString, Restricted Div5 SB.ByteString)
curveKeyPair = liftIO $
allocaBytes 41 $ \cstr1 ->
allocaBytes 41 $ \cstr2 -> do
throwIfMinus1_ "c_zmq_curve_keypair" $ c_zmq_curve_keypair cstr1 cstr2
public <- toRestricted <$> SB.packCString cstr1
private <- toRestricted <$> SB.packCString cstr2
maybe (fail errmsg) return ((,) <$> public <*> private)
where
errmsg = "curveKeyPair: invalid key-lengths produced"