| Copyright | (c) Matthew Peddie 2014 |
|---|---|
| License | BSD3 (see the file zeromq4-pipes/LICENSE) |
| Maintainer | matt.peddie@planet.com |
| Stability | experimental |
| Portability | GHC |
| Safe Haskell | None |
| Language | Haskell2010 |
Pipes.ZMQ4
Contents
- Create data sources and sinks
- One-directional sources and sinks (for use with Pipes's
Pipe,ProducerandConsumer) - Bidirectional sources and sinks (for use with Pipes.Core's
Proxy,ServerandClient). - Low-level safe setup
- One-directional sources and sinks (for use with Pipes's
- Helpers
- Re-exported modules
Description
This module provides functions to help you attach ZMQ sockets to Pipes processing pipelines.
If you want to hook ZMQ sockets into a unidirectional pipeline
involving Producers, Consumers and Pipes, see
examples/proxy.hs for a short usage example for setupProducer and
setupConsumer.
If you want to hook ZMQ sockets into a bidirectional or
non-Pull-based pipeline involving Clients, Servers and Proxys,
see examples/server.hs and examples/client.hs for a short usage
example for setupServer and setupClient.
This module relies on the functions provided by Pipes.Safe to deal with exceptions and termination. If you need to avoid this layer of safety, please don't hesitate to contact the author for support.
Currently, everything runs in the same thread as the rest of the
Effect, so
if something blocks forever on a ZMQ receive or send, you may be in trouble.
- setupProducer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Producer [ByteString] m ()
- setupConsumer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Consumer' (NonEmpty ByteString) m ()
- setupPipe :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Pipe (NonEmpty ByteString) [ByteString] m ()
- setupPipePair :: (MonadSafe m, Base m ~ IO, SocketType txsock, SocketType rxsock, Sender txsock, Receiver rxsock) => Context -> txsock -> rxsock -> (Socket txsock -> IO ()) -> (Socket rxsock -> IO ()) -> Pipe (NonEmpty ByteString) [ByteString] m ()
- setupPipeline :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Pipe [ByteString] (NonEmpty ByteString) m () -> Effect m ()
- setupClient :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Client [ByteString] (NonEmpty ByteString) m ()
- setupServer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> NonEmpty ByteString -> Server (NonEmpty ByteString) [ByteString] m ()
- setup :: (MonadSafe m, Base m ~ IO, SocketType sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> (Socket sockty -> m v) -> m v
- receiveMessage :: (Receiver sck, MonadIO m) => Socket sck -> Producer' [ByteString] m ()
- sendMessage :: (Sender sck, MonadIO m) => Socket sck -> Consumer' (NonEmpty ByteString) m ()
- toNonEmpty :: Monad m => Pipe [ByteString] (NonEmpty ByteString) m ()
- module Pipes
- module Pipes.Safe
Create data sources and sinks
Each of these functions takes a setup function, which is is run
before any messaging activities happen, so anything you need to
do to configure the socket (e.g. subscribe, set options,
connect or bind) should be done within it.
Note that according to the ZeroMQ manual pages, the correct
order of operations is to perform all socket configuration before
running connect or bind.
One-directional sources and sinks (for use with Pipes's Pipe, Producer and Consumer)
Arguments
| :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty) | |
| => Context | ZMQ context |
| -> sockty | ZMQ socket type |
| -> (Socket sockty -> IO ()) | Setup function |
| -> Consumer' (NonEmpty ByteString) m () | Message sink |
Create a Consumer of message data from the given ZeroMQ
parameters. All data successfully awaited from upstream will be
sent out the socket.
The resulting Consumer will only accept NonEmpty lists of
ByteString message parts. See toNonEmpty if this is a
sticking point for you.
Arguments
| :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) | |
| => Context | ZMQ context |
| -> sockty | ZMQ socket type |
| -> (Socket sockty -> IO ()) | Setup function for socket |
| -> Pipe (NonEmpty ByteString) [ByteString] m () | Pipeline stage via ZMQ |
Create a Pipe out of a socket from the given ZeroMQ parameters.
All data successfully awaited from upstream will be sent out the
socket, and all message data received on the socket will be
yielded downstream.
You can use this Pipe to replace a processing stage in your
pipeline with a ZMQ transaction. Note that the ZMQ actions here
are blocking; for example, if you make a Req socket, the
pipeline will block when the message from upstream is transmitted
until the response from the remote ZMQ socket is received.
Consider using the pipes-concurrency package if you are sure
you're using the right socket type but you don't want everything to
block here.
Arguments
| :: (MonadSafe m, Base m ~ IO, SocketType txsock, SocketType rxsock, Sender txsock, Receiver rxsock) | |
| => Context | ZMQ context |
| -> txsock | ZMQ transmit socket type |
| -> rxsock | ZMQ receive socket type |
| -> (Socket txsock -> IO ()) | Setup function for transmit socket |
| -> (Socket rxsock -> IO ()) | Setup function for receive socket |
| -> Pipe (NonEmpty ByteString) [ByteString] m () | Pipeline stage via ZMQ |
setupPipePair is the same as setupPipe except that it allows
you to use two different sockets, one for sending and the other for
receiving.
Arguments
| :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) | |
| => Context | ZMQ context |
| -> sockty | ZMQ socket type |
| -> (Socket sockty -> IO ()) | Setup function for socket |
| -> Pipe [ByteString] (NonEmpty ByteString) m () | Pipeline to run in the middle |
| -> Effect m () |
Create an Effect out of a socket from the given ZeroMQ
parameters and a pipe to use to connect the resulting Producer
and Consumer.
There are two ways to use a single socket to do both ZMQ sends and
receives in the same pipeline. setupPipe places the ZMQ socket
in the middle of the pipeline. setupPipeline places the same ZMQ
socket at both ends: the provided Pipe is used to connect the
receiving Producer and sending Consumer generated from the ZMQ
socket. In other words, for setupPipeline ctx socktype opts
myPipe,
socket RX >-> myPipe >-> socket TX
See setupPipe for some caveats. See examples/serverPipeline.hs
for a usage example.
Bidirectional sources and sinks (for use with Pipes.Core's Proxy, Server and Client).
Low-level safe setup
It's recommended to use setupProducer, setupConsumer and
friends instead of these functions unless you know what you're
doing and need something else.
setup :: (MonadSafe m, Base m ~ IO, SocketType sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> (Socket sockty -> m v) -> m v Source #
This is the low-level function for safely bracketing ZMQ socket
creation, so that any exceptions or Pipe termination will not
result in abandoned sockets. For example, setupProducer is
defined as
setupProducer ctx ty opts = setup ctx ty opts $ forever . receiveMessage
receiveMessage :: (Receiver sck, MonadIO m) => Socket sck -> Producer' [ByteString] m () Source #
This is a low-level function for simply passing a message received on a socket downstream.
receiveMessage sock = liftIO (Z.receiveMulti sock) >>= yield
sendMessage :: (Sender sck, MonadIO m) => Socket sck -> Consumer' (NonEmpty ByteString) m () Source #
This is a low-level function for simply sending a message available from upstream out on the socket.
sendMessage sock = await >>= liftIO . Z.sendMulti sock
Helpers
toNonEmpty :: Monad m => Pipe [ByteString] (NonEmpty ByteString) m () Source #
This is simply an adapter between lists of message parts and the
NonEmpty lists demanded by ZMQ Consumers. If an empty list
arrives in the input, it will be ignored.
Re-exported modules
module Pipes
module Pipes.Safe