module System.ZMQ (
Size,
Context,
Socket,
Flag(..),
SocketOption(..),
Poll(..),
PollEvent(..),
Device(..),
SType,
SubsType,
Pair(..),
Pub(..),
Sub(..),
Req(..),
Rep(..),
XReq(..),
XRep(..),
Pull(..),
Push(..),
Up(..),
Down(..),
init,
term,
socket,
close,
setOption,
getOption,
System.ZMQ.subscribe,
System.ZMQ.unsubscribe,
bind,
connect,
send,
send',
receive,
moreToReceive,
poll,
device
) where
import Prelude hiding (init)
import Control.Applicative
import Control.Monad (foldM_)
import Control.Exception
import Data.Int
import Data.Maybe
import System.ZMQ.Base
import qualified System.ZMQ.Base as B
import Foreign
import Foreign.C.Error
import Foreign.C.String
import Foreign.C.Types (CInt, CShort, CSize)
import qualified Data.ByteString as SB
import qualified Data.ByteString.Lazy as LB
import qualified Data.ByteString.Unsafe as UB
import System.Posix.Types (Fd(..))
newtype Context = Context { ctx :: ZMQCtx }
newtype Socket a = Socket { sock :: ZMQSocket }
newtype Message = Message { msgPtr :: ZMQMsgPtr }
type Timeout = Int64
type Size = Word
class SType a where
zmqSocketType :: a -> ZMQSocketType
data Pair = Pair
instance SType Pair where
zmqSocketType = const pair
data Pub = Pub
instance SType Pub where
zmqSocketType = const pub
data Sub = Sub
instance SType Sub where
zmqSocketType = const sub
data Req = Req
instance SType Req where
zmqSocketType = const request
data Rep = Rep
instance SType Rep where
zmqSocketType = const response
data XReq = Xreq
instance SType XReq where
zmqSocketType = const xrequest
data XRep = Xrep
instance SType XRep where
zmqSocketType = const xresponse
data Pull = Pull
instance SType Pull where
zmqSocketType = const pull
data Push = Push
instance SType Push where
zmqSocketType = const push
data Up = Up
instance SType Up where
zmqSocketType = const upstream
data Down = Down
instance SType Down where
zmqSocketType = const downstream
class SubsType a
instance SubsType Sub
data SocketOption =
HighWM Word64
| Swap Int64
| Affinity Word64
| Identity String
| Rate Int64
| RecoveryIVL Int64
| RecoveryIVLMsec Int64
| McastLoop Int64
| SendBuf Word64
| ReceiveBuf Word64
| FD CInt
| Events Word32
| Linger CInt
| ReconnectIVL CInt
| Backlog CInt
deriving (Eq, Ord, Show)
data Flag = NoBlock
| SndMore
deriving (Eq, Ord, Show)
data PollEvent =
In
| Out
| InOut
| Native
deriving (Eq, Ord, Show)
data Poll =
forall a. S (Socket a) PollEvent
| F Fd PollEvent
data Device =
Streamer
| Forwarder
| Queue
deriving (Eq, Ord, Show)
init :: Size -> IO Context
init ioThreads = do
c <- throwErrnoIfNull "init" $ c_zmq_init (fromIntegral ioThreads)
return (Context c)
term :: Context -> IO ()
term = throwErrnoIfMinus1_ "term" . c_zmq_term . ctx
socket :: SType a => Context -> a -> IO (Socket a)
socket (Context c) t =
let zt = typeVal . zmqSocketType $ t
in Socket <$> throwErrnoIfNull "socket" (c_zmq_socket c zt)
close :: Socket a -> IO ()
close = throwErrnoIfMinus1_ "close" . c_zmq_close . sock
setOption :: Socket a -> SocketOption -> IO ()
setOption s (HighWM o) = setIntOpt s highWM o
setOption s (Swap o) = setIntOpt s swap o
setOption s (Affinity o) = setIntOpt s affinity o
setOption s (Identity o) = setStrOpt s identity o
setOption s (Rate o) = setIntOpt s rate o
setOption s (RecoveryIVL o) = setIntOpt s recoveryIVL o
setOption s (RecoveryIVLMsec o) = setIntOpt s recoveryIVLMsec o
setOption s (McastLoop o) = setIntOpt s mcastLoop o
setOption s (SendBuf o) = setIntOpt s sendBuf o
setOption s (ReceiveBuf o) = setIntOpt s receiveBuf o
setOption s (FD o) = setIntOpt s filedesc o
setOption s (Events o) = setIntOpt s events o
setOption s (Linger o) = setIntOpt s linger o
setOption s (ReconnectIVL o) = setIntOpt s reconnectIVL o
setOption s (Backlog o) = setIntOpt s backlog o
getOption :: Socket a -> SocketOption -> IO SocketOption
getOption s (HighWM _) = HighWM <$> getIntOpt s highWM
getOption s (Swap _) = Swap <$> getIntOpt s swap
getOption s (Affinity _) = Affinity <$> getIntOpt s affinity
getOption s (Identity _) = Identity <$> getStrOpt s identity
getOption s (Rate _) = Rate <$> getIntOpt s rate
getOption s (RecoveryIVL _) = RecoveryIVL <$> getIntOpt s recoveryIVL
getOption s (RecoveryIVLMsec _) = RecoveryIVLMsec <$> getIntOpt s recoveryIVLMsec
getOption s (McastLoop _) = McastLoop <$> getIntOpt s mcastLoop
getOption s (SendBuf _) = SendBuf <$> getIntOpt s sendBuf
getOption s (ReceiveBuf _) = ReceiveBuf <$> getIntOpt s receiveBuf
getOption s (FD _) = FD <$> getIntOpt s filedesc
getOption s (Events _) = Events <$> getIntOpt s events
getOption s (Linger _) = Linger <$> getIntOpt s linger
getOption s (ReconnectIVL _) = ReconnectIVL <$> getIntOpt s reconnectIVL
getOption s (Backlog _) = Backlog <$> getIntOpt s backlog
subscribe :: SubsType a => Socket a -> String -> IO ()
subscribe s = setStrOpt s B.subscribe
unsubscribe :: SubsType a => Socket a -> String -> IO ()
unsubscribe s = setStrOpt s B.unsubscribe
moreToReceive :: Socket a -> IO Bool
moreToReceive s = getBoolOpt s receiveMore
bind :: Socket a -> String -> IO ()
bind (Socket s) str = throwErrnoIfMinus1_ "bind" $
withCString str (c_zmq_bind s)
connect :: Socket a -> String -> IO ()
connect (Socket s) str = throwErrnoIfMinus1_ "connect" $
withCString str (c_zmq_connect s)
send :: Socket a -> SB.ByteString -> [Flag] -> IO ()
send (Socket s) val fls = bracket (messageOf val) messageClose $ \m ->
throwErrnoIfMinus1_ "send" $ c_zmq_send s (msgPtr m) (combine fls)
send' :: Socket a -> LB.ByteString -> [Flag] -> IO ()
send' (Socket s) val fls = bracket (messageOfLazy val) messageClose $ \m ->
throwErrnoIfMinus1_ "send'" $ c_zmq_send s (msgPtr m) (combine fls)
receive :: Socket a -> [Flag] -> IO (SB.ByteString)
receive (Socket s) fls = bracket messageInit messageClose $ \m -> do
throwErrnoIfMinus1Retry_ "receive" $ c_zmq_recv s (msgPtr m) (combine fls)
data_ptr <- c_zmq_msg_data (msgPtr m)
size <- c_zmq_msg_size (msgPtr m)
SB.packCStringLen (data_ptr, fromIntegral size)
poll :: [Poll] -> Timeout -> IO [Poll]
poll fds to = do
let len = length fds
ps = map createZMQPoll fds
withArray ps $ \ptr -> do
throwErrnoIfMinus1Retry_ "poll" $
c_zmq_poll ptr (fromIntegral len) (fromIntegral to)
ps' <- peekArray len ptr
createPoll ps' []
where
createZMQPoll :: Poll -> ZMQPoll
createZMQPoll (S (Socket s) e) =
ZMQPoll s 0 (fromEvent e) 0
createZMQPoll (F (Fd s) e) =
ZMQPoll nullPtr (fromIntegral s) (fromEvent e) 0
createPoll :: [ZMQPoll] -> [Poll] -> IO [Poll]
createPoll [] fd = return fd
createPoll (p:pp) fd = do
let s = pSocket p;
f = pFd p;
r = toEvent $ pRevents p
if isJust r
then createPoll pp (newPoll s f r:fd)
else createPoll pp fd
newPoll :: ZMQSocket -> CInt -> Maybe PollEvent -> Poll
newPoll s 0 r = S (Socket s) (fromJust r)
newPoll _ f r = F (Fd f) (fromJust r)
fromEvent :: PollEvent -> CShort
fromEvent In = fromIntegral . pollVal $ pollIn
fromEvent Out = fromIntegral . pollVal $ pollOut
fromEvent InOut = fromIntegral . pollVal $ pollInOut
fromEvent Native = fromIntegral . pollVal $ pollerr
toEvent :: CShort -> Maybe PollEvent
toEvent e | e == (fromIntegral . pollVal $ pollIn) = Just In
| e == (fromIntegral . pollVal $ pollOut) = Just Out
| e == (fromIntegral . pollVal $ pollInOut) = Just InOut
| e == (fromIntegral . pollVal $ pollerr) = Just Native
| otherwise = Nothing
device :: Device -> Socket a -> Socket b -> IO ()
device device' (Socket insocket) (Socket outsocket) =
throwErrnoIfMinus1_ "device" $
c_zmq_device (fromDevice device') insocket outsocket
where
fromDevice :: Device -> CInt
fromDevice Streamer = fromIntegral . deviceType $ deviceStreamer
fromDevice Forwarder = fromIntegral . deviceType $ deviceForwarder
fromDevice Queue = fromIntegral . deviceType $ deviceQueue
messageOf :: SB.ByteString -> IO Message
messageOf b = UB.unsafeUseAsCStringLen b $ \(cstr, len) -> do
msg <- messageInitSize (fromIntegral len)
data_ptr <- c_zmq_msg_data (msgPtr msg)
copyBytes data_ptr cstr len
return msg
messageOfLazy :: LB.ByteString -> IO Message
messageOfLazy lbs = do
msg <- messageInitSize (fromIntegral len)
data_ptr <- c_zmq_msg_data (msgPtr msg)
let fn offset bs = UB.unsafeUseAsCStringLen bs $ \(cstr, str_len) -> do
copyBytes (data_ptr `plusPtr` offset) cstr str_len
return (offset + str_len)
foldM_ fn 0 (LB.toChunks lbs)
return msg
where
len = LB.length lbs
messageClose :: Message -> IO ()
messageClose (Message ptr) = do
throwErrnoIfMinus1_ "messageClose" $ c_zmq_msg_close ptr
free ptr
messageInit :: IO Message
messageInit = do
ptr <- new (ZMQMsg nullPtr)
throwErrnoIfMinus1_ "messageInit" $ c_zmq_msg_init ptr
return (Message ptr)
messageInitSize :: Size -> IO Message
messageInitSize s = do
ptr <- new (ZMQMsg nullPtr)
throwErrnoIfMinus1_ "messageInitSize" $
c_zmq_msg_init_size ptr (fromIntegral s)
return (Message ptr)
setIntOpt :: (Storable b, Integral b) => Socket a -> ZMQOption -> b -> IO ()
setIntOpt (Socket s) (ZMQOption o) i =
throwErrnoIfMinus1_ "setIntOpt" $ with i $ \ptr ->
c_zmq_setsockopt s (fromIntegral o)
(castPtr ptr)
(fromIntegral . sizeOf $ i)
setStrOpt :: Socket a -> ZMQOption -> String -> IO ()
setStrOpt (Socket s) (ZMQOption o) str = throwErrnoIfMinus1_ "setStrOpt" $
withCStringLen str $ \(cstr, len) ->
c_zmq_setsockopt s (fromIntegral o)
(castPtr cstr)
(fromIntegral len)
getBoolOpt :: Socket a -> ZMQOption -> IO Bool
getBoolOpt s o = ((1 :: Int64) ==) <$> getIntOpt s o
getIntOpt :: (Storable b, Integral b) => Socket a -> ZMQOption -> IO b
getIntOpt (Socket s) (ZMQOption o) = do
let i = 0
bracket (new i) free $ \iptr ->
bracket (new (fromIntegral . sizeOf $ i :: CSize)) free $ \jptr -> do
throwErrnoIfMinus1_ "integralOpt" $
c_zmq_getsockopt s (fromIntegral o) (castPtr iptr) jptr
peek iptr
getStrOpt :: Socket a -> ZMQOption -> IO String
getStrOpt (Socket s) (ZMQOption o) =
bracket (mallocBytes 255) free $ \bPtr ->
bracket (new (255 :: CSize)) free $ \sPtr -> do
throwErrnoIfMinus1_ "getStrOpt" $
c_zmq_getsockopt s (fromIntegral o) (castPtr bPtr) sPtr
peek sPtr >>= \len -> peekCStringLen (bPtr, fromIntegral len)
toZMQFlag :: Flag -> ZMQFlag
toZMQFlag NoBlock = noBlock
toZMQFlag SndMore = sndMore
combine :: [Flag] -> CInt
combine = fromIntegral . foldr ((.|.) . flagVal . toZMQFlag) 0