pipes-zeromq4-0.2.0.0: Pipes integration for ZeroMQ messaging

Copyright: (c) Matthew Peddie 2014
License: BSD3 (see the file zeromq4-pipes/LICENSE)
Maintainer: matt.peddie@planet.com
Stability: experimental
Portability: GHC
Safe Haskell: None
Language: Haskell2010

Pipes.ZMQ4

Description

This module provides functions to help you attach ZMQ sockets to Pipes processing pipelines.

If you want to hook ZMQ sockets into a unidirectional pipeline involving Producers, Consumers and Pipes, see examples/proxy.hs for a short usage example of setupProducer and setupConsumer.

If you want to hook ZMQ sockets into a bidirectional or non-Pull-based pipeline involving Clients, Servers and Proxys, see examples/server.hs and examples/client.hs for short usage examples of setupServer and setupClient.

This module relies on the functions provided by Pipes.Safe to deal with exceptions and termination. If you need to avoid this layer of safety, please don't hesitate to contact the author for support.

Currently, everything runs in the same thread as the rest of the Effect, so if a ZMQ receive or send blocks forever, the entire pipeline blocks with it.

Create data sources and sinks

Each of these functions takes a setup function, which is run before any messaging activities happen, so anything you need to do to configure the socket (e.g. subscribe, set options, connect or bind) should be done within it.

Note that according to the ZeroMQ manual pages, the correct order of operations is to perform all socket configuration before running connect or bind.
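To illustrate that ordering, a setup function for a subscriber socket might look like the sketch below. The name subscriberSetup and the endpoint address are assumptions made for this example; subscribe, connect, withContext and withSocket come from System.ZMQ4 in the zeromq4-haskell package.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import qualified System.ZMQ4 as Z

-- Hypothetical setup function: configure the socket first, connect last.
subscriberSetup :: Z.Socket Z.Sub -> IO ()
subscriberSetup sock = do
  Z.subscribe sock ""                    -- empty prefix: accept every message
  Z.connect sock "tcp://127.0.0.1:5555"  -- assumed endpoint

main :: IO ()
main = Z.withContext $ \ctx ->
  -- Exercise the setup function on a throwaway socket.
  Z.withSocket ctx Z.Sub subscriberSetup
```

A function of this shape can be passed as the setup argument to any of the functions in this module that accept a Sub socket.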

One-directional sources and sinks (for use with Pipes's Pipe, Producer and Consumer)

setupProducer

Arguments

:: (MonadSafe m, Base m ~ IO, SocketType sockty, Receiver sockty) 
=> Context

ZMQ context

-> sockty

ZMQ socket type

-> (Socket sockty -> IO ())

Setup function

-> Producer [ByteString] m ()

Message source

Create a Producer of message data from the given ZeroMQ parameters. All messages received on the socket will be sent downstream with yield.
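As a rough usage sketch (not a copy of examples/proxy.hs), the following prints every multipart message received on a Sub socket. The endpoint and the local name subscriberSetup are assumptions; P.print comes from Pipes.Prelude.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Pipes.ZMQ4
import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z

main :: IO ()
main = Z.withContext $ \ctx ->
  -- runSafeT releases the socket when the pipeline stops or throws.
  runSafeT . runEffect $
    setupProducer ctx Z.Sub subscriberSetup >-> P.print
  where
    -- Hypothetical setup function; the address is an assumption.
    subscriberSetup sock = do
      Z.subscribe sock ""
      Z.connect sock "tcp://127.0.0.1:5555"
```

This program runs until it is interrupted, since the Producer keeps waiting for further messages.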

setupConsumer

Arguments

:: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty) 
=> Context

ZMQ context

-> sockty

ZMQ socket type

-> (Socket sockty -> IO ())

Setup function

-> Consumer' (NonEmpty ByteString) m ()

Message sink

Create a Consumer of message data from the given ZeroMQ parameters. All data successfully awaited from upstream will be sent out the socket.

The resulting Consumer will only accept NonEmpty lists of ByteString message parts. See toNonEmpty if this is a sticking point for you.
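A minimal sketch of feeding NonEmpty message parts into such a Consumer is shown below. The message contents and the endpoint are assumptions, and a Pull peer is assumed to be bound at that address; without one, the program may wait at shutdown for the queued messages to be delivered.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Data.ByteString (ByteString)
import Data.List.NonEmpty (NonEmpty (..))
import Pipes.ZMQ4
import qualified System.ZMQ4 as Z

-- Two example messages: one single-part, one with two parts.
messages :: [NonEmpty ByteString]
messages =
  [ "hello" :| []
  , "header" :| ["body"]
  ]

main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
    each messages
      >-> setupConsumer ctx Z.Push (\s -> Z.connect s "tcp://127.0.0.1:5556")
```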

setupPipe

Arguments

:: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) 
=> Context

ZMQ context

-> sockty

ZMQ socket type

-> (Socket sockty -> IO ())

Setup function for socket

-> Pipe (NonEmpty ByteString) [ByteString] m ()

Pipeline stage via ZMQ

Create a Pipe out of a socket from the given ZeroMQ parameters. All data successfully awaited from upstream will be sent out the socket, and all message data received on the socket will be yielded downstream.

You can use this Pipe to replace a processing stage in your pipeline with a ZMQ transaction. Note that the ZMQ actions here are blocking; for example, if you make a Req socket, the pipeline will block when the message from upstream is transmitted until the response from the remote ZMQ socket is received. Consider using the pipes-concurrency package if you are sure you're using the right socket type but you don't want everything to block here.
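The blocking request/reply behaviour described above can be sketched with a Req socket as follows. The endpoint and the request payloads are assumptions, and a Rep peer is assumed to be listening there; each request blocks the pipeline until its reply arrives.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Data.List.NonEmpty (NonEmpty (..))
import Pipes.ZMQ4
import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z

main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
        each ["ping" :| [], "ping again" :| []]   -- requests sent upstream of the socket
    >-> setupPipe ctx Z.Req (\s -> Z.connect s "tcp://127.0.0.1:5557")
    >-> P.print                                   -- replies yielded by the socket
```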

setupPipePair

Arguments

:: (MonadSafe m, Base m ~ IO, SocketType txsock, SocketType rxsock, Sender txsock, Receiver rxsock) 
=> Context

ZMQ context

-> txsock

ZMQ transmit socket type

-> rxsock

ZMQ receive socket type

-> (Socket txsock -> IO ())

Setup function for transmit socket

-> (Socket rxsock -> IO ())

Setup function for receive socket

-> Pipe (NonEmpty ByteString) [ByteString] m ()

Pipeline stage via ZMQ

setupPipePair is the same as setupPipe except that it allows you to use two different sockets, one for sending and the other for receiving.
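One possible use, sketched under assumptions, is handing work to a remote worker over a Push socket and collecting results on a separate Pull socket. Both endpoints and the job payloads are hypothetical, and the worker on the other side is assumed to exist.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Data.List.NonEmpty (NonEmpty (..))
import Pipes.ZMQ4
import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z

main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
        each ["job-1" :| [], "job-2" :| []]
    >-> setupPipePair ctx Z.Push Z.Pull
          (\tx -> Z.connect tx "tcp://127.0.0.1:5558")  -- transmit socket setup
          (\rx -> Z.bind rx "tcp://127.0.0.1:5559")     -- receive socket setup
    >-> P.print
```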

setupPipeline

Arguments

:: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) 
=> Context

ZMQ context

-> sockty

ZMQ socket type

-> (Socket sockty -> IO ())

Setup function for socket

-> Pipe [ByteString] (NonEmpty ByteString) m ()

Pipeline to run in the middle

-> Effect m () 

Create an Effect out of a socket from the given ZeroMQ parameters and a pipe to use to connect the resulting Producer and Consumer.

There are two ways to use a single socket to do both ZMQ sends and receives in the same pipeline. setupPipe places the ZMQ socket in the middle of the pipeline. setupPipeline places the same ZMQ socket at both ends: the provided Pipe is used to connect the receiving Producer and sending Consumer generated from the ZMQ socket. In other words, for setupPipeline ctx socktype opts myPipe,

socket RX >-> myPipe >-> socket TX

See setupPipe for some caveats. See examples/serverPipeline.hs for a usage example.
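As a small sketch (not a copy of examples/serverPipeline.hs), an echo service on a Rep socket can use toNonEmpty as the middle Pipe, so that each received message is sent straight back. The bind address is an assumption.

```haskell
module Main (main) where

import Pipes.ZMQ4
import qualified System.ZMQ4 as Z

main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
    -- socket RX >-> toNonEmpty >-> socket TX
    setupPipeline ctx Z.Rep (\s -> Z.bind s "tcp://127.0.0.1:5557") toNonEmpty
```

Any other Pipe from [ByteString] to NonEmpty ByteString can take the place of toNonEmpty to compute a real reply.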

Bidirectional sources and sinks (for use with Pipes.Core's Proxy, Server and Client)

setupClient

Arguments

:: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) 
=> Context

ZMQ context

-> sockty

ZMQ socket type

-> (Socket sockty -> IO ())

Setup function

-> Client [ByteString] (NonEmpty ByteString) m () 

Create a Client' from the given ZeroMQ parameters. The Client' passes all messages it receives on the ZMQ socket upstream with request and sends all corresponding replies back out the socket.
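A sketch of pairing this Client with an upstream handler is given below, assuming a Rep socket that answers remote requests. The handler function, its "echo" prefix and the bind address are hypothetical; respond and (+>>) come from Pipes.Core.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Data.ByteString (ByteString)
import Data.List.NonEmpty (NonEmpty (..))
import Pipes.Core (Server, respond, (+>>))
import Pipes.ZMQ4
import qualified System.ZMQ4 as Z

-- Hypothetical handler: answers each request with its parts prefixed by "echo".
handler :: Monad m => [ByteString] -> Server [ByteString] (NonEmpty ByteString) m r
handler req = respond ("echo" :| req) >>= handler

main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
    -- Messages arriving on the socket are passed to handler via request;
    -- handler's responses are sent back out the socket.
    handler +>> setupClient ctx Z.Rep (\s -> Z.bind s "tcp://127.0.0.1:5560")
```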

setupServer

Arguments

:: (MonadSafe m, Base m ~ IO, SocketType sockty, Sender sockty, Receiver sockty) 
=> Context

ZMQ context

-> sockty

ZMQ socket type

-> (Socket sockty -> IO ())

Setup function

-> NonEmpty ByteString

Server request input

-> Server (NonEmpty ByteString) [ByteString] m () 

Create a Server' from the given ZeroMQ parameters. The Server' sends all received requests out on the ZMQ socket and passes all messages it receives over the ZMQ socket back downstream with respond.
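The following sketch drives such a Server from a one-shot downstream Client over a Req socket. The client definition, its payload and the endpoint are assumptions, and a Rep peer is assumed to be listening at that address; request and (+>>) come from Pipes.Core.

```haskell
{-# LANGUAGE OverloadedStrings #-}
module Main (main) where

import Data.ByteString (ByteString)
import Data.List.NonEmpty (NonEmpty (..))
import Pipes.Core (Client, request, (+>>))
import Pipes.ZMQ4
import qualified System.ZMQ4 as Z

-- Hypothetical client: sends one request and prints the reply parts.
client :: MonadIO m => Client (NonEmpty ByteString) [ByteString] m ()
client = do
  reply <- request ("ping" :| [])
  liftIO (print reply)

main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $
    -- The client's first request becomes the "server request input" argument.
    setupServer ctx Z.Req (\s -> Z.connect s "tcp://127.0.0.1:5560") +>> client
```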

Low-level safe setup

It's recommended to use setupProducer, setupConsumer and friends instead of these functions unless you know what you're doing and need something else.

setup :: (MonadSafe m, Base m ~ IO, SocketType sockty) => Context -> sockty -> (Socket sockty -> IO ()) -> (Socket sockty -> m v) -> m v

This is the low-level function for safely bracketing ZMQ socket creation, so that any exceptions or Pipe termination will not result in abandoned sockets. For example, setupProducer is defined as

setupProducer ctx ty opts = setup ctx ty opts $ forever . receiveMessage
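In the same spirit, setup can bracket a custom stage. The sketch below builds a Producer that stops after three messages, at which point the socket is released. The name firstThree, the Pull socket type and the bind address are assumptions for this example.

```haskell
module Main (main) where

import Control.Monad (forever)
import Data.ByteString (ByteString)
import Pipes.ZMQ4
import qualified Pipes.Prelude as P
import qualified System.ZMQ4 as Z

-- Hypothetical producer: yields the first three messages, then finishes.
firstThree :: Z.Context -> Producer [ByteString] (SafeT IO) ()
firstThree ctx =
  setup ctx Z.Pull (\s -> Z.bind s "tcp://127.0.0.1:5561") $ \sock ->
    forever (receiveMessage sock) >-> P.take 3

main :: IO ()
main = Z.withContext $ \ctx ->
  runSafeT . runEffect $ firstThree ctx >-> P.print
```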

receiveMessage :: (Receiver sck, MonadIO m) => Socket sck -> Producer' [ByteString] m ()

This is a low-level function for simply passing a message received on a socket downstream.

receiveMessage sock = forever $ liftIO (Z.receiveMulti sock) >>= yield

sendMessage :: (Sender sck, MonadIO m) => Socket sck -> Consumer' (NonEmpty ByteString) m ()

This is a low-level function for simply sending a message available from upstream out on the socket.

sendMessage sock = await >>= liftIO . Z.sendMulti sock

Helpers

toNonEmpty :: Monad m => Pipe [ByteString] (NonEmpty ByteString) m ()

This is simply an adapter between lists of message parts and the NonEmpty lists demanded by ZMQ Consumers. If an empty list arrives in the input, it will be ignored.
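The described behaviour can be sketched with a stand-alone adapter. This is one way such a Pipe could be written, not necessarily how the package defines toNonEmpty; the name dropEmpty and the sample input are hypothetical.

```haskell
module Main (main) where

import Data.List.NonEmpty (NonEmpty, nonEmpty)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical adapter: forwards non-empty lists, silently skips empty ones.
dropEmpty :: Monad m => Pipe [a] (NonEmpty a) m r
dropEmpty = for cat $ \parts ->
  case nonEmpty parts of
    Nothing -> return ()  -- empty input list: ignore it
    Just ne -> yield ne   -- non-empty: pass it downstream

main :: IO ()
main = print (P.toList (each inputs >-> dropEmpty))
  where
    -- Two of the four input lists are empty and will be dropped.
    inputs = [[], [1, 2], [], [3]] :: [[Int]]
```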

Re-exported modules

module Pipes

module Pipes.Safe