Copyright | (c) Matthew Peddie 2014 |
---|---|
License | BSD3 (see the file zeromq4-pipes/LICENSE) |
Maintainer | matt.peddie@planet.com |
Stability | experimental |
Portability | GHC |
Safe Haskell | None |
Language | Haskell2010 |
- Create data sources and sinks
- One-directional sources and sinks (for use with Pipes's
Pipe
,Producer
andConsumer
) - Bidirectional sources and sinks (for use with Pipes.Core's
Proxy
,Server
andClient
). - Low-level safe setup
- One-directional sources and sinks (for use with Pipes's
- Helpers
- Re-exported modules
This module provides functions to help you attach ZMQ sockets to Pipes processing pipelines.
If you want to hook ZMQ sockets into a unidirectional pipeline
involving Producer
s, Consumer
s and Pipe
s, see
examples/proxy.hs
for a short usage example for setupProducer
and
setupConsumer
.
If you want to hook ZMQ sockets into a bidirectional or
non-Pull
-based pipeline involving Client
s, Server
s and Proxy
s,
see examples/server.hs
and examples/client.hs
for a short usage
example for setupServer
and setupClient
.
This module relies on the functions provided by Pipes.Safe to deal with exceptions and termination. If you need to avoid this layer of safety, please don't hesitate to contact the author for support.
Currently, everything runs in the same thread as the rest of the
Effect
, so
if something blocks forever on a ZMQ receive or send, you may be in trouble.
- setupProducer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Producer [ByteString] m ()
- setupConsumer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Consumer' (NonEmpty ByteString) m ()
- setupPipe :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Pipe (NonEmpty ByteString) [ByteString] m ()
- setupPipePair :: (MonadSafe m, Base m ~ IO, SocketType txsock, SocketType rxsock, Sender txsock, Receiver rxsock) => Context -> txsock -> rxsock -> (Socket txsock -> IO ()) -> (Socket rxsock -> IO ()) -> Pipe (NonEmpty ByteString) [ByteString] m ()
- setupPipeline :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Pipe [ByteString] (NonEmpty ByteString) m () -> Effect m ()
- setupClient :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Client [ByteString] (NonEmpty ByteString) m ()
- setupServer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> NonEmpty ByteString -> Server (NonEmpty ByteString) [ByteString] m ()
- setup :: (MonadSafe m, Base m ~ IO, SocketType sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> (Socket sockty -> m v) -> m v
- receiveMessage :: (Receiver sck, MonadIO m) => Socket sck -> Producer' [ByteString] m ()
- sendMessage :: (Sender sck, MonadIO m) => Socket sck -> Consumer' (NonEmpty ByteString) m ()
- toNonEmpty :: Monad m => Pipe [ByteString] (NonEmpty ByteString) m ()
- module Pipes
- module Pipes.Safe
Create data sources and sinks
Each of these functions takes a setup function, which is is run
before any messaging activities happen, so anything you need to
do to configure the socket (e.g. subscribe
, set options,
connect
or bind
) should be done within it.
Note that according to the ZeroMQ manual pages, the correct
order of operations is to perform all socket configuration before
running connect
or bind
.
One-directional sources and sinks (for use with Pipes's Pipe
, Producer
and Consumer
)
:: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty) | |
=> Context | ZMQ context |
-> sockty | ZMQ socket type |
-> (Socket sockty -> IO ()) | Setup function |
-> Consumer' (NonEmpty ByteString) m () | Message sink |
Create a Consumer
of message data from the given ZeroMQ
parameters. All data successfully await
ed from upstream will be
sent out the socket.
The resulting Consumer
will only accept NonEmpty
lists of
ByteString
message parts. See toNonEmpty
if this is a
sticking point for you.
:: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) | |
=> Context | ZMQ context |
-> sockty | ZMQ socket type |
-> (Socket sockty -> IO ()) | Setup function for socket |
-> Pipe (NonEmpty ByteString) [ByteString] m () | Pipeline stage via ZMQ |
Create a Pipe
out of a socket from the given ZeroMQ parameters.
All data successfully await
ed from upstream will be sent out the
socket, and all message data received on the socket will be
yield
ed downstream.
You can use this Pipe
to replace a processing stage in your
pipeline with a ZMQ transaction. Note that the ZMQ actions here
are blocking; for example, if you make a Req
socket, the
pipeline will block when the message from upstream is transmitted
until the response from the remote ZMQ socket is received.
Consider using the pipes-concurrency
package if you are sure
you're using the right socket type but you don't want everything to
block here.
:: (MonadSafe m, Base m ~ IO, SocketType txsock, SocketType rxsock, Sender txsock, Receiver rxsock) | |
=> Context | ZMQ context |
-> txsock | ZMQ transmit socket type |
-> rxsock | ZMQ receive socket type |
-> (Socket txsock -> IO ()) | Setup function for transmit socket |
-> (Socket rxsock -> IO ()) | Setup function for receive socket |
-> Pipe (NonEmpty ByteString) [ByteString] m () | Pipeline stage via ZMQ |
setupPipePair
is the same as setupPipe
except that it allows
you to use two different sockets, one for sending and the other for
receiving.
:: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) | |
=> Context | ZMQ context |
-> sockty | ZMQ socket type |
-> (Socket sockty -> IO ()) | Setup function for socket |
-> Pipe [ByteString] (NonEmpty ByteString) m () | Pipeline to run in the middle |
-> Effect m () |
Create an Effect
out of a socket from the given ZeroMQ
parameters and a pipe to use to connect the resulting Producer
and Consumer
.
There are two ways to use a single socket to do both ZMQ sends and
receives in the same pipeline. setupPipe
places the ZMQ socket
in the middle of the pipeline. setupPipeline
places the same ZMQ
socket at both ends: the provided Pipe
is used to connect the
receiving Producer and sending Consumer generated from the ZMQ
socket. In other words, for setupPipeline ctx socktype opts
myPipe
,
socket RX >-> myPipe >-> socket TX
See setupPipe
for some caveats. See examples/serverPipeline.hs
for a usage example.
Bidirectional sources and sinks (for use with Pipes.Core's Proxy
, Server
and Client
).
Low-level safe setup
It's recommended to use setupProducer
, setupConsumer
and
friends instead of these functions unless you know what you're
doing and need something else.
setup :: (MonadSafe m, Base m ~ IO, SocketType sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> (Socket sockty -> m v) -> m v Source #
This is the low-level function for safely bracket
ing ZMQ socket
creation, so that any exceptions or Pipe
termination will not
result in abandoned sockets. For example, setupProducer
is
defined as
setupProducer ctx ty opts = setup ctx ty opts $ forever . receiveMessage
receiveMessage :: (Receiver sck, MonadIO m) => Socket sck -> Producer' [ByteString] m () Source #
This is a low-level function for simply passing a message received on a socket downstream.
receiveMessage sock = liftIO (Z.receiveMulti sock) >>= yield
sendMessage :: (Sender sck, MonadIO m) => Socket sck -> Consumer' (NonEmpty ByteString) m () Source #
This is a low-level function for simply sending a message available from upstream out on the socket.
sendMessage sock = await >>= liftIO . Z.sendMulti sock
Helpers
toNonEmpty :: Monad m => Pipe [ByteString] (NonEmpty ByteString) m () Source #
This is simply an adapter between lists of message parts and the
NonEmpty
lists demanded by ZMQ Consumer
s. If an empty list
arrives in the input, it will be ignored.
Re-exported modules
module Pipes
module Pipes.Safe