Stability | experimental |
---|---|
Safe Haskell | None |
This is a Haskell binding for the nanomsg library: http://nanomsg.org/.
There's support for blocking send and recv, a non-blocking receive, and for all the socket types and the functions you need to wire them up and tear them down again.
Most socket options are available through accessor and mutator functions. Sockets are typed, transports are not.
Socket type documentation is adapted or quoted verbatim from the nanomsg manual. Please refer to nanomsg.org for information on how to use the library.
- data Pair = Pair
- data Req = Req
- data Rep = Rep
- data Pub = Pub
- data Sub = Sub
- data Surveyor = Surveyor
- data Respondent = Respondent
- data Push = Push
- data Pull = Pull
- data Bus = Bus
- data Socket a
- data Endpoint
- data NNException
- socket :: Protocol a => a -> IO (Socket a)
- withSocket :: Protocol a => a -> (Socket a -> IO b) -> IO b
- bind :: Socket a -> String -> IO Endpoint
- connect :: Socket a -> String -> IO Endpoint
- send :: (SendType a, Protocol a) => Socket a -> ByteString -> IO ()
- recv :: (RecvType a, Protocol a) => Socket a -> IO ByteString
- recv' :: (RecvType a, Protocol a) => Socket a -> IO (Maybe ByteString)
- subscribe :: (SubscriberType a, Protocol a) => Socket a -> ByteString -> IO ()
- unsubscribe :: (SubscriberType a, Protocol a) => Socket a -> ByteString -> IO ()
- shutdown :: Socket a -> Endpoint -> IO ()
- close :: Socket a -> IO ()
- term :: IO ()
- linger :: Socket a -> IO Int
- setLinger :: Socket a -> Int -> IO ()
- sndBuf :: Socket a -> IO Int
- setSndBuf :: Socket a -> Int -> IO ()
- rcvBuf :: Socket a -> IO Int
- setRcvBuf :: Socket a -> Int -> IO ()
- reconnectInterval :: Socket a -> IO Int
- setReconnectInterval :: Socket a -> Int -> IO ()
- reconnectIntervalMax :: Socket a -> IO Int
- setReconnectIntervalMax :: Socket a -> Int -> IO ()
- sndPrio :: Socket a -> IO Int
- setSndPrio :: Socket a -> Int -> IO ()
- ipv4Only :: Socket a -> IO Int
- setIpv4Only :: Socket a -> Int -> IO ()
- requestResendInterval :: ReqType a => Socket a -> IO Int
- setRequestResendInterval :: ReqType a => Socket a -> Int -> IO ()
- tcpNoDelay :: Socket a -> IO Int
- setTcpNoDelay :: Socket a -> Int -> IO ()
Socket types
Socket for communication with exactly one peer. Each
party can send messages at any time. If the peer is not
available or send buffer is full, subsequent calls to
send
will block until it’s possible to send the message.
Used to implement a client application that sends requests and receives replies. The socket will resend requests automatically if there's no reply within a given time. The default timeout is 1 minute.
See also setRequestResendInterval
.
This socket is used to distribute messages to multiple destinations. Can not receive.
Receives messages from the publisher. Only messages that the socket is subscribed to are received. When the socket is created there are no subscriptions and thus no messages will be received.
Send is not defined on this socket. The socket can be connected to at most one peer.
See also subscribe
and unsubscribe
.
Surveyor and respondent are used to broadcast a survey to multiple locations and gather the responses.
This socket is used to send the survey. The survey is delivered to all the connected respondents. Once the query is sent, the socket can be used to receive the responses.
When the survey deadline expires, receive will return ETIMEDOUT error.
See also setSurveyorDeadline
data Respondent Source
Used to respond to a survey. Survey is received using receive function, response is sent using send function. This socket can be connected to at most one peer.
Push and Pull sockets fair queue messages from one processing step, load balancing them among instances of the next processing step.
This socket is used to send messages to a cluster of load-balanced nodes.
Receive operation is not implemented on this socket type.
This socket is used to receive a message from a cluster of nodes.
Send operation is not implemented on this socket type.
Broadcasts messages from any node to all other nodes in the topology. The socket should never receives messages that it sent itself.
This pattern scales only to local level (within a single machine or within a single LAN). Trying to scale it further can result in overloading individual nodes with messages.
Other types
data NNException Source
Pretty much any error condition throws this exception.
Functions
withSocket :: Protocol a => a -> (Socket a -> IO b) -> IO bSource
Creates a socket and runs your action with it.
E.g. collecting 10 messages:
withSocket Sub $ \sub -> do _ <- connect sub "tcp://localhost:5560" subscribe sub (C.pack "") replicateM 10 (recv sub)
Ensures the socket is closed when your action is done.
bind :: Socket a -> String -> IO EndpointSource
Binds the socket to a local interface.
See the nanomsg documentation for specifics on transports. Note that host names do not work for tcp. Some examples are:
bind sock "tcp://*:5560" bind sock "tcp://eth0:5560" bind sock "tcp://127.0.0.1:5560" bind sock "inproc://test" bind sock "ipc:///tmp/test.ipc"
This function returns an Endpoint
, which can be supplied
to shutdown
to remove a connection.
send :: (SendType a, Protocol a) => Socket a -> ByteString -> IO ()Source
recv :: (RecvType a, Protocol a) => Socket a -> IO ByteStringSource
Blocking receive.
recv' :: (RecvType a, Protocol a) => Socket a -> IO (Maybe ByteString)Source
Nonblocking receive function.
subscribe :: (SubscriberType a, Protocol a) => Socket a -> ByteString -> IO ()Source
Subscribe to a given subject string.
unsubscribe :: (SubscriberType a, Protocol a) => Socket a -> ByteString -> IO ()Source
Unsubscribes from a subject.
close :: Socket a -> IO ()Source
Closes the socket. Any buffered inbound messages that were not yet received by the application will be discarded. The library will try to deliver any outstanding outbound messages for the time specified by NN_LINGER socket option. The call will block in the meantime.
Socket option accessors and mutators
linger :: Socket a -> IO IntSource
Specifies how long should the socket try to send pending outbound messages after close has been called, in milliseconds.
Negative value means infinite linger. Default value is 1000 (1 second).
setLinger :: Socket a -> Int -> IO ()Source
Specifies how long should the socket try to send pending outbound messages after close has been called, in milliseconds.
Negative value means infinite linger. Default value is 1000 (1 second).
sndBuf :: Socket a -> IO IntSource
Size of the send buffer, in bytes. To prevent blocking for messages larger than the buffer, exactly one message may be buffered in addition to the data in the send buffer.
Default value is 128kB.
setSndBuf :: Socket a -> Int -> IO ()Source
Size of the send buffer, in bytes. To prevent blocking for messages larger than the buffer, exactly one message may be buffered in addition to the data in the send buffer.
Default value is 128kB.
rcvBuf :: Socket a -> IO IntSource
Size of the receive buffer, in bytes. To prevent blocking for messages larger than the buffer, exactly one message may be buffered in addition to the data in the receive buffer.
Default value is 128kB.
setRcvBuf :: Socket a -> Int -> IO ()Source
Size of the receive buffer, in bytes. To prevent blocking for messages larger than the buffer, exactly one message may be buffered in addition to the data in the receive buffer.
Default value is 128kB.
reconnectInterval :: Socket a -> IO IntSource
For connection-based transports such as TCP, this option specifies how long to wait, in milliseconds, when connection is broken before trying to re-establish it.
Note that actual reconnect interval may be randomised to some extent to prevent severe reconnection storms.
Default value is 100 (0.1 second).
setReconnectInterval :: Socket a -> Int -> IO ()Source
For connection-based transports such as TCP, this option specifies how long to wait, in milliseconds, when connection is broken before trying to re-establish it.
Note that actual reconnect interval may be randomised to some extent to prevent severe reconnection storms.
Default value is 100 (0.1 second).
reconnectIntervalMax :: Socket a -> IO IntSource
This option is to be used only in addition to NN_RECONNECT_IVL option. It specifies maximum reconnection interval. On each reconnect attempt, the previous interval is doubled until NN_RECONNECT_IVL_MAX is reached.
Value of zero means that no exponential backoff is performed and reconnect interval is based only on NN_RECONNECT_IVL. If NN_RECONNECT_IVL_MAX is less than NN_RECONNECT_IVL, it is ignored.
Default value is 0.
setReconnectIntervalMax :: Socket a -> Int -> IO ()Source
This option is to be used only in addition to NN_RECONNECT_IVL option. It specifies maximum reconnection interval. On each reconnect attempt, the previous interval is doubled until NN_RECONNECT_IVL_MAX is reached.
Value of zero means that no exponential backoff is performed and reconnect interval is based only on NN_RECONNECT_IVL. If NN_RECONNECT_IVL_MAX is less than NN_RECONNECT_IVL, it is ignored.
Default value is 0.
sndPrio :: Socket a -> IO IntSource
Sets outbound priority for endpoints subsequently added to the socket. This option has no effect on socket types that send messages to all the peers. However, if the socket type sends each message to a single peer (or a limited set of peers), peers with high priority take precedence over peers with low priority.
Highest priority is 1, lowest priority is 16. Default value is 8.
setSndPrio :: Socket a -> Int -> IO ()Source
Sets outbound priority for endpoints subsequently added to the socket. This option has no effect on socket types that send messages to all the peers. However, if the socket type sends each message to a single peer (or a limited set of peers), peers with high priority take precedence over peers with low priority.
Highest priority is 1, lowest priority is 16. Default value is 8.
ipv4Only :: Socket a -> IO IntSource
If set to 1, only IPv4 addresses are used. If set to 0, both IPv4 and IPv6 addresses are used.
Default value is 1.
setIpv4Only :: Socket a -> Int -> IO ()Source
If set to 1, only IPv4 addresses are used. If set to 0, both IPv4 and IPv6 addresses are used.
Default value is 1.
requestResendInterval :: ReqType a => Socket a -> IO IntSource
This option is defined on the full REQ socket. If reply is not received in specified amount of milliseconds, the request will be automatically resent.
Default value is 60000 (1 minute).
setRequestResendInterval :: ReqType a => Socket a -> Int -> IO ()Source
This option is defined on the full REQ socket. If reply is not received in specified amount of milliseconds, the request will be automatically resent.
Default value is 60000 (1 minute).
tcpNoDelay :: Socket a -> IO IntSource
This option, when set to 1, disables Nagle's algorithm.
Default value is 0.
setTcpNoDelay :: Socket a -> Int -> IO ()Source
This option, when set to 1, disables Nagle's algorithm.
Default value is 0.