Copyright | (c) 2010-2013 Toralf Wittner |
---|---|
License | MIT |
Maintainer | Toralf Wittner <tw@dtex.org> |
Stability | experimental |
Portability | non-portable |
Safe Haskell | None |
Language | Haskell98 |
0MQ haskell binding. The API closely follows the C-API of 0MQ with the main difference being that sockets are typed.
Notes
Many option settings use a Restriction
to further constrain the
range of possible values of their integral types. For example
the maximum message size can be given as -1, which means no limit
or by greater values, which denote the message size in bytes. The
type of setMaxMessageSize
is therefore:
setMaxMessageSize :: Integral i => Restricted (Nneg1, Int64) i -> Socket a -> IO ()
which means any integral value in the range of -1
to
(maxBound :: Int64
) can be given. To create a restricted
value from plain value, use toRestricted
or restrict
.
- data Pair = Pair
- data Pub = Pub
- data Sub = Sub
- data XPub = XPub
- data XSub = XSub
- data Req = Req
- data Rep = Rep
- data Dealer = Dealer
- data Router = Router
- type XReq = Dealer
- type XRep = Router
- data Pull = Pull
- data Push = Push
- data Stream = Stream
- class SocketType a
- class Sender a
- class Receiver a
- class Subscriber a
- class SocketLike s
- class Conflatable a
- class SendProbe a
- type Size = Word
- data Context
- data Socket a
- data Flag
- data Switch
- type Timeout = Int64
- data Event
- data EventType
- data EventMsg
- = Connected !ByteString !Fd
- | ConnectDelayed !ByteString
- | ConnectRetried !ByteString !Int
- | Listening !ByteString !Fd
- | BindFailed !ByteString !Int
- | Accepted !ByteString !Fd
- | AcceptFailed !ByteString !Int
- | Closed !ByteString !Fd
- | CloseFailed !ByteString !Int
- | Disconnected !ByteString !Fd
- | MonitorStopped !ByteString !Int
- data Poll s m where
- data KeyFormat a where
- data SecurityMechanism
- withContext :: (Context -> IO a) -> IO a
- withSocket :: SocketType a => Context -> a -> (Socket a -> IO b) -> IO b
- bind :: Socket a -> String -> IO ()
- unbind :: Socket a -> String -> IO ()
- connect :: Socket a -> String -> IO ()
- disconnect :: Socket a -> String -> IO ()
- send :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()
- send' :: Sender a => Socket a -> [Flag] -> ByteString -> IO ()
- sendMulti :: Sender a => Socket a -> NonEmpty ByteString -> IO ()
- receive :: Receiver a => Socket a -> IO ByteString
- receiveMulti :: Receiver a => Socket a -> IO [ByteString]
- version :: IO (Int, Int, Int)
- monitor :: [EventType] -> Context -> Socket a -> IO (Bool -> IO (Maybe EventMsg))
- socketMonitor :: [EventType] -> String -> Socket a -> IO ()
- poll :: (SocketLike s, MonadIO m) => Timeout -> [Poll s m] -> m [[Event]]
- subscribe :: Subscriber a => Socket a -> ByteString -> IO ()
- unsubscribe :: Subscriber a => Socket a -> ByteString -> IO ()
- ioThreads :: Context -> IO Word
- maxSockets :: Context -> IO Word
- setIoThreads :: Word -> Context -> IO ()
- setMaxSockets :: Word -> Context -> IO ()
- affinity :: Socket a -> IO Word64
- backlog :: Socket a -> IO Int
- conflate :: Conflatable a => Socket a -> IO Bool
- curvePublicKey :: KeyFormat f -> Socket a -> IO ByteString
- curveSecretKey :: KeyFormat f -> Socket a -> IO ByteString
- curveServerKey :: KeyFormat f -> Socket a -> IO ByteString
- delayAttachOnConnect :: Socket a -> IO Bool
- events :: Socket a -> IO [Event]
- fileDescriptor :: Socket a -> IO Fd
- identity :: Socket a -> IO ByteString
- immediate :: Socket a -> IO Bool
- ipv4Only :: Socket a -> IO Bool
- ipv6 :: Socket a -> IO Bool
- lastEndpoint :: Socket a -> IO String
- linger :: Socket a -> IO Int
- maxMessageSize :: Socket a -> IO Int64
- mcastHops :: Socket a -> IO Int
- mechanism :: Socket a -> IO SecurityMechanism
- moreToReceive :: Socket a -> IO Bool
- plainServer :: Socket a -> IO Bool
- plainPassword :: Socket a -> IO ByteString
- plainUserName :: Socket a -> IO ByteString
- rate :: Socket a -> IO Int
- receiveBuffer :: Socket a -> IO Int
- receiveHighWM :: Socket a -> IO Int
- receiveTimeout :: Socket a -> IO Int
- reconnectInterval :: Socket a -> IO Int
- reconnectIntervalMax :: Socket a -> IO Int
- recoveryInterval :: Socket a -> IO Int
- sendBuffer :: Socket a -> IO Int
- sendHighWM :: Socket a -> IO Int
- sendTimeout :: Socket a -> IO Int
- tcpKeepAlive :: Socket a -> IO Switch
- tcpKeepAliveCount :: Socket a -> IO Int
- tcpKeepAliveIdle :: Socket a -> IO Int
- tcpKeepAliveInterval :: Socket a -> IO Int
- zapDomain :: Socket a -> IO ByteString
- setAffinity :: Word64 -> Socket a -> IO ()
- setBacklog :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setConflate :: Conflatable a => Bool -> Socket a -> IO ()
- setCurveServer :: Bool -> Socket a -> IO ()
- setCurvePublicKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO ()
- setCurveSecretKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO ()
- setCurveServerKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO ()
- setDelayAttachOnConnect :: Bool -> Socket a -> IO ()
- setIdentity :: Restricted (N1, N254) ByteString -> Socket a -> IO ()
- setImmediate :: Bool -> Socket a -> IO ()
- setIpv4Only :: Bool -> Socket a -> IO ()
- setIpv6 :: Bool -> Socket a -> IO ()
- setLinger :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setMaxMessageSize :: Integral i => Restricted (Nneg1, Int64) i -> Socket a -> IO ()
- setMcastHops :: Integral i => Restricted (N1, Int32) i -> Socket a -> IO ()
- setPlainServer :: Bool -> Socket a -> IO ()
- setPlainPassword :: Restricted (N1, N254) ByteString -> Socket a -> IO ()
- setPlainUserName :: Restricted (N1, N254) ByteString -> Socket a -> IO ()
- setProbeRouter :: SendProbe a => Bool -> Socket a -> IO ()
- setRate :: Integral i => Restricted (N1, Int32) i -> Socket a -> IO ()
- setReceiveBuffer :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setReceiveHighWM :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setReceiveTimeout :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setReconnectInterval :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setReconnectIntervalMax :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setRecoveryInterval :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setReqCorrelate :: Bool -> Socket Req -> IO ()
- setReqRelaxed :: Bool -> Socket Req -> IO ()
- setRouterMandatory :: Bool -> Socket Router -> IO ()
- setSendBuffer :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setSendHighWM :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO ()
- setSendTimeout :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setTcpAcceptFilter :: Maybe ByteString -> Socket a -> IO ()
- setTcpKeepAlive :: Switch -> Socket a -> IO ()
- setTcpKeepAliveCount :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setTcpKeepAliveIdle :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setTcpKeepAliveInterval :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO ()
- setXPubVerbose :: Bool -> Socket XPub -> IO ()
- restrict :: Restriction r v => v -> Restricted r v
- toRestricted :: Restriction r v => v -> Maybe (Restricted r v)
- data ZMQError
- errno :: ZMQError -> Int
- source :: ZMQError -> String
- message :: ZMQError -> String
- init :: Size -> IO Context
- term :: Context -> IO ()
- shutdown :: Context -> IO ()
- context :: IO Context
- socket :: SocketType a => Context -> a -> IO (Socket a)
- close :: Socket a -> IO ()
- waitRead :: Socket a -> IO ()
- waitWrite :: Socket a -> IO ()
- z85Encode :: MonadIO m => Restricted Div4 ByteString -> m ByteString
- z85Decode :: MonadIO m => Restricted Div5 ByteString -> m ByteString
- proxy :: Socket a -> Socket b -> Maybe (Socket c) -> IO ()
- curveKeyPair :: MonadIO m => m (Restricted Div5 ByteString, Restricted Div5 ByteString)
Type Definitions
Socket Types
Socket type-classes
Sockets which can send
.
Sockets which can receive
.
class Conflatable a Source
Sockets which can be conflate
d.
Sockets which can send probes (cf. setProbeRouter
).
Various type definitions
Flags to apply on send operations (cf. man zmq_send)
Configuration switch
Socket events.
Event types to monitor.
Event Message to receive when monitoring socket events.
General Operations
withContext :: (Context -> IO a) -> IO a Source
Run an action with a 0MQ context. The Context
supplied to your
action will not be valid after the action either returns or
throws an exception.
withSocket :: SocketType a => Context -> a -> (Socket a -> IO b) -> IO b Source
Run an action with a 0MQ socket. The socket will be closed after running the supplied action even if an error occurs. The socket supplied to your action will not be valid after the action terminates.
unbind :: Socket a -> String -> IO () Source
Unbind the socket from the given address (cf. zmq_unbind).
connect :: Socket a -> String -> IO () Source
Connect the socket to the given address (cf. zmq_connect).
disconnect :: Socket a -> String -> IO () Source
Disconnect the socket from the given endpoint (cf. zmq_disconnect).
send :: Sender a => Socket a -> [Flag] -> ByteString -> IO () Source
Send the given ByteString
over the socket
(cf. zmq_sendmsg).
Note: This function always calls zmq_sendmsg
in a non-blocking way,
i.e. there is no need to provide the ZMQ_DONTWAIT
flag as this is used
by default. Still send
is blocking the thread as long as the message
can not be queued on the socket using GHC's threadWaitWrite
.
send' :: Sender a => Socket a -> [Flag] -> ByteString -> IO () Source
Send the given ByteString
over the socket
(cf. zmq_sendmsg).
This is operationally identical to send socket (Strict.concat
(Lazy.toChunks lbs)) flags
but may be more efficient.
Note: This function always calls zmq_sendmsg
in a non-blocking way,
i.e. there is no need to provide the ZMQ_DONTWAIT
flag as this is used
by default. Still send'
is blocking the thread as long as the message
can not be queued on the socket using GHC's threadWaitWrite
.
sendMulti :: Sender a => Socket a -> NonEmpty ByteString -> IO () Source
Send a multi-part message.
This function applies the SendMore
Flag
between all message parts.
0MQ guarantees atomic delivery of a multi-part message
(cf. zmq_sendmsg).
receive :: Receiver a => Socket a -> IO ByteString Source
Receive a ByteString
from socket
(cf. zmq_recvmsg).
Note: This function always calls zmq_recvmsg
in a non-blocking way,
i.e. there is no need to provide the ZMQ_DONTWAIT
flag as this is used
by default. Still receive
is blocking the thread as long as no data
is available using GHC's threadWaitRead
.
receiveMulti :: Receiver a => Socket a -> IO [ByteString] Source
Receive a multi-part message.
This function collects all message parts send via sendMulti
.
version :: IO (Int, Int, Int) Source
Return the runtime version of the underlying 0MQ library as a (major, minor, patch) triple.
monitor :: [EventType] -> Context -> Socket a -> IO (Bool -> IO (Maybe EventMsg)) Source
Monitor socket events (cf. zmq_socket_monitor).
This function returns a function which can be invoked to retrieve
the next socket event, potentially blocking until the next one becomes
available. When applied to False
, monitoring will terminate, i.e.
internal monitoring resources will be disposed. Consequently after
monitor
has been invoked, the returned function must be applied
once to False
.
subscribe :: Subscriber a => Socket a -> ByteString -> IO () Source
Subscribe Socket to given subscription.
unsubscribe :: Subscriber a => Socket a -> ByteString -> IO () Source
Unsubscribe Socket from given subscription.
Context Options (Read)
Context Options (Write)
setIoThreads :: Word -> Context -> IO () Source
setMaxSockets :: Word -> Context -> IO () Source
Socket Options (Read)
conflate :: Conflatable a => Socket a -> IO Bool Source
Restricts the outgoing and incoming socket buffers to a single message.
curvePublicKey :: KeyFormat f -> Socket a -> IO ByteString Source
curveSecretKey :: KeyFormat f -> Socket a -> IO ByteString Source
curveServerKey :: KeyFormat f -> Socket a -> IO ByteString Source
delayAttachOnConnect :: Socket a -> IO Bool Source
Deprecated: Use immediate
fileDescriptor :: Socket a -> IO Fd Source
sendBuffer :: Socket a -> IO Int Source
sendHighWM :: Socket a -> IO Int Source
Socket Options (Write)
setAffinity :: Word64 -> Socket a -> IO () Source
setBacklog :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source
setConflate :: Conflatable a => Bool -> Socket a -> IO () Source
Restrict the outgoing and incoming socket buffers to a single message.
setCurveServer :: Bool -> Socket a -> IO () Source
setCurvePublicKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO () Source
setCurveSecretKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO () Source
setCurveServerKey :: KeyFormat f -> Restricted f ByteString -> Socket a -> IO () Source
setDelayAttachOnConnect :: Bool -> Socket a -> IO () Source
Deprecated: Use setImmediate
setIdentity :: Restricted (N1, N254) ByteString -> Socket a -> IO () Source
setImmediate :: Bool -> Socket a -> IO () Source
setIpv4Only :: Bool -> Socket a -> IO () Source
Deprecated: Use setIpv6
setMaxMessageSize :: Integral i => Restricted (Nneg1, Int64) i -> Socket a -> IO () Source
setMcastHops :: Integral i => Restricted (N1, Int32) i -> Socket a -> IO () Source
setPlainServer :: Bool -> Socket a -> IO () Source
setPlainPassword :: Restricted (N1, N254) ByteString -> Socket a -> IO () Source
setPlainUserName :: Restricted (N1, N254) ByteString -> Socket a -> IO () Source
setReceiveBuffer :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source
setReceiveHighWM :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source
setReceiveTimeout :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO () Source
setReconnectInterval :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source
setReconnectIntervalMax :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source
setRecoveryInterval :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source
setSendBuffer :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source
setSendHighWM :: Integral i => Restricted (N0, Int32) i -> Socket a -> IO () Source
setSendTimeout :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO () Source
setTcpAcceptFilter :: Maybe ByteString -> Socket a -> IO () Source
setTcpKeepAlive :: Switch -> Socket a -> IO () Source
setTcpKeepAliveCount :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO () Source
setTcpKeepAliveIdle :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO () Source
setTcpKeepAliveInterval :: Integral i => Restricted (Nneg1, Int32) i -> Socket a -> IO () Source
Restrictions
restrict :: Restriction r v => v -> Restricted r v Source
Create a restricted value. If the given value does not satisfy the restrictions, a modified variant is used instead, e.g. if an integer is larger than the upper bound, the upper bound value is used.
toRestricted :: Restriction r v => v -> Maybe (Restricted r v) Source
Create a restricted value. Returns Nothing
if
the given value does not satisfy all restrictions.
Error Handling
ZMQError encapsulates information about errors, which occur when using the native 0MQ API, such as error number and message.
Low-level Functions
term :: Context -> IO () Source
Terminate a 0MQ context. Equivalent to zmq_ctx_term.
shutdown :: Context -> IO () Source
Shutdown a 0MQ context. Equivalent to zmq_ctx_shutdown.
Initialize a 0MQ context. Equivalent to zmq_ctx_new.
socket :: SocketType a => Context -> a -> IO (Socket a) Source
Create a new 0MQ socket within the given context. withSocket
provides
automatic socket closing and may be safer to use.
close :: Socket a -> IO () Source
Close a 0MQ socket. withSocket
provides automatic socket closing and may
be safer to use.
waitRead :: Socket a -> IO () Source
Wait until data is available for reading from the given Socket.
After this function returns, a call to receive
will essentially be
non-blocking.
waitWrite :: Socket a -> IO () Source
Wait until data can be written to the given Socket.
After this function returns, a call to send
will essentially be
non-blocking.
z85Encode :: MonadIO m => Restricted Div4 ByteString -> m ByteString Source
z85Decode :: MonadIO m => Restricted Div5 ByteString -> m ByteString Source
Utils
proxy :: Socket a -> Socket b -> Maybe (Socket c) -> IO () Source
Starts built-in 0MQ proxy (cf. zmq_proxy)
Proxy connects front to back socket
Before calling proxy all sockets should be binded
If the capture socket is not Nothing, the proxy shall send all messages, received on both frontend and backend, to the capture socket.
curveKeyPair :: MonadIO m => m (Restricted Div5 ByteString, Restricted Div5 ByteString) Source
Generate a new curve key pair. (cf. zmq_curve_keypair)