module System.ZMQ (
Size
, Context
, Socket
, Flag(..)
, SocketOption(..)
, Poll(..)
, Timeout
, PollEvent(..)
, SType
, SubsType
, Pair(..)
, Pub(..)
, Sub(..)
, Req(..)
, Rep(..)
, XReq(..)
, XRep(..)
, Dealer(..)
, Router(..)
, Pull(..)
, Push(..)
, Up(..)
, Down(..)
, withContext
, withSocket
, setOption
, getOption
, System.ZMQ.subscribe
, System.ZMQ.unsubscribe
, bind
, connect
, send
, send'
, receive
, moreToReceive
, poll
, version
, init
, term
, socket
, close
, Device(..)
, device
) where
import Prelude hiding (init)
import Control.Applicative
import Control.Exception
import Control.Monad (unless, when)
import Data.IORef (atomicModifyIORef)
import Foreign
import Foreign.C.Error
import Foreign.C.String
import Foreign.C.Types (CInt, CShort)
import qualified Data.ByteString as SB
import qualified Data.ByteString.Lazy as LB
import System.Posix.Types (Fd(..))
import System.ZMQ.Base
import qualified System.ZMQ.Base as B
import System.ZMQ.Internal
import GHC.Conc (threadWaitRead, threadWaitWrite)
-- | Class of socket type tags. Every instance names the 'ZMQSocketType'
-- constant that 'socket' hands to the C library when a socket of that
-- tag is created.
class SType a where
    -- | The socket type constant belonging to the tag. The instances in
    -- this module never inspect the argument value.
    zmqSocketType :: a -> ZMQSocketType
-- | Tag for sockets created with the 'pair' socket type.
data Pair = Pair

instance SType Pair where
    -- The tag value itself carries no information.
    zmqSocketType _ = pair
-- | Tag for sockets created with the 'pub' socket type.
data Pub = Pub

instance SType Pub where
    -- The tag value itself carries no information.
    zmqSocketType _ = pub
-- | Tag for sockets created with the 'sub' socket type. This is the tag
-- that has a 'SubsType' instance in this module.
data Sub = Sub

instance SType Sub where
    -- The tag value itself carries no information.
    zmqSocketType _ = sub
-- | Tag for sockets created with the 'request' socket type.
data Req = Req

instance SType Req where
    -- The tag value itself carries no information.
    zmqSocketType _ = request
-- | Tag for sockets created with the 'response' socket type.
data Rep = Rep

instance SType Rep where
    -- The tag value itself carries no information.
    zmqSocketType _ = response
-- | Tag for sockets created with the 'xrequest' socket type.
data XReq = XReq

instance SType XReq where
    -- The tag value itself carries no information.
    zmqSocketType _ = xrequest
-- | Tag for sockets created with the 'dealer' socket type.
data Dealer = Dealer

instance SType Dealer where
    -- The tag value itself carries no information.
    zmqSocketType _ = dealer
-- | Tag for sockets created with the 'xresponse' socket type.
data XRep = XRep

instance SType XRep where
    -- The tag value itself carries no information.
    zmqSocketType _ = xresponse
-- | Tag for sockets created with the 'router' socket type.
data Router = Router

instance SType Router where
    -- The tag value itself carries no information.
    zmqSocketType _ = router
-- | Tag for sockets created with the 'pull' socket type.
data Pull = Pull

instance SType Pull where
    -- The tag value itself carries no information.
    zmqSocketType _ = pull
-- | Tag for sockets created with the 'push' socket type.
data Push = Push

instance SType Push where
    -- The tag value itself carries no information.
    zmqSocketType _ = push
-- | Tag for sockets created with the 'upstream' socket type.
data Up = Up

instance SType Up where
    -- The tag value itself carries no information.
    zmqSocketType _ = upstream
-- | Tag for sockets created with the 'downstream' socket type.
data Down = Down

instance SType Down where
    -- The tag value itself carries no information.
    zmqSocketType _ = downstream
-- | Class of socket type tags accepted by 'System.ZMQ.subscribe' and
-- 'System.ZMQ.unsubscribe'. It has no methods; it only restricts which
-- sockets those two functions type-check against.
class SubsType a

-- | Sockets tagged 'Sub' may subscribe and unsubscribe.
instance SubsType Sub
-- | Socket options handled by 'setOption' and 'getOption'.
--
-- With 'setOption' the constructor's payload is the value to store.
-- With 'getOption' the payload of the argument is ignored; only the
-- constructor selects which option is read, and the current value comes
-- back wrapped in the same constructor.
data SocketOption
    = Affinity        Word64
    | Backlog         CInt
    | Events          PollEvent -- ^ ignored by 'setOption'
    | FD              CInt      -- ^ ignored by 'setOption'
    | Identity        String
    | Linger          CInt
    | Rate            Int64
    | ReceiveBuf      Word64
    | ReceiveMore     Bool      -- ^ ignored by 'setOption'
    | ReconnectIVL    CInt
    | ReconnectIVLMax CInt
    | RecoveryIVL     Int64
    | SendBuf         Word64
    | HighWM          Word64
    | McastLoop       Bool
    | RecoveryIVLMsec Int64
    | Swap            Int64
    deriving (Eq, Ord, Show, Read)
-- | Event kinds used in 'Poll' items and in the 'Events' socket option.
data PollEvent
    = In     -- ^ corresponds to 'pollIn'
    | Out    -- ^ corresponds to 'pollOut'
    | InOut  -- ^ corresponds to 'pollInOut'
    | Native -- ^ corresponds to 'pollerr'
    | None   -- ^ no event (encoded as 0 when polling)
    deriving (Eq, Ord, Show, Read)
-- | One entry in the list given to, and returned by, 'poll'. On input
-- the 'PollEvent' is the event being asked for; in the result it is the
-- event that was reported for that entry.
data Poll
    = forall a. S (Socket a) PollEvent -- ^ a socket of any type tag
    | F Fd PollEvent                   -- ^ a plain file descriptor
-- | Store the value carried by a 'SocketOption' on the given socket.
--
-- 'Events', 'FD' and 'ReceiveMore' are accepted but do nothing: for
-- those constructors the function returns immediately without touching
-- the socket.
setOption :: Socket a -> SocketOption -> IO ()
setOption s opt = case opt of
    Affinity o        -> setIntOpt s affinity o
    Backlog o         -> setIntOpt s backlog o
    Events _          -> return ()
    FD _              -> return ()
    Identity o        -> setStrOpt s identity o
    Linger o          -> setIntOpt s linger o
    Rate o            -> setIntOpt s rate o
    ReceiveBuf o      -> setIntOpt s receiveBuf o
    ReceiveMore _     -> return ()
    ReconnectIVL o    -> setIntOpt s reconnectIVL o
    ReconnectIVLMax o -> setIntOpt s reconnectIVLMax o
    RecoveryIVL o     -> setIntOpt s recoveryIVL o
    SendBuf o         -> setIntOpt s sendBuf o
    HighWM o          -> setIntOpt s highWM o
    McastLoop o       -> setBoolOpt s mcastLoop o
    RecoveryIVLMsec o -> setIntOpt s recoveryIVLMsec o
    Swap o            -> setIntOpt s swap o
-- | Read the current value of a socket option.
--
-- Only the constructor of the argument matters; its payload is ignored.
-- The result uses the same constructor and carries the value read from
-- the socket. For 'Events' the raw integer is decoded with 'toEvent'.
getOption :: Socket a -> SocketOption -> IO SocketOption
getOption s opt = case opt of
    Affinity _        -> fmap Affinity (getIntOpt s affinity)
    Backlog _         -> fmap Backlog (getIntOpt s backlog)
    Events _          -> fmap (Events . toEvent) (getIntOpt s events)
    FD _              -> fmap FD (getIntOpt s filedesc)
    Identity _        -> fmap Identity (getStrOpt s identity)
    Linger _          -> fmap Linger (getIntOpt s linger)
    Rate _            -> fmap Rate (getIntOpt s rate)
    ReceiveBuf _      -> fmap ReceiveBuf (getIntOpt s receiveBuf)
    ReceiveMore _     -> fmap ReceiveMore (getBoolOpt s receiveMore)
    ReconnectIVL _    -> fmap ReconnectIVL (getIntOpt s reconnectIVL)
    ReconnectIVLMax _ -> fmap ReconnectIVLMax (getIntOpt s reconnectIVLMax)
    RecoveryIVL _     -> fmap RecoveryIVL (getIntOpt s recoveryIVL)
    SendBuf _         -> fmap SendBuf (getIntOpt s sendBuf)
    HighWM _          -> fmap HighWM (getIntOpt s highWM)
    McastLoop _       -> fmap McastLoop (getBoolOpt s mcastLoop)
    RecoveryIVLMsec _ -> fmap RecoveryIVLMsec (getIntOpt s recoveryIVLMsec)
    Swap _            -> fmap Swap (getIntOpt s swap)
-- | The library version as a @(major, minor, patch)@ triple, obtained
-- by letting 'c_zmq_version' fill three temporary cells.
version :: IO (Int, Int, Int)
version =
    with 0 $ \majorP ->
    with 0 $ \minorP ->
    with 0 $ \patchP -> do
        c_zmq_version majorP minorP patchP
        major <- peek majorP
        minor <- peek minorP
        patch <- peek patchP
        return (fromIntegral major, fromIntegral minor, fromIntegral patch)
-- | Create a 'Context' by calling 'c_zmq_init' with the given size
-- argument. Throws an errno-based 'IOError' labelled @\"init\"@ when the
-- C call returns a null pointer.
init :: Size -> IO Context
init ioThreads =
    fmap Context $
        throwErrnoIfNull "init" (c_zmq_init (fromIntegral ioThreads))
-- | Terminate a 'Context' via 'c_zmq_term'. The call is repeated while
-- it is interrupted; any other @-1@ result is thrown as an errno-based
-- 'IOError' labelled @\"term\"@.
term :: Context -> IO ()
term context = throwErrnoIfMinus1Retry_ "term" (c_zmq_term (ctx context))
-- | Run an action with a freshly created 'Context' and terminate the
-- context afterwards, whether the action returns or throws
-- ('bracket' semantics).
withContext :: Size -> (Context -> IO a) -> IO a
withContext ioThreads act = bracket acquire release (act . Context)
  where
    -- Raw context pointer; a null result is turned into an exception.
    acquire = throwErrnoIfNull "c_zmq_init" (c_zmq_init (fromIntegral ioThreads))
    -- Termination is retried while the C call is interrupted.
    release rawCtx = throwErrnoIfMinus1Retry_ "c_zmq_term" (c_zmq_term rawCtx)
-- | Run an action with a socket of the given type tag, created with
-- 'socket' and released with 'close' whether the action returns or
-- throws.
withSocket :: SType a => Context -> a -> (Socket a -> IO b) -> IO b
withSocket c t act = bracket (socket c t) close act
-- | Create a socket of the type named by the tag @t@ inside the given
-- context. A null result from 'c_zmq_socket' is thrown as an errno-based
-- 'IOError' labelled @\"socket\"@; otherwise the raw socket is wrapped
-- with 'mkSocket'.
socket :: SType a => Context -> a -> IO (Socket a)
socket (Context c) t =
    throwErrnoIfNull "socket" (c_zmq_socket c sockType) >>= mkSocket
  where
    -- Numeric socket type expected by the C call.
    sockType = typeVal (zmqSocketType t)
-- | Close a socket.
--
-- Inside 'onSocket', the socket's status flag is atomically set to
-- 'False'; 'c_zmq_close' is only called when the flag was 'True'
-- beforehand, so the C-level close runs at most once per flag
-- transition. A @-1@ result is thrown as an errno-based 'IOError'
-- labelled @\"close\"@.
close :: Socket a -> IO ()
close sock@(Socket _ status) = onSocket "close" sock $ \s -> do
    -- Swap in False and learn what the flag held before.
    wasAlive <- atomicModifyIORef status (\previous -> (False, previous))
    when wasAlive $
        throwErrnoIfMinus1_ "close" (c_zmq_close s)
-- | Set the 'B.subscribe' string option on a subscribing socket to the
-- given string.
subscribe :: SubsType a => Socket a -> String -> IO ()
subscribe s str = setStrOpt s B.subscribe str
-- | Set the 'B.unsubscribe' string option on a subscribing socket to
-- the given string.
unsubscribe :: SubsType a => Socket a -> String -> IO ()
unsubscribe s str = setStrOpt s B.unsubscribe str
-- | Read the 'receiveMore' boolean option of the socket (the same value
-- that 'getOption' returns for 'ReceiveMore').
moreToReceive :: Socket a -> IO Bool
moreToReceive sock = sock `getBoolOpt` receiveMore
-- | Call 'c_zmq_bind' on the socket with the given string, marshalled
-- as a C string. A @-1@ result is thrown as an errno-based 'IOError'
-- labelled @\"bind\"@.
bind :: Socket a -> String -> IO ()
bind sock str = onSocket "bind" sock $ \s ->
    throwErrnoIfMinus1_ "bind" $
        withCString str (c_zmq_bind s)
-- | Call 'c_zmq_connect' on the socket with the given string,
-- marshalled as a C string. A @-1@ result is thrown as an errno-based
-- 'IOError' labelled @\"connect\"@.
connect :: Socket a -> String -> IO ()
connect sock str = onSocket "connect" sock $ \s ->
    throwErrnoIfMinus1_ "connect" $
        withCString str (c_zmq_connect s)
-- | Send a strict 'SB.ByteString' on the socket.
--
-- The message built by 'messageOf' is always released with
-- 'messageClose'. 'NoBlock' is added to the caller's flags on every
-- call; when the C call reports that it would block, 'waitWrite' runs
-- before the send is attempted again.
send :: Socket a -> SB.ByteString -> [Flag] -> IO ()
send sock val fls = bracket (messageOf val) messageClose transmit
  where
    -- The C call is always made non-blocking.
    flags = combine (NoBlock : fls)
    transmit msg = onSocket "send" sock $ \s ->
        retry "send" (waitWrite sock) (c_zmq_send s (msgPtr msg) flags)
-- | Send a lazy 'LB.ByteString' on the socket.
--
-- Works like 'send' except that the message is built with
-- 'messageOfLazy'. 'NoBlock' is added to the caller's flags on every
-- call; when the C call reports that it would block, 'waitWrite' runs
-- before the send is attempted again.
send' :: Socket a -> LB.ByteString -> [Flag] -> IO ()
send' sock val fls = bracket (messageOfLazy val) messageClose transmit
  where
    -- The C call is always made non-blocking.
    flags = combine (NoBlock : fls)
    transmit msg = onSocket "send'" sock $ \s ->
        retry "send'" (waitWrite sock) (c_zmq_send s (msgPtr msg) flags)
-- | Receive one message from the socket as a strict 'SB.ByteString'.
--
-- 'NoBlock' is added to the caller's flags on every call; when the C
-- call reports that it would block, 'waitRead' runs before the receive
-- is attempted again. The payload is copied out with
-- 'SB.packCStringLen' before the message is released by 'messageClose'.
receive :: Socket a -> [Flag] -> IO (SB.ByteString)
receive sock fls = bracket messageInit messageClose fetch
  where
    fetch msg = onSocket "receive" sock $ \s -> do
        let frame = msgPtr msg
        retry "receive" (waitRead sock) $
            c_zmq_recv s frame (combine (NoBlock : fls))
        -- Pointer and length of the received payload.
        payload <- c_zmq_msg_data frame
        len     <- c_zmq_msg_size frame
        SB.packCStringLen (payload, fromIntegral len)
-- | Poll a list of sockets and file descriptors.
--
-- Each 'Poll' entry is marshalled into a 'ZMQPoll' record, the array is
-- passed to 'c_zmq_poll' together with the timeout, and the result list
-- contains the same entries, in the same order, with their 'PollEvent'
-- replaced by the event decoded from the returned @revents@ field.
-- The C call is repeated while it is interrupted; any other @-1@ result
-- is thrown as an errno-based 'IOError' labelled @\"poll\"@.
poll :: [Poll] -> Timeout -> IO [Poll]
poll fds to =
    withArrayLen (map toItem fds) $ \count items -> do
        throwErrnoIfMinus1Retry_ "poll" $
            c_zmq_poll items (fromIntegral count) (fromIntegral to)
        polled <- peekArray count items
        return (zipWith withResult polled fds)
  where
    -- Marshal a request: sockets leave the fd field 0, file descriptors
    -- leave the socket pointer null; revents starts out as 0.
    toItem :: Poll -> ZMQPoll
    toItem (S (Socket s _) e) = ZMQPoll s 0 (fromEvent e) 0
    toItem (F (Fd fd) e)      = ZMQPoll nullPtr (fromIntegral fd) (fromEvent e) 0

    -- Rebuild the caller's entry with the event reported for it.
    withResult :: ZMQPoll -> Poll -> Poll
    withResult zp (S (Socket s t) _) = S (Socket s t) (reported zp)
    withResult zp (F fd _)           = F fd (reported zp)

    reported :: ZMQPoll -> PollEvent
    reported = toEvent . fromIntegral . pRevents

    -- Encode the requested event for the events field.
    fromEvent :: PollEvent -> CShort
    fromEvent ev = case ev of
        In     -> fromIntegral (pollVal pollIn)
        Out    -> fromIntegral (pollVal pollOut)
        InOut  -> fromIntegral (pollVal pollInOut)
        Native -> fromIntegral (pollVal pollerr)
        None   -> 0
-- | Decode an event bitmask into a 'PollEvent'. The mask comes either
-- from the 'events' socket option or from the @revents@ field of a poll
-- item.
--
-- The value is treated as a set of flags, not compared for exact
-- equality. A readiness bit that arrives together with another bit (for
-- example 'pollIn' combined with 'pollerr') is therefore still reported
-- as 'In' \/ 'Out' \/ 'InOut' instead of collapsing to 'None'.
-- Readiness takes precedence over the error flag because 'PollEvent'
-- cannot express both at once.
--
-- NOTE(review): assumes 'pollInOut' is the union of 'pollIn' and
-- 'pollOut', so that plain masks decode exactly as before -- confirm
-- against "System.ZMQ.Base".
toEvent :: Word32 -> PollEvent
toEvent e
    | readable && writable = InOut
    | readable             = In
    | writable             = Out
    | isSet pollerr        = Native
    | otherwise            = None
  where
    -- True when the given flag's bit(s) are present in the mask.
    isSet :: ZMQPollEvent -> Bool
    isSet flag = e .&. fromIntegral (pollVal flag) /= 0

    readable = isSet pollIn
    writable = isSet pollOut
-- | Run a C call that signals failure with @-1@, with retrying handled
-- by 'throwErrnoIfMinus1RetryMayBlock_'. This wrapper only swaps the
-- argument order so that the wait action comes before the call itself.
--
-- The first argument is the label used in the thrown 'IOError', the
-- second is the action to run when the call would block, the third is
-- the call itself.
retry :: String -> IO () -> IO CInt -> IO ()
retry label onBlock call = throwErrnoIfMinus1RetryMayBlock_ label call onBlock
-- | Block until the socket reports the given poll event.
--
-- Each round reads the socket's 'filedesc' option, runs the supplied
-- wait function on that descriptor, then reads the 'events' option. The
-- loop ends once the requested event's bit is set in that value.
wait' :: (Fd -> IO ()) -> ZMQPollEvent -> Socket a -> IO ()
wait' w f s = loop
  where
    -- Bit(s) of the event being waited for.
    wanted :: Word32
    wanted = fromIntegral (pollVal f)

    loop = do
        fd <- getIntOpt s filedesc
        w (Fd fd)
        evs <- getIntOpt s events
        -- Descriptor activity alone does not end the wait; go round
        -- again until the requested event is actually flagged.
        when (evs .&. wanted == 0) loop
-- | Wait, using 'threadWaitRead' on the socket's descriptor, until the
-- socket reports 'pollIn'.
waitRead :: Socket a -> IO ()
waitRead sock = wait' threadWaitRead pollIn sock

-- | Wait, using 'threadWaitWrite' on the socket's descriptor, until the
-- socket reports 'pollOut'.
waitWrite :: Socket a -> IO ()
waitWrite sock = wait' threadWaitWrite pollOut sock
-- | Device kinds accepted by 'device'.
data Device
    = Streamer  -- ^ maps to 'deviceStreamer'
    | Forwarder -- ^ maps to 'deviceForwarder'
    | Queue     -- ^ maps to 'deviceQueue'
    deriving (Eq, Ord, Show)
-- | Run 'c_zmq_device' with the chosen device kind and the two given
-- sockets. The C call is repeated while it is interrupted; any other
-- @-1@ result is thrown as an errno-based 'IOError' labelled
-- @\"device\"@.
device :: Device -> Socket a -> Socket b -> IO ()
device device' insock outsock =
    onSocket "device" insock $ \front ->
        onSocket "device" outsock $ \back ->
            throwErrnoIfMinus1Retry_ "device"
                (c_zmq_device deviceCode front back)
  where
    -- Numeric device type expected by the C call.
    deviceCode :: CInt
    deviceCode = fromIntegral . deviceType $ case device' of
        Streamer  -> deviceStreamer
        Forwarder -> deviceForwarder
        Queue     -> deviceQueue