Portability | portable |
---|---|
Stability | experimental |
Safe Haskell | Safe-Inferred |
Basic communication patterns
- withServer :: Context -> String -> Parameter -> Int -> AccessPoint -> LinkType -> InBound c -> OutBound o -> OnError -> (String -> Iteratee c IO i) -> Fetch i o -> (Service -> IO a) -> IO a
- data Client i o
- clientContext :: Client i o -> Context
- setClientOptions :: Client i o -> [SocketOption] -> IO ()
- withClient :: Context -> AccessPoint -> OutBound o -> InBound i -> (Client i o -> IO a) -> IO a
- request :: Client i o -> Enumerator o IO () -> Iteratee i IO a -> IO (Either SomeException a)
- askFor :: Client i o -> Enumerator o IO () -> IO ()
- checkFor :: Client i o -> Iteratee i IO a -> IO (Maybe (Either SomeException a))
- data Pub o
- pubContext :: Pub o -> Context
- setPubOptions :: Pub o -> [SocketOption] -> IO ()
- withPub :: Context -> AccessPoint -> OutBound o -> (Pub o -> IO a) -> IO a
- issue :: Pub o -> Enumerator o IO () -> IO ()
- withPeriodicPub :: Context -> String -> Parameter -> Timeout -> AccessPoint -> OutBound o -> OnError_ -> Fetch_ o -> (Service -> IO a) -> IO a
- withSub :: Context -> String -> Parameter -> [Topic] -> AccessPoint -> InBound i -> OnError_ -> Dump i -> (Service -> IO a) -> IO a
- data Sub i
- subContext :: Sub i -> Context
- setSubOptions :: Sub i -> [SocketOption] -> IO ()
- withSporadicSub :: Context -> AccessPoint -> InBound i -> [Topic] -> (Sub i -> IO a) -> IO a
- checkSub :: Sub i -> Iteratee i IO a -> IO (Maybe (Either SomeException a))
- waitSub :: Sub i -> Iteratee i IO a -> IO (Either SomeException a)
- unsubscribe :: Sub i -> Topic -> IO ()
- resubscribe :: Sub i -> Topic -> IO ()
- data Pipe o
- pipeContext :: Pipe o -> Context
- setPipeOptions :: Pipe o -> [SocketOption] -> IO ()
- withPipe :: Context -> AccessPoint -> OutBound o -> (Pipe o -> IO a) -> IO a
- push :: Pipe o -> Enumerator o IO () -> IO ()
- withPuller :: Context -> String -> Parameter -> AccessPoint -> InBound i -> OnError_ -> Dump i -> (Service -> IO a) -> IO a
- data Peer a
- peerContext :: Peer a -> Context
- setPeerOptions :: Peer a -> [SocketOption] -> IO ()
- withPeer :: Context -> AccessPoint -> LinkType -> InBound a -> OutBound a -> (Peer a -> IO b) -> IO b
- send :: Peer o -> Enumerator o IO () -> IO ()
- receive :: Peer i -> Iteratee i IO a -> IO (Either SomeException a)
- data AccessPoint = Address {
- acAdd :: String
- acOs :: [SocketOption]
- data LinkType
- parseLink :: String -> Maybe LinkType
- type InBound a = ByteString -> IO a
- type OutBound a = a -> IO ByteString
- idIn :: InBound ByteString
- idOut :: OutBound ByteString
- inString :: InBound String
- outString :: OutBound String
- inUTF8 :: InBound String
- outUTF8 :: OutBound String
- data Criticality
- type OnError = Criticality -> SomeException -> String -> Parameter -> IO (Maybe ByteString)
- type OnError_ = Criticality -> SomeException -> String -> Parameter -> IO ()
- chainIO :: IO a -> (a -> Iteratee b IO c) -> Iteratee b IO c
- chainIOe :: IO a -> (a -> Iteratee b IO c) -> Iteratee b IO c
- tryIO :: IO a -> Iteratee i IO a
- tryIOe :: IO a -> Iteratee i IO a
- data Service
- srvName :: Service -> String
- srvContext :: Service -> Context
- pause :: Service -> IO ()
- resume :: Service -> IO ()
- changeParam :: Service -> Parameter -> IO ()
- changeOption :: Service -> SocketOption -> IO ()
- data Context
- withContext :: Size -> (Context -> IO a) -> IO a
- data SocketOption
- type Topic = String
- alltopics :: [Topic]
- notopic :: [Topic]
- type Timeout = Int64
- type Parameter = String
- noparam :: Parameter
Server/Client
withServer :: Context -> String -> Parameter -> Int -> AccessPoint -> LinkType -> InBound c -> OutBound o -> OnError -> (String -> Iteratee c IO i) -> Fetch i o -> (Service -> IO a) -> IO aSource
Starts one or more server threads
and executes an action that
receives a Service
to control the server.
The Service
is a thread local resource.
It must not be passed to threads forked
from the thread that has started the service.
The Service
is valid only in the scope of the action.
When the action terminates, the server is automatically stopped.
During the action, the server can be paused and restarted.
Also, the SocketOption
of the underlying ZMQ Socket
can be changed.
Please refer to pause
, resume
and changeOption
for more details.
The application may implement control parameters.
Control parameters are mere strings that are passed
to the application call-backs.
It is up to the application to inspect these strings
and to implement different behaviour for the possible settings.
Control parameters can be changed during run-time
by means of changeParam
.
Parameters:
-
Context
: The ZMQ context; -
String
: The name of the server, useful for debugging; -
Parameter
: The initial value of the control parameter passed to all application call-backs; -
Int
: The number of worker threads; note that a server with only one thread handles client requests sequentially. The number of threads (together with the number of hardware processing resources) defines how many client requests can be processed in parallel. -
AccessPoint
: The access point, through which this server can be reached; -
LinkType
: The link type; standalone servers usually bind their access point, whereas clients connect to it. Instead, a server may also connect to a load-balancing device, to which other servers and clients connect (see withDevice
and withQueue
). -
InBound
: The converter to convert the incoming data stream (of type ByteString
) into a client request component. Note that the converter converts single message segments to components of type c. The Iteratee
, receiving these c-typed elements, shall combine them to a complete request of type i, which is then processed by an Enumerator
to create the server response. -
OutBound
: The converter to convert the results of type o to a ByteString
, which then is sent back to the client. -
OnError
: The error handler -
String
-> Iteratee
: The Iteratee
that processes request components of type c and yields a request of type i. The String
argument is the control parameter, whose logic is implemented by the application. -
Fetch
: The Enumerator
that processes the request of type i to produce results of type o. -
Service
-> IO (): The action to invoke, when the server has been started; the service is used to control the server.
The following code fragment shows a simple server to process data base queries using standard converters and error handlers not further defined here:
withContext 1 $ \ctx -> do c <- connectODBC "DSN=xyz" -- some ODBC connection s <- prepare c "select ..." -- some database query withServer ctx "MyQuery" -- name of the server is "MyQuery" noparam -- no parameter 5 -- five worker threads (Address "tcp://*:5555" []) Bind -- bind to this address iconv oconv -- some standard converters onErr -- some standard error handler (\_ -> one []) --Iteratee
for single segment messages; -- refer toEnumerator
for details (dbFetcher s) $ \srv -> -- theEnumerator
; untilInterrupt $ do -- install a signal handler for SIGINT -- and repeat the following action -- until SIGINT is received; putStrLn $ "server " ++ srvName srv ++ " up and running..." threadDelay 1000000
The untilInterrupt loop may be implemented as follows:
untilInterrupt :: IO () -> IO () untilInterrupt run = do continue <- newMVar True _ <- installHandler sigINT (Catch $ handler continue) Nothing go continue where handler m = modifyMVar_ m (\_ -> return False) go m = do run continue <- readMVar m when continue $ go m
Finally, a simple dbFetcher:
dbFetcher :: SQL.Statement -> Fetch [SQL.SqlValue] String dbFetcher s _ _ _ stp = tryIO (SQL.execute s []) >>= \_ -> go stp where go step = case step of E.Continue k -> do mbR <- tryIO $ SQL.fetchRow s case mbR of Nothing -> E.continue k -- convRow is not defined here Just r -> go $$ k (E.Chunks [convRow r]) _ -> E.returnI step
setClientOptions :: Client i o -> [SocketOption] -> IO ()Source
Setting SocketOption
to the underlying ZMQ Socket
withClient :: Context -> AccessPoint -> OutBound o -> InBound i -> (Client i o -> IO a) -> IO aSource
Creates a Client
;
a client is not a background process like a server,
but a data type that provides functions
to interoperate with a server.
withClient
creates a client and
invokes the application-defined action,
which receives a Client
argument.
The lifetime of the Client
is limited
to the invoked action.
When the action terminates, the Client
dies.
Parameters:
-
Context
: The ZMQ Context; -
AccessPoint
: The access point, to which the client connects; -
OutBound
: Converter to convert a request from type o to the wire format ByteString
. Note that, as for servers, the request may be composed of components that together form the request. The type o corresponds to one of these request components, not necessarily to the request type as a whole, which is determined when issuing a request. -
InBound
: Converter to convert a reply (ByteString
) into type i
. Note again that the reply may consist of many message segments. The type i relates to one reply component, not necessarily to the reply type as a whole, which is determined when issuing a request. -
Client
-> IO a: The action to perform with this client.
request :: Client i o -> Enumerator o IO () -> Iteratee i IO a -> IO (Either SomeException a)Source
Synchronously requesting a service; the function blocks the current thread, until a reply is received.
Parameters:
-
Client
: The client that performs the request -
Enumerator
: Enumerator to create the request message stream -
Iteratee
: Iteratee to process the reply message stream
A simple client that just writes the results to stdout
:
rcv :: String -> IO () rcv req = withContext 1 $ \ctx -> withClient ctx (Address "tcp://localhost:5555" []) -- connect to this address (return . B.pack) (return . B.unpack) $ -- string converters \s -> do -- request with enum and outit ei <- request s (enum req) outit case ei of Left e -> putStrLn $ "Error: " ++ show (e::SomeException) Right _ -> return ()
-- Enumerator that returns just one string enum :: String -> E.Enumerator String IO () enum = once (return . Just)
-- Iteratee that just writes to stdout outit :: E.Iteratee String IO () outit = do mbi <- EL.head case mbi of Nothing -> return () Just i -> liftIO (putStrLn i) >> outit
Note that this code just issues one request, which is not the most typical use case. It is more likely that the action will loop for ever and receive requests, for instance, from a user interface.
askFor :: Client i o -> Enumerator o IO () -> IO ()Source
Asynchronously requesting a service; the function sends a request to the server without waiting for a result.
Parameters:
-
Client
: The client that performs the request -
Enumerator
: Enumerator to create the request message stream
checkFor :: Client i o -> Iteratee i IO a -> IO (Maybe (Either SomeException a))Source
Polling for a reply;
the function polls for a reply from the server.
If nothing has been received, it returns Nothing
;
otherwise it returns Just
the result or an error.
Parameters:
The synchronous request (see request
)
could be implemented asynchronously like:
rcv :: String -> IO () rcv req = withContext 1 $ \ctx -> do let ap = Address "tcp://localhost:5555" [] withClient ctx ap (return . B.pack) (return . B.unpack) $ \s -> do ei <- try $ askFor s (enum req) case ei of Left e -> putStrLn $ "Error: " ++ show (e::SomeException) Right _ -> wait s -- check for results periodically where wait s = checkFor s outit >>= \mbei -> case mbei of Nothing -> do putStrLn "Waiting..." threadDelay 10000 >> wait s Just (Left e) -> putStrLn $ "Error: " ++ show e Just (Right _) -> putStrLn "Ready!"
Publish/Subscribe
setPubOptions :: Pub o -> [SocketOption] -> IO ()Source
Setting SocketOption
to the underlying ZMQ Socket
withPub :: Context -> AccessPoint -> OutBound o -> (Pub o -> IO a) -> IO aSource
Creates a publisher;
A publisher is a data type
that provides an interface to publish data to subscribers.
withPub
creates a publisher and invokes
an application-defined action,
which receives a Pub
argument.
The lifetime of the publisher is limited to the action.
When the action terminates, the publisher dies.
Parameters:
-
Context
: The ZMQ Context -
AccessPoint
: The access point the publisher will bind -
OutBound
: A converter to convert from type o to the wire format ByteString
. Note that a publisher may create a data stream; the type o is then the type of one segment of this stream, not of the stream as a whole. -
Pub
-> IO (): The action to invoke
issue :: Pub o -> Enumerator o IO () -> IO ()Source
Publishes the data stream created by an enumerator;
Parameters:
-
Pub
: The publisher -
Enumerator
: The enumerator to create an outgoing data stream.
A simple weather report publisher:
withContext 1 $ \ctx -> withPub ctx (Address "tcp://*:5555" []) (return . B.pack) $ \pub -> untilInterrupt $ do issue pub (once weather noparam) threadDelay 10000 -- update every 10ms
-- fake weather report with some random values weather :: String -> IO (Maybe String) weather _ = do zipcode <- randomRIO (10000, 99999) :: IO Int temperature <- randomRIO (-10, 30) :: IO Int humidity <- randomRIO ( 10, 60) :: IO Int return $ Just (unwords [show zipcode, show temperature, show humidity])
withPeriodicPub :: Context -> String -> Parameter -> Timeout -> AccessPoint -> OutBound o -> OnError_ -> Fetch_ o -> (Service -> IO a) -> IO aSource
Creates a background process that periodically publishes data;
Parameters:
-
Context
: The ZMQ Context -
String
: Name of this Publisher; useful for debugging -
Parameter
: The initial value of the control parameter -
Timeout
: The period of the publisher in microseconds; the process will issue the publisher data every n microseconds. -
AccessPoint
: Bind address -
OutBound
: A converter that converts one segment of the data stream from type o to the wire format ByteString
-
OnError_
: Error Handler -
String
->Fetch
:Enumerator
to create the outgoing data stream; the string argument is the parameter. -
Service
-> IO (): The user action to perform
The weather report publisher introduced above (see withPub
)
can be implemented by means of withPeriodicPub
as:
withPeriodicPub ctx "Weather Report" noparam 100000 -- publish every 100ms (Address "tcp://*:5555" []) (return . B.pack) -- string converter onErr_ -- standard error handler (\_ -> fetch1 fetch) -- creates one instance -- of the return of "fetch"; -- seeEnumerator
for details $ \pub -> untilInterrupt $ do -- until SIGINT, seewithServer
for details threadDelay 100000 putStrLn $ "I am doing nothing " ++ srvName pub
withSub :: Context -> String -> Parameter -> [Topic] -> AccessPoint -> InBound i -> OnError_ -> Dump i -> (Service -> IO a) -> IO aSource
A subscription is a background service
that receives and processes data streams
from a publisher.
A typical use case is an application
that operates on periodically updated data;
the subscriber would receive these data and
make them accessible to other threads in the process
through an MVar
.
Parameters:
-
Context
: The ZMQ Context -
String
: The subscriber's name -
Parameter
: The initial value of the control parameter - [
Topic
]: The topics to subscribe to; in the example above (withPub
), the publisher publishes the weather report per zip code; the zip code, in this example, could be a meaningful topic for a subscriber. It is good practice to send the topic in an initial message segment, the envelope, to avoid that the subscriber matches on some arbitrary part of the message. -
InBound
: A converter that converts one segment of the incoming data stream to type i -
OnError_
: Error handler -
Dump
:Iteratee
to process the incoming data stream. -
Service
-> IO (): Application-defined action to control the service. Note that Service
is a thread-local resource and must not be passed to threads forked from the action.
Weather Report Subscriber:
withContext 1 $ \ctx -> withSub ctx "Weather Report" noparam ["10001"] -- zipcode to subscribe to (Address "tcp://localhost:5555" []) (return . B.unpack) onErr_ output -- Iteratee that just writes to stdout $ \s -> untilInterrupt $ do putStrLn $ "Doing nothing " ++ srvName s threadDelay 1000000
setSubOptions :: Sub i -> [SocketOption] -> IO ()Source
Setting SocketOption
to the underlying ZMQ Socket
withSporadicSub :: Context -> AccessPoint -> InBound i -> [Topic] -> (Sub i -> IO a) -> IO aSource
Similar to Pub
, a Sub
is a data type
that provides an interface to subscribe to data.
withSporadicSub
creates a subscriber and invokes
an application-defined action,
which receives a Sub
argument.
The lifetime of the subscriber is limited to the action.
When the action terminates, the subscriber dies.
waitSub :: Sub i -> Iteratee i IO a -> IO (Either SomeException a)Source
Waiting for data;
the function blocks the current thread,
until data are received from the publisher.
It returns either SomeException
or the result.
Parameters:
Pipeline
A pipeline consists of a "pusher" and a set of workers ("pullers"). The pusher sends jobs down the pipeline that will be assigned to one of the workers. The pipeline pattern is, thus, a work-balancing scheme.
setPipeOptions :: Pipe o -> [SocketOption] -> IO ()Source
Setting SocketOption
to the underlying ZMQ Socket
withPipe :: Context -> AccessPoint -> OutBound o -> (Pipe o -> IO a) -> IO aSource
Creates a pipeline;
a Pipe
is a data type
that provides an interface to push a data stream
to workers connected to the other side of the pipe.
withPipe
creates a pipeline and invokes an application-defined
action which receives a Pipe
argument.
The lifetime of the Pipe
is limited to the action.
When the action terminates, the Pipe
dies.
Parameters:
-
Context
: The ZMQ Context -
AccessPoint
: The bind address -
OutBound
: A converter to convert message segments of type o to the wire format ByteString
-
Pipe
-> IO (): The action to invoke
push :: Pipe o -> Enumerator o IO () -> IO ()Source
Sends a job down the pipeline;
Parameters:
-
Pipe
: The pipeline -
Enumerator
: enumerator to create the data stream that constitutes the job
A simple pusher:
sendF :: FilePath -> IO () sendF f = withContext 1 $ \ctx -> do let ap = Address "tcp://*:5555" [] withPipe ctx ap return $ \pu -> push pu (EB.enumFile f) -- file enumerator -- see Data.Enumerator.Binary (EB)
withPuller :: Context -> String -> Parameter -> AccessPoint -> InBound i -> OnError_ -> Dump i -> (Service -> IO a) -> IO aSource
A puller is a background service that receives and processes data streams from a pipeline.
Parameters:
-
Context
: The ZMQ Context -
String
: The service name -
Parameter
: The initial value of the control parameter -
AccessPoint
: The address to connect to -
InBound
: A converter to convert segments of the incoming data stream from the wire format ByteString
to the type i -
OnError_
: Error Handler -
Dump
:Iteratee
to process the incoming data stream -
Service
-> IO (): Application-defined action
A worker that just writes the incoming stream to stdout:
withContext 1 $ \ctx -> withPuller ctx "Worker" noparam (Address "tcp://localhost:5555" []) (return . B.unpack) onErr_ output $ \s -> untilInterrupt $ do putStrLn $ "Doing nothing " ++ srvName s threadDelay 100000
Exclusive Pair
An Exclusive Pair is a general purpose pattern
of two equal peers that communicate with each other
by sending (send
) and receiving (receive
) data.
One of the peers has to bind
the AccessPoint;
the other connects to it.
setPeerOptions :: Peer a -> [SocketOption] -> IO ()Source
Sets SocketOption
withPeer :: Context -> AccessPoint -> LinkType -> InBound a -> OutBound a -> (Peer a -> IO b) -> IO bSource
Creates a Peer
;
a peer is a data type
that provides an interface to exchange data with another peer.
withPeer
creates the peer and invokes an application-defined
action that receives a Peer
argument.
The lifetime of the Peer
is limited to the action.
When the action terminates, the Peer
dies.
Parameters:
-
Context
: The ZMQ Context -
AccessPoint
: The address, to which this peer either binds or connects -
LinkType
: One of the peers has to bind the address, the other has to connect. -
InBound
: A converter to convert message segments from the wire format ByteString
to type a -
OutBound
: A converter to convert message segments of type a to the wire format ByteString
-
Peer
-> IO (): The action to invoke
send :: Peer o -> Enumerator o IO () -> IO ()Source
Sends a data stream to another peer;
Parameters:
-
Peer
: The peer -
Enumerator
: Enumerator to create the outgoing data stream
Service Access Point
data AccessPoint Source
Describes how to access a service;
an AccessPoint
usually consists of an address
and a list of SocketOption
.
Addresses are passed in as strings of the form:
- "tcp://*:5555": for binding the port 5555 via TCP/IP on all network interfaces; an IPv4 address or the operating system interface name could be given instead.
- "tcp://localhost:5555": for connecting to the port 5555 on localhost via TCP/IP; the endpoint may be given as a DNS name or as an IPv4 address.
- "ipc:///tmp/queues/0": for binding and connecting to a local inter-process communication endpoint, in this case created under /tmp/queues/0; only available on UNIX.
- "inproc://worker": for binding and connecting to the process internal address worker
For more options, please refer to the zeromq documentation.
Address | |
|
How to link to an AccessPoint
parseLink :: String -> Maybe LinkTypeSource
Safely read LinkType
;
ignores the case of the input string
and, besides "bind" and "connect",
also accepts "bin", "con" and "conn";
intended for use with command line parameters
Converters
type InBound a = ByteString -> IO aSource
Converters are user-defined functions
that convert a ByteString
to a value of type a (InBound
) or
a value of type a to ByteString
(OutBound
).
Converters are, hence, similar to put and get in the Binary
monad.
The reason for using explicit, user-defined converters
instead of Binary encode and decode
is that the conversion
may be more complex, involving reading configurations
or other IO
actions.
The simplest possible in-bound converter for plain strings is:
let iconv = return . toString
type OutBound a = a -> IO ByteStringSource
A simple string OutBound
converter may be:
let oconv = return . fromString
Errors and Error Handlers
data Criticality Source
Indicates criticality of the error event
Error | The current operation
(e.g. processing a request)
has not terminated properly,
but the service is able to continue;
the error may have been caused by a faulty
request or other temporary conditions.
Note that if an application-defined |
Critical | One worker thread is lost ( |
Fatal | The service cannot recover and will terminate |
type OnError = Criticality -> SomeException -> String -> Parameter -> IO (Maybe ByteString)Source
Error handler for servers;
receives the Criticality
of the error event,
the exception, the server name and the service control parameter.
If the error handler returns Just
a ByteString
this ByteString
is sent to the client as error message.
A good policy for implementing servers is
to terminate or restart the Server
when a Fatal
or Critical
error occurs
and to send an error message to the client
on a plain Error
.
The error handler, additionally, may log the incident
or inform an administrator.
type OnError_ = Criticality -> SomeException -> String -> Parameter -> IO ()Source
Error handler for all services but servers;
receives the Criticality
of the error event,
the exception, the service name
and the service control parameter.
A good policy is
to terminate or restart the service
when a Fatal
error occurs
and to continue, if possible,
on a plain Error
.
The error handler, additionally, may log the incident
or inform an administrator.
chainIO :: IO a -> (a -> Iteratee b IO c) -> Iteratee b IO cSource
Chains IO Actions in an Enumerator
together;
throws SomeException
using throwError
when an error occurs
chainIOe :: IO a -> (a -> Iteratee b IO c) -> Iteratee b IO cSource
Chains IO Actions in an Enumerator
together;
returns Error
when an error occurs
tryIO :: IO a -> Iteratee i IO aSource
Executes an IO action in an Iteratee
;
throws SomeException
using throwError
when an error occurs
Generic Service
Generic Service data type;
Service
is passed to application-defined actions
used with background services, namely
withServer, withPeriodicPub, withSub, withPuller and withDevice.
changeOption :: Service -> SocketOption -> IO ()Source
Changes SocketOption
ZMQ Context
data Context
A 0MQ context representation.
withContext :: Size -> (Context -> IO a) -> IO a
Run an action with a 0MQ context. The Context
supplied to your
action will not be valid after the action either returns or
throws an exception.
data SocketOption
The option to set on 0MQ sockets (cf. zmq_setsockopt and zmq_getsockopt manpages for details).
Affinity Word64 | ZMQ_AFFINITY |
Backlog CInt | ZMQ_BACKLOG |
Events PollEvent | ZMQ_EVENTS |
FD CInt | ZMQ_FD |
Identity String | ZMQ_IDENTITY |
Linger CInt | ZMQ_LINGER |
Rate Int64 | ZMQ_RATE |
ReceiveBuf Word64 | ZMQ_RCVBUF |
ReceiveMore Bool | ZMQ_RCVMORE |
ReconnectIVL CInt | ZMQ_RECONNECT_IVL |
ReconnectIVLMax CInt | ZMQ_RECONNECT_IVL_MAX |
RecoveryIVL Int64 | ZMQ_RECOVERY_IVL |
SendBuf Word64 | ZMQ_SNDBUF |
HighWM Word64 | ZMQ_HWM |
McastLoop Bool | ZMQ_MCAST_LOOP |
RecoveryIVLMsec Int64 | ZMQ_RECOVERY_IVL_MSEC |
Swap Int64 | ZMQ_SWAP |