{-# OPTIONS_GHC -Wall #-} {-# LANGUAGE TypeFamilies #-} {-# LANGUAGE RankNTypes #-} {-| Module : Pipes.ZMQ4 Copyright : (c) Matthew Peddie 2014 License : BSD3 (see the file zeromq4-pipes/LICENSE) Maintainer : matt.peddie@planet.com Stability : experimental Portability : GHC This module provides functions to help you attach ZMQ sockets to "Pipes" processing pipelines. If you want to hook ZMQ sockets into a unidirectional pipeline involving 'Producer's, 'Consumer's and 'Pipe's, see @examples/proxy.hs@ for a short usage example for 'setupProducer' and 'setupConsumer'. If you want to hook ZMQ sockets into a bidirectional or non-'Pull'-based pipeline involving 'Client's, 'Server's and 'Proxy's, see @examples/server.hs@ and @examples/client.hs@ for a short usage example for 'setupServer' and 'setupClient'. This module relies on the functions provided by "Pipes.Safe" to deal with exceptions and termination. If you need to avoid this layer of safety, please don't hesitate to contact the author for support. Currently, everything runs in the same thread as the rest of the 'Effect', so __if something blocks forever on a ZMQ receive or send, you may be in trouble.__ -} module Pipes.ZMQ4 ( -- * Create data sources and sinks -- | Each of these functions takes a setup function, which is is run -- before any messaging activities happen, so anything you need to -- do to configure the socket (e.g. 'Z.subscribe', set options, -- 'Z.connect' or 'Z.bind') should be done within it. -- -- __Note that according to the ZeroMQ manual pages__, the correct -- order of operations is to perform all socket configuration before -- running 'Z.connect' or 'Z.bind'. -- ** One-directional sources and sinks (for use with "Pipes"'s 'Pipe', 'Producer' and 'Consumer') setupProducer , setupConsumer , setupPipe , setupPipePair , setupPipeline -- ** Bidirectional sources and sinks (for use with "Pipes.Core"'s 'Proxy', 'Server' and 'Client'). , setupClient , setupServer -- ** Low-level safe setup -- | It's recommended to use 'setupProducer', 'setupConsumer' and -- friends instead of these functions unless you know what you're -- doing and need something else. , setup , receiveMessage , sendMessage -- * Helpers , toNonEmpty -- * Re-exported modules , module Pipes , module Pipes.Safe ) where import Control.Monad (unless, forever) import qualified System.ZMQ4 as Z import qualified Data.ByteString as B import Data.List.NonEmpty (NonEmpty(..)) import Pipes import Pipes.Safe import Pipes.Core -- | This is the low-level function for safely 'bracket'ing ZMQ socket -- creation, so that any exceptions or 'Pipe' termination will not -- result in abandoned sockets. For example, 'setupProducer' is -- defined as -- -- @ -- setupProducer ctx ty opts = setup ctx ty opts $ forever . receiveMessage -- @ setup :: (MonadSafe m, Base m ~ IO, Z.SocketType sockty) => Z.Context -> sockty -> (Z.Socket sockty -> IO ()) -> (Z.Socket sockty -> m v) -> m v setup ctx ty opts act = bracket (Z.socket ctx ty) (liftIO . Z.close) $ \sock -> do _ <- liftIO $ opts sock act sock -- | This is a low-level function for simply passing a message -- received on a socket downstream. -- -- @ -- receiveMessage sock = liftIO (Z.receiveMulti sock) >>= yield -- @ receiveMessage :: (Z.Receiver sck, MonadIO m) => Z.Socket sck -> Producer' [B.ByteString] m () receiveMessage sock = liftIO (Z.receiveMulti sock) >>= yield -- | This is a low-level function for simply sending a message -- available from upstream out on the socket. -- -- @ -- sendMessage sock = await >>= liftIO . Z.sendMulti sock -- @ sendMessage :: (Z.Sender sck, MonadIO m) => Z.Socket sck -> Consumer' (NonEmpty B.ByteString) m () sendMessage sock = await >>= liftIO . Z.sendMulti sock -- | Create a 'Producer' of message data from the given ZeroMQ -- parameters. All messages received on the socket will be sent -- downstream with 'yield'. setupProducer :: (MonadSafe m, Base m ~ IO, Z.SocketType sockty, Z.Receiver sockty) => Z.Context -- ^ ZMQ context -> sockty -- ^ ZMQ socket type -> (Z.Socket sockty -> IO ()) -- ^ Setup function -> Producer [B.ByteString] m () -- ^ Message source setupProducer ctx ty opts = setup ctx ty opts $ forever . receiveMessage -- | Create a 'Consumer' of message data from the given ZeroMQ -- parameters. All data successfully 'await'ed from upstream will be -- sent out the socket. -- -- The resulting 'Consumer' will only accept 'NonEmpty' lists of -- 'B.ByteString' message parts. See 'toNonEmpty' if this is a -- sticking point for you. setupConsumer :: (MonadSafe m, Base m ~ IO, Z.SocketType sockty, Z.Sender sockty) => Z.Context -- ^ ZMQ context -> sockty -- ^ ZMQ socket type -> (Z.Socket sockty -> IO ()) -- ^ Setup function -> Consumer' (NonEmpty B.ByteString) m () -- ^ Message sink setupConsumer ctx ty opts = setup ctx ty opts $ forever . sendMessage -- | Create a 'Pipe' out of a socket from the given ZeroMQ parameters. -- All data successfully 'await'ed from upstream will be sent out the -- socket, and all message data received on the socket will be -- 'yield'ed downstream. -- -- You can use this 'Pipe' to replace a processing stage in your -- pipeline with a ZMQ transaction. Note that the ZMQ actions here -- are blocking; for example, if you make a 'Z.Req' socket, the -- pipeline will block when the message from upstream is transmitted -- until the response from the remote ZMQ socket is received. -- Consider using the @pipes-concurrency@ package if you are sure -- you're using the right socket type but you don't want everything to -- block here. setupPipe :: (MonadSafe m, Base m ~ IO, Z.SocketType sockty, Z.Sender sockty, Z.Receiver sockty) => Z.Context -- ^ ZMQ context -> sockty -- ^ ZMQ socket type -> (Z.Socket sockty -> IO ()) -- ^ Setup function for socket -> Pipe (NonEmpty B.ByteString) [B.ByteString] m () -- ^ Pipeline stage via ZMQ setupPipe ctx ty opts = setup ctx ty opts $ \sock -> forever $ sendMessage sock >> receiveMessage sock -- | 'setupPipePair' is the same as 'setupPipe' except that it allows -- you to use two different sockets, one for sending and the other for -- receiving. setupPipePair :: (MonadSafe m, Base m ~ IO, Z.SocketType txsock, Z.SocketType rxsock, Z.Sender txsock, Z.Receiver rxsock) => Z.Context -- ^ ZMQ context -> txsock -- ^ ZMQ transmit socket type -> rxsock -- ^ ZMQ receive socket type -> (Z.Socket txsock -> IO ()) -- ^ Setup function for transmit socket -> (Z.Socket rxsock -> IO ()) -- ^ Setup function for receive socket -> Pipe (NonEmpty B.ByteString) [B.ByteString] m () -- ^ Pipeline stage via ZMQ setupPipePair ctx txty rxty txopts rxopts = setup ctx txty txopts $ \tx -> setup ctx rxty rxopts $ \rx -> forever $ sendMessage tx >> receiveMessage rx -- | Create an 'Effect' out of a socket from the given ZeroMQ -- parameters and a pipe to use to connect the resulting 'Producer' -- and 'Consumer'. -- -- There are two ways to use a single socket to do both ZMQ sends and -- receives in the same pipeline. 'setupPipe' places the ZMQ socket -- in the middle of the pipeline. 'setupPipeline' places the same ZMQ -- socket at both ends: the provided 'Pipe' is used to connect the -- receiving Producer and sending Consumer generated from the ZMQ -- socket. In other words, for @setupPipeline ctx socktype opts -- myPipe@, -- -- @ -- socket RX >-> myPipe >-> socket TX -- @ -- -- See 'setupPipe' for some caveats. See @examples/serverPipeline.hs@ -- for a usage example. setupPipeline :: (MonadSafe m, Base m ~ IO, Z.SocketType sockty, Z.Sender sockty, Z.Receiver sockty) => Z.Context -- ^ ZMQ context -> sockty -- ^ ZMQ socket type -> (Z.Socket sockty -> IO ()) -- ^ Setup function for socket -> Pipe [B.ByteString] (NonEmpty B.ByteString) m () -- ^ Pipeline to run in the middle -> Effect m () setupPipeline ctx ty opts inpipe = setup ctx ty opts $ \sock -> forever $ (liftIO (Z.receiveMulti sock) >>= yield) >-> inpipe >-> (await >>= liftIO . Z.sendMulti sock) -- | -- This is a low-level function for passing all messages received on -- the socket upstream and sending the responses out on the same -- socket. -- -- @ -- clientLoop sock = forever $ -- liftIO (Z.receiveMulti sock) >>= -- request >>= -- liftIO . Z.sendMulti sock -- @ clientLoop :: (Z.Receiver sockty, Z.Sender sockty, MonadIO m) => Z.Socket sockty -> Client' [B.ByteString] (NonEmpty B.ByteString) m () clientLoop sock = forever $ liftIO (Z.receiveMulti sock) >>= request >>= liftIO . Z.sendMulti sock -- | Create a 'Client' from the given ZeroMQ parameters. The 'Client' -- passes all messages it receives on the ZMQ socket upstream with -- 'request' and sends all corresponding replies back out the socket. setupClient :: (MonadSafe m, Base m ~ IO, Z.SocketType sockty, Z.Sender sockty, Z.Receiver sockty) => Z.Context -- ^ ZMQ context -> sockty -- ^ ZMQ socket type -> (Z.Socket sockty -> IO ()) -- ^ Setup function -> Client [B.ByteString] (NonEmpty B.ByteString) m () setupClient ctx ty opts = setup ctx ty opts clientLoop -- | -- This is a low-level function for passing all messages received on -- the socket upstream and sending the responses out on the same -- socket. -- -- @ -- serverLoop sock msg = -- liftIO (Z.sendMulti sock msg) >> -- liftIO (Z.receiveMulti sock) >>= -- respond >>= -- serverLoop sock -- @ serverLoop :: (Z.Receiver sockty, Z.Sender sockty, MonadIO m) => Z.Socket sockty -> NonEmpty B.ByteString -> Server' (NonEmpty B.ByteString) [B.ByteString] m () serverLoop sock msg = liftIO (Z.sendMulti sock msg) >> liftIO (Z.receiveMulti sock) >>= respond >>= serverLoop sock -- | Create a 'Server' from the given ZeroMQ parameters. The 'Server' -- sends all received requests out on the ZMQ socket and passes all -- messages it receives over the ZMQ socket back downstream with -- 'respond'. setupServer :: (MonadSafe m, Base m ~ IO, Z.SocketType sockty, Z.Sender sockty, Z.Receiver sockty) => Z.Context -- ^ ZMQ context -> sockty -- ^ ZMQ socket type -> (Z.Socket sockty -> IO ()) -- ^ Setup function -> NonEmpty B.ByteString -- ^ Server request input -> Server (NonEmpty B.ByteString) [B.ByteString] m () setupServer ctx ty opts n = setup ctx ty opts (`serverLoop` n) -- | This is simply an adapter between lists of message parts and the -- 'NonEmpty' lists demanded by ZMQ 'Consumer's. If an empty list -- arrives in the input, it will be ignored. toNonEmpty :: Monad m => Pipe [B.ByteString] (NonEmpty B.ByteString) m () toNonEmpty = forever $ do msg <- await unless (null msg) $ yield (head msg :| tail msg)