module System.ZMQ (
Size
, Context
, Socket
, Flag(..)
, SocketOption(..)
, Poll(..)
, Timeout
, PollEvent(..)
, SType
, SubsType
, Pair(..)
, Pub(..)
, Sub(..)
, XPub(..)
, XSub(..)
, Req(..)
, Rep(..)
, XReq(..)
, XRep(..)
, Pull(..)
, Push(..)
#ifdef ZMQ2
, Up(..)
, Down(..)
#endif
, withContext
, withSocket
, setOption
, getOption
#ifdef ZMQ3
, getMsgOption
#endif
, System.ZMQ.subscribe
, System.ZMQ.unsubscribe
, bind
, connect
, send
, send'
, receive
, moreToReceive
, poll
, System.ZMQ.zmqVersion
, init
, term
, socket
, close
#ifdef ZMQ2
, Device(..)
, device
#endif
) where
import Prelude hiding (init)
import Control.Applicative
import Control.Exception
import Control.Monad (unless, when)
import Data.IORef (atomicModifyIORef)
import Foreign hiding (with)
import Foreign.C.Error
import Foreign.C.String
import Foreign.C.Types (CInt, CShort)
import qualified Data.ByteString as SB
import qualified Data.ByteString.Lazy as LB
import System.Mem.Weak (addFinalizer)
import System.Posix.Types (Fd(..))
import System.ZMQ.Base
import qualified System.ZMQ.Base as B
import System.ZMQ.Internal
import GHC.Conc (threadWaitRead, threadWaitWrite)
class SType a where
zmqSocketType :: a -> ZMQSocketType
data Pair = Pair
instance SType Pair where
zmqSocketType = const pair
data Pub = Pub
instance SType Pub where
zmqSocketType = const pub
data Sub = Sub
instance SType Sub where
zmqSocketType = const sub
data XPub = XPub
instance SType XPub where
zmqSocketType = const xpub
data XSub = XSub
instance SType XSub where
zmqSocketType = const xsub
data Req = Req
instance SType Req where
zmqSocketType = const request
data Rep = Rep
instance SType Rep where
zmqSocketType = const response
data XReq = Xreq
instance SType XReq where
zmqSocketType = const xrequest
data XRep = Xrep
instance SType XRep where
zmqSocketType = const xresponse
data Pull = Pull
instance SType Pull where
zmqSocketType = const pull
data Push = Push
instance SType Push where
zmqSocketType = const push
#ifdef ZMQ2
data Up = Up
instance SType Up where
zmqSocketType = const upstream
data Down = Down
instance SType Down where
zmqSocketType = const downstream
#endif
class SubsType a
instance SubsType Sub
instance SubsType XSub
data SocketOption =
Affinity Word64
| Backlog Int
| Events PollEvent
| FD Int
| Identity String
| Linger Int
| Rate Int64
| ReceiveBuf Word64
| ReceiveMore Bool
| ReconnectIVL Int
| ReconnectIVLMax Int
| RecoveryIVL Int64
| SendBuf Word64
#ifdef ZMQ2
| HighWM Word64
| McastLoop Bool
| RecoveryIVLMsec Int64
| Swap Int64
#endif
#ifdef ZMQ3
| IPv4Only Bool
| MaxMsgSize Int64
| McastHops Int
| ReceiveHighWM Int
| ReceiveTimeout Int
| SendHighWM Int
| SendTimeout Int
#endif
deriving (Eq, Ord, Show)
#ifdef ZMQ3
data MessageOption = MoreMsgParts CInt
deriving (Eq, Ord, Show)
#endif
data PollEvent =
In
| Out
| InOut
| Native
| None
deriving (Eq, Ord, Show)
data Poll =
forall a. S (Socket a) PollEvent
| F Fd PollEvent
setOption :: Socket a -> SocketOption -> IO ()
setOption s (Affinity o) = setIntOpt s affinity o
setOption s (Backlog o) = setIntOpt s backlog o
setOption _ (Events _) = return ()
setOption _ (FD _) = return ()
setOption s (Identity o) = setStrOpt s identity o
setOption s (Linger o) = setIntOpt s linger o
setOption s (Rate o) = setIntOpt s rate o
setOption s (ReceiveBuf o) = setIntOpt s receiveBuf o
setOption _ (ReceiveMore _) = return ()
setOption s (ReconnectIVL o) = setIntOpt s reconnectIVL o
setOption s (ReconnectIVLMax o) = setIntOpt s reconnectIVLMax o
setOption s (RecoveryIVL o) = setIntOpt s recoveryIVL o
setOption s (SendBuf o) = setIntOpt s sendBuf o
#ifdef ZMQ2
setOption s (HighWM o) = setIntOpt s highWM o
setOption s (McastLoop o) = setBoolOpt s mcastLoop o
setOption s (RecoveryIVLMsec o) = setIntOpt s recoveryIVLMsec o
setOption s (Swap o) = setIntOpt s swap o
#endif
#ifdef ZMQ3
setOption s (IPv4Only o) = setBoolOpt s ipv4Only o
setOption s (MaxMsgSize o) = setIntOpt s maxMessageSize o
setOption s (McastHops o) = setIntOpt s mcastHops o
setOption s (ReceiveHighWM o) = setIntOpt s receiveHighWM o
setOption s (ReceiveTimeout o) = setIntOpt s receiveTimeout o
setOption s (SendHighWM o) = setIntOpt s sendHighWM o
setOption s (SendTimeout o) = setIntOpt s sendTimeout o
#endif
getOption :: Socket a -> SocketOption -> IO SocketOption
getOption s (Affinity _) = Affinity <$> getIntOpt s affinity
getOption s (Backlog _) = Backlog <$> getIntOpt s backlog
getOption s (Events _) = Events . toEvent <$> getIntOpt s events
getOption s (FD _) = FD <$> getIntOpt s filedesc
getOption s (Identity _) = Identity <$> getStrOpt s identity
getOption s (Linger _) = Linger <$> getIntOpt s linger
getOption s (Rate _) = Rate <$> getIntOpt s rate
getOption s (ReceiveBuf _) = ReceiveBuf <$> getIntOpt s receiveBuf
getOption s (ReceiveMore _) = ReceiveMore <$> getBoolOpt s receiveMore
getOption s (ReconnectIVL _) = ReconnectIVL <$> getIntOpt s reconnectIVL
getOption s (ReconnectIVLMax _) = ReconnectIVLMax <$> getIntOpt s reconnectIVLMax
getOption s (RecoveryIVL _) = RecoveryIVL <$> getIntOpt s recoveryIVL
getOption s (SendBuf _) = SendBuf <$> getIntOpt s sendBuf
#ifdef ZMQ2
getOption s (HighWM _) = HighWM <$> getIntOpt s highWM
getOption s (McastLoop _) = McastLoop <$> getBoolOpt s mcastLoop
getOption s (RecoveryIVLMsec _) = RecoveryIVLMsec <$> getIntOpt s recoveryIVLMsec
getOption s (Swap _) = Swap <$> getIntOpt s swap
#endif
#ifdef ZMQ3
getOption s (IPv4Only _) = IPv4Only <$> getBoolOpt s ipv4Only
getOption s (MaxMsgSize _) = MaxMsgSize <$> getIntOpt s maxMessageSize
getOption s (McastHops _) = McastHops <$> getIntOpt s mcastHops
getOption s (ReceiveHighWM _) = ReceiveHighWM <$> getIntOpt s receiveHighWM
getOption s (ReceiveTimeout _) = ReceiveTimeout <$> getIntOpt s receiveTimeout
getOption s (SendHighWM _) = SendHighWM <$> getIntOpt s sendHighWM
getOption s (SendTimeout _) = SendTimeout <$> getIntOpt s sendTimeout
getMsgOption :: Message -> MessageOption -> IO MessageOption
getMsgOption m (MoreMsgParts _) = MoreMsgParts <$> getIntMsgOpt m more
#endif
zmqVersion :: (Int, Int, Int)
zmqVersion = B.zmqVersion
init :: Size -> IO Context
init ioThreads = do
c <- throwErrnoIfNull "init" $ c_zmq_init (fromIntegral ioThreads)
return (Context c)
term :: Context -> IO ()
term = throwErrnoIfMinus1_ "term" . c_zmq_term . ctx
withContext :: Size -> (Context -> IO a) -> IO a
withContext ioThreads act =
bracket (throwErrnoIfNull "c_zmq_init" $ c_zmq_init (fromIntegral ioThreads))
(throwErrnoIfMinus1_ "c_zmq_term" . c_zmq_term)
(act . Context)
withSocket :: SType a => Context -> a -> (Socket a -> IO b) -> IO b
withSocket c t = bracket (socket c t) close
socket :: SType a => Context -> a -> IO (Socket a)
socket (Context c) t = do
let zt = typeVal . zmqSocketType $ t
s <- throwErrnoIfNull "socket" (c_zmq_socket c zt)
sock@(Socket _ status) <- mkSocket s
addFinalizer sock $ do
alive <- atomicModifyIORef status (\b -> (False, b))
when alive $ c_zmq_close s >> return ()
return sock
close :: Socket a -> IO ()
close sock@(Socket _ status) = onSocket "close" sock $ \s -> do
alive <- atomicModifyIORef status (\b -> (False, b))
when alive $ throwErrnoIfMinus1_ "close" . c_zmq_close $ s
subscribe :: SubsType a => Socket a -> String -> IO ()
subscribe s = setStrOpt s B.subscribe
unsubscribe :: SubsType a => Socket a -> String -> IO ()
unsubscribe s = setStrOpt s B.unsubscribe
moreToReceive :: Socket a -> IO Bool
moreToReceive s = getBoolOpt s receiveMore
bind :: Socket a -> String -> IO ()
bind sock str = onSocket "bind" sock $
throwErrnoIfMinus1_ "bind" . withCString str . c_zmq_bind
connect :: Socket a -> String -> IO ()
connect sock str = onSocket "connect" sock $
throwErrnoIfMinus1_ "connect" . withCString str . c_zmq_connect
send :: Socket a -> SB.ByteString -> [Flag] -> IO ()
send sock val fls = bracket (messageOf val) messageClose $ \m ->
onSocket "send" sock $ \s ->
retry "send" (waitWrite sock) $
c_zmq_send s (msgPtr m) (combine (NoBlock : fls))
send' :: Socket a -> LB.ByteString -> [Flag] -> IO ()
send' sock val fls = bracket (messageOfLazy val) messageClose $ \m ->
onSocket "send'" sock $ \s ->
retry "send'" (waitWrite sock) $
c_zmq_send s (msgPtr m) (combine (NoBlock : fls))
receive :: Socket a -> [Flag] -> IO (SB.ByteString)
receive sock fls = bracket messageInit messageClose $ \m ->
onSocket "receive" sock $ \s -> do
retry "receive" (waitRead sock) $
c_zmq_recv s (msgPtr m) (combine (NoBlock : fls))
data_ptr <- c_zmq_msg_data (msgPtr m)
size <- c_zmq_msg_size (msgPtr m)
SB.packCStringLen (data_ptr, fromIntegral size)
poll :: [Poll] -> Timeout -> IO [Poll]
poll fds to = do
let len = length fds
ps = map createZMQPoll fds
withArray ps $ \ptr -> do
throwErrnoIfMinus1Retry_ "poll" $
c_zmq_poll ptr (fromIntegral len) (fromIntegral to)
ps' <- peekArray len ptr
return $ map createPoll (zip ps' fds)
where
createZMQPoll :: Poll -> ZMQPoll
createZMQPoll (S (Socket s _) e) =
ZMQPoll s 0 (fromEvent e) 0
createZMQPoll (F (Fd s) e) =
ZMQPoll nullPtr (fromIntegral s) (fromEvent e) 0
createPoll :: (ZMQPoll, Poll) -> Poll
createPoll (zp, S (Socket s t) _) =
S (Socket s t) (toEvent . fromIntegral . pRevents $ zp)
createPoll (zp, F fd _) =
F fd (toEvent . fromIntegral . pRevents $ zp)
fromEvent :: PollEvent -> CShort
fromEvent In = fromIntegral . pollVal $ pollIn
fromEvent Out = fromIntegral . pollVal $ pollOut
fromEvent InOut = fromIntegral . pollVal $ pollInOut
fromEvent Native = fromIntegral . pollVal $ pollerr
fromEvent None = 0
toEvent :: Word32 -> PollEvent
toEvent e | e == (fromIntegral . pollVal $ pollIn) = In
| e == (fromIntegral . pollVal $ pollOut) = Out
| e == (fromIntegral . pollVal $ pollInOut) = InOut
| e == (fromIntegral . pollVal $ pollerr) = Native
| otherwise = None
retry :: String -> IO () -> IO CInt -> IO ()
retry msg wait act = throwErrnoIfMinus1RetryMayBlock_ msg act wait
wait' :: (Fd -> IO ()) -> ZMQPollEvent -> Socket a -> IO ()
wait' w f s = do
fd <- getIntOpt s filedesc
w (Fd fd)
evs <- getIntOpt s events :: IO Word32
unless (testev evs) $
wait' w f s
where
testev e = e .&. fromIntegral (pollVal f) /= 0
waitRead, waitWrite :: Socket a -> IO ()
waitRead = wait' threadWaitRead pollIn
waitWrite = wait' threadWaitWrite pollOut
#ifdef ZMQ2
data Device =
Streamer
| Forwarder
| Queue
deriving (Eq, Ord, Show)
device :: Device -> Socket a -> Socket b -> IO ()
device device' insock outsock =
onSocket "device" insock $ \insocket ->
onSocket "device" outsock $ \outsocket ->
throwErrnoIfMinus1Retry_ "device" $
c_zmq_device (fromDevice device') insocket outsocket
where
fromDevice :: Device -> CInt
fromDevice Streamer = fromIntegral . deviceType $ deviceStreamer
fromDevice Forwarder = fromIntegral . deviceType $ deviceForwarder
fromDevice Queue = fromIntegral . deviceType $ deviceQueue
#endif