| Copyright | (c) Matthew Peddie 2014 |
|---|---|
| License | BSD3 (see the file zeromq4-pipes/LICENSE) |
| Maintainer | matt.peddie@planet.com |
| Stability | experimental |
| Portability | GHC |
| Safe Haskell | None |
| Language | Haskell2010 |
Pipes.ZMQ4
Description
This module provides functions to help you attach ZMQ sockets to Pipes processing pipelines.
If you want to hook ZMQ sockets into a unidirectional pipeline
involving Producers, Consumers and Pipes, see
examples/proxy.hs for a short usage example for setupProducer and
setupConsumer.
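As a rough illustration of the unidirectional case (not a copy of examples/proxy.hs), a forwarder might look like the sketch below. The endpoints and the choice of Pull and Push sockets are assumptions made for this example, and it only makes progress once peers are attached to both endpoints.

```haskell
module Main (main) where

import Pipes.ZMQ4                  -- also re-exports Pipes and Pipes.Safe
import qualified System.ZMQ4 as Z

-- Forward every message arriving on a Pull socket out through a Push socket.
-- Both endpoints are placeholders chosen for this sketch.
main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
        setupProducer ctx Z.Pull (\s -> Z.bind s "tcp://*:5555")
    >-> toNonEmpty                 -- adapt [ByteString] to NonEmpty ByteString
    >-> setupConsumer ctx Z.Push (\s -> Z.connect s "tcp://localhost:5556")
```

runSafeT is what lets the sockets be released when the pipeline stops or throws.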
If you want to hook ZMQ sockets into a bidirectional or
non-Pull-based pipeline involving Clients, Servers and Proxys,
see examples/server.hs and examples/client.hs for a short usage
example for setupServer and setupClient.
This module relies on the functions provided by Pipes.Safe to deal with exceptions and termination. If you need to avoid this layer of safety, please don't hesitate to contact the author for support.
- setupProducer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Producer [ByteString] m ()
- setupConsumer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Consumer' (NonEmpty ByteString) m ()
- setupBi :: (MonadSafe m, Base m ~ IO, MonadSafe m1, Base m1 ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> m1 (Producer [ByteString] m (), Consumer (NonEmpty ByteString) m ())
- setupClient :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> Client [ByteString] (NonEmpty ByteString) m ()
- setupServer :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> NonEmpty ByteString -> Server (NonEmpty ByteString) [ByteString] m ()
- setup :: (MonadSafe m, Base m ~ IO, SocketType sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> (Socket sockty -> m v) -> m v
- receiveLoop :: (Receiver sck, MonadIO m) => Socket sck -> Producer [ByteString] m ()
- sendLoop :: (Sender sck, MonadIO m) => Socket sck -> Consumer' (NonEmpty ByteString) m ()
- toNonEmpty :: Monad m => Pipe [ByteString] (NonEmpty ByteString) m ()
- module Pipes
- module Pipes.Safe
Create data sources and sinks
Each of these functions takes a setup function, which is run
before any messaging activities happen, so anything you need to
do to configure the socket (e.g. subscribe, set options,
connect or bind) should be done within it.
Note that according to the ZeroMQ manual pages, the correct
order of operations is to perform all socket configuration before
running connect or bind.
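A setup function that follows this ordering could be sketched as follows. The subscription prefix and the endpoint are made up for the example; subscribe and connect come from System.ZMQ4.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Pipes.ZMQ4
import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z

-- Configure the socket first, then connect it.
subscriberSetup :: Z.Socket Z.Sub -> IO ()
subscriberSetup s = do
  Z.subscribe s "weather."            -- hypothetical topic prefix
  Z.connect s "tcp://localhost:5556"  -- hypothetical publisher endpoint

-- Print every multipart message received on the subscription.
main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
    setupProducer ctx Z.Sub subscriberSetup >-> P.print
```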
Arguments
| :: (MonadSafe m, Base m ~ IO, SocketType sockty, Receiver sockty) | |
| => Context | ZMQ context |
| -> sockty | ZMQ socket type |
| -> (Socket sockty -> IO ()) | Setup function |
| -> Producer [ByteString] m () | Message source |
Arguments
| :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty) | |
| => Context | ZMQ context |
| -> sockty | ZMQ socket type |
| -> (Socket sockty -> IO ()) | Setup function |
| -> Consumer' (NonEmpty ByteString) m () | Message sink |
Create a Consumer of message data from the given ZeroMQ
parameters. All data successfully awaited from upstream will be
sent out the socket.
The resulting Consumer will only accept NonEmpty lists of
ByteString message parts. See toNonEmpty if this is a
sticking point for you.
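When the messages originate in your own program, they can be built as NonEmpty values directly. The sketch below assumes that a Pull peer is bound at the placeholder endpoint; without one, the queued messages may keep the program from exiting.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Data.ByteString (ByteString)
import Data.List.NonEmpty (NonEmpty (..))
import Pipes.ZMQ4
import qualified System.ZMQ4 as Z

-- Each element is one multipart message; (:|) guarantees at least one part.
outgoing :: [NonEmpty ByteString]
outgoing = ["header" :| ["body"], "single-part" :| []]

-- Send the messages, then let the pipeline (and the socket) shut down.
main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
        each outgoing
    >-> setupConsumer ctx Z.Push (\s -> Z.connect s "tcp://localhost:5556")
```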
Arguments
| :: (MonadSafe m, Base m ~ IO, MonadSafe m1, Base m1 ~ IO, SocketType sockty, Sender sockty, Receiver sockty) | |
| => Context | ZMQ context |
| -> sockty | ZMQ socket type |
| -> (Socket sockty -> IO ()) | Setup function |
| -> m1 (Producer [ByteString] m (), Consumer (NonEmpty ByteString) m ()) | Message (source, sink) pair |
Create both a Producer and a Consumer of message data, both
corresponding to the same socket, from the given ZeroMQ parameters.
This is like setupProducer and setupConsumer combined; the
socket type must be both a Sender and a Receiver (for
example, a Dealer). Messages received over the socket are
yielded by the Producer; messages awaited by the Consumer
are sent over the socket.
See also the descriptions of setupProducer and setupConsumer.
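One possible use, sketched under the assumption that a peer is listening at the placeholder endpoint, is an echo loop that sends back whatever arrives on a Dealer socket:

```haskell
module Main (main) where

import Pipes.ZMQ4
import qualified System.ZMQ4 as Z

-- Echo every received message back out on the same Dealer socket.
main :: IO ()
main = Z.withContext $ \ctx -> runSafeT $ do
  (source, sink) <- setupBi ctx Z.Dealer
                      (\s -> Z.connect s "tcp://localhost:5557")
  runEffect $ source >-> toNonEmpty >-> sink
```

Here both the outer monad and the pipeline monad are SafeT IO, which satisfies the constraints in the signature.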
Arguments
| :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) | |
| => Context | ZMQ context |
| -> sockty | ZMQ socket type |
| -> (Socket sockty -> IO ()) | Setup function |
| -> Client [ByteString] (NonEmpty ByteString) m () | |
Arguments
| :: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) | |
| => Context | ZMQ context |
| -> sockty | ZMQ socket type |
| -> (Socket sockty -> IO ()) | Setup function |
| -> NonEmpty ByteString | Server request input |
| -> Server (NonEmpty ByteString) [ByteString] m () | |
Low-level safe setup
It's recommended to use setupProducer, setupConsumer and
friends instead of these functions unless you know what you're
doing and need something else.
setup :: (MonadSafe m, Base m ~ IO, SocketType sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> (Socket sockty -> m v) -> m v
This is the low-level function for safely bracketing ZMQ socket
creation, so that any exceptions or Pipe termination will not
result in abandoned sockets. For example, setupProducer is
defined as
setupProducer ctx ty opts = setup ctx ty opts receiveLoop
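By the same pattern, setup can wrap any action that needs a socket. The following sketch defines a hypothetical takeMessages (not part of this module) that yields only the first n messages; the endpoint is a placeholder.

```haskell
module Main (main) where

import Data.ByteString (ByteString)
import Pipes.ZMQ4
import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z

-- Hypothetical helper: a Producer that stops after n messages.
-- setup brackets the socket around the whole inner pipeline.
takeMessages :: Z.Context -> Int -> Producer [ByteString] (SafeT IO) ()
takeMessages ctx n =
  setup ctx Z.Pull (\s -> Z.bind s "tcp://*:5555") $ \sock ->
    receiveLoop sock >-> P.take n

-- Print the first three messages, then exit.
main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $ takeMessages ctx 3 >-> P.print
```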
receiveLoop :: (Receiver sck, MonadIO m) => Socket sck -> Producer [ByteString] m ()
This is a low-level function for simply passing all messages received on a socket downstream.
receiveLoop sock = forever $ liftIO (Z.receiveMulti sock) >>= yield
sendLoop :: (Sender sck, MonadIO m) => Socket sck -> Consumer' (NonEmpty ByteString) m ()
This is a low-level function for simply sending all messages available from upstream out on the socket.
sendLoop sock = forever $ await >>= liftIO . Z.sendMulti sock
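Because these two functions only require MonadIO, they can also be used with sockets you manage yourself. The sketch below is an in-process round trip over an inproc endpoint (the endpoint name is arbitrary), using withSocket from System.ZMQ4 instead of setup, so cleanup is handled by zeromq4-haskell rather than Pipes.Safe.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Data.ByteString (ByteString)
import Data.List.NonEmpty (NonEmpty (..))
import Pipes.ZMQ4
import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z

messages :: [NonEmpty ByteString]
messages = ["hello" :| ["world"], "bye" :| []]

-- Push the messages through an inproc endpoint and collect them again.
roundTrip :: IO [[ByteString]]
roundTrip =
  Z.withContext $ \ctx ->
  Z.withSocket ctx Z.Pull $ \pull ->
  Z.withSocket ctx Z.Push $ \push -> do
    Z.bind pull "inproc://loop-demo"     -- bind before the peer connects
    Z.connect push "inproc://loop-demo"
    -- sendLoop stops when `each` runs out of messages.
    runEffect $ each messages >-> sendLoop push
    -- receiveLoop never ends on its own, so cap it with P.take.
    P.toListM (receiveLoop pull >-> P.take (length messages))

main :: IO ()
main = roundTrip >>= print
```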
Helpers
toNonEmpty :: Monad m => Pipe [ByteString] (NonEmpty ByteString) m ()
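The text above does not say what toNonEmpty does with an empty list. A pipe of the same shape that silently drops empty lists, which is an assumption about the intended behaviour and not the module's definition, could be sketched with plain pipes as follows; dropEmpty is a hypothetical name.

```haskell
module Main (main) where

import Data.List.NonEmpty (NonEmpty, nonEmpty)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical stand-in for toNonEmpty: forward non-empty lists as
-- NonEmpty values and skip empty ones (assumed behaviour).
dropEmpty :: Monad m => Pipe [a] (NonEmpty a) m r
dropEmpty = for cat (maybe (return ()) yield . nonEmpty)

-- Run the pipe purely over three lists, one of them empty.
main :: IO ()
main = print (P.toList (each [[1 :: Int], [], [2, 3]] >-> dropEmpty))
```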
Re-exported modules
module Pipes
module Pipes.Safe