{-# LANGUAGE OverloadedStrings #-}
module Network.ZRE.ZMQ (zreRouter, zreDealer) where

import Control.Monad
import Control.Concurrent.STM
import Control.Monad.IO.Class
import qualified System.ZMQ4.Monadic as ZMQ
import qualified Data.ByteString.Char8 as B
import qualified Data.List.NonEmpty as NE
import Data.Time.Clock.POSIX

import Data.ZRE
import System.ZMQ4.Endpoint

zreDealer :: Control.Monad.IO.Class.MonadIO m
          => Endpoint
          -> B.ByteString
          -> TBQueue ZRECmd
          -> m a
zreDealer :: Endpoint -> ByteString -> TBQueue ZRECmd -> m a
zreDealer endpoint :: Endpoint
endpoint ourUUID :: ByteString
ourUUID peerQ :: TBQueue ZRECmd
peerQ = (forall z. ZMQ z a) -> m a
forall (m :: * -> *) a. MonadIO m => (forall z. ZMQ z a) -> m a
ZMQ.runZMQ ((forall z. ZMQ z a) -> m a) -> (forall z. ZMQ z a) -> m a
forall a b. (a -> b) -> a -> b
$ do
  Socket z Dealer
d <- Dealer -> ZMQ z (Socket z Dealer)
forall t z. SocketType t => t -> ZMQ z (Socket z t)
ZMQ.socket Dealer
ZMQ.Dealer
  Restricted (Nneg1, Int32) Int -> Socket z Dealer -> ZMQ z ()
forall i z t.
Integral i =>
Restricted (Nneg1, Int32) i -> Socket z t -> ZMQ z ()
ZMQ.setLinger (Int -> Restricted (Nneg1, Int32) Int
forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict (1 :: Int)) Socket z Dealer
d
  -- The sender MAY set a high-water mark (HWM) of, for example, 100 messages per second (if the timeout period is 30 second, this means a HWM of 3,000 messages).
  Restricted (N0, Int32) Int -> Socket z Dealer -> ZMQ z ()
forall i z t.
Integral i =>
Restricted (N0, Int32) i -> Socket z t -> ZMQ z ()
ZMQ.setSendHighWM (Int -> Restricted (N0, Int32) Int
forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict (Int -> Restricted (N0, Int32) Int)
-> Int -> Restricted (N0, Int32) Int
forall a b. (a -> b) -> a -> b
$ (30 Int -> Int -> Int
forall a. Num a => a -> a -> a
* 100 :: Int)) Socket z Dealer
d
  Restricted (Nneg1, Int32) Int -> Socket z Dealer -> ZMQ z ()
forall i z t.
Integral i =>
Restricted (Nneg1, Int32) i -> Socket z t -> ZMQ z ()
ZMQ.setSendTimeout (Int -> Restricted (Nneg1, Int32) Int
forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict (0 :: Int)) Socket z Dealer
d
  -- prepend '1' in front of 16bit UUID, ZMQ.restrict would do that for us but protocol requires it
  Restricted (N1, N254) ByteString -> Socket z Dealer -> ZMQ z ()
forall z t.
Restricted (N1, N254) ByteString -> Socket z t -> ZMQ z ()
ZMQ.setIdentity (ByteString -> Restricted (N1, N254) ByteString
forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict (ByteString -> Restricted (N1, N254) ByteString)
-> ByteString -> Restricted (N1, N254) ByteString
forall a b. (a -> b) -> a -> b
$ Char -> ByteString -> ByteString
B.cons '1' ByteString
ourUUID) Socket z Dealer
d
  Socket z Dealer -> String -> ZMQ z ()
forall z t. Socket z t -> String -> ZMQ z ()
ZMQ.connect Socket z Dealer
d (String -> ZMQ z ()) -> String -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ ByteString -> String
B.unpack (ByteString -> String) -> ByteString -> String
forall a b. (a -> b) -> a -> b
$ Endpoint -> ByteString
pEndpoint Endpoint
endpoint
  Socket z Dealer -> Int -> ZMQ z a
forall t z b. Sender t => Socket z t -> Int -> ZMQ z b
loop Socket z Dealer
d 1 -- sequence number must start with 1
  where loop :: Socket z t -> Int -> ZMQ z b
loop d :: Socket z t
d x :: Int
x = do
           ZRECmd
cmd <- IO ZRECmd -> ZMQ z ZRECmd
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ZRECmd -> ZMQ z ZRECmd) -> IO ZRECmd -> ZMQ z ZRECmd
forall a b. (a -> b) -> a -> b
$ STM ZRECmd -> IO ZRECmd
forall a. STM a -> IO a
atomically (STM ZRECmd -> IO ZRECmd) -> STM ZRECmd -> IO ZRECmd
forall a b. (a -> b) -> a -> b
$ TBQueue ZRECmd -> STM ZRECmd
forall a. TBQueue a -> STM a
readTBQueue TBQueue ZRECmd
peerQ
           -- liftIO $ print "Sending" >> (print $ newZRE x cmd)
           Socket z t -> NonEmpty ByteString -> ZMQ z ()
forall t z.
Sender t =>
Socket z t -> NonEmpty ByteString -> ZMQ z ()
ZMQ.sendMulti Socket z t
d (NonEmpty ByteString -> ZMQ z ())
-> NonEmpty ByteString -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ ([ByteString] -> NonEmpty ByteString
forall a. [a] -> NonEmpty a
NE.fromList ([ByteString] -> NonEmpty ByteString)
-> [ByteString] -> NonEmpty ByteString
forall a b. (a -> b) -> a -> b
$ ZREMsg -> [ByteString]
encodeZRE (ZREMsg -> [ByteString]) -> ZREMsg -> [ByteString]
forall a b. (a -> b) -> a -> b
$ Int -> ZRECmd -> ZREMsg
newZRE Int
x ZRECmd
cmd :: NE.NonEmpty B.ByteString)
           Socket z t -> Int -> ZMQ z b
loop Socket z t
d (Int
xInt -> Int -> Int
forall a. Num a => a -> a -> a
+1)

zreRouter :: Control.Monad.IO.Class.MonadIO m
          => Endpoint
          -> (ZREMsg -> IO a1)
          -> m a
zreRouter :: Endpoint -> (ZREMsg -> IO a1) -> m a
zreRouter endpoint :: Endpoint
endpoint handler :: ZREMsg -> IO a1
handler = (forall z. ZMQ z a) -> m a
forall (m :: * -> *) a. MonadIO m => (forall z. ZMQ z a) -> m a
ZMQ.runZMQ ((forall z. ZMQ z a) -> m a) -> (forall z. ZMQ z a) -> m a
forall a b. (a -> b) -> a -> b
$ do
  Socket z Router
sock <- Router -> ZMQ z (Socket z Router)
forall t z. SocketType t => t -> ZMQ z (Socket z t)
ZMQ.socket Router
ZMQ.Router
  Socket z Router -> String -> ZMQ z ()
forall z t. Socket z t -> String -> ZMQ z ()
ZMQ.bind Socket z Router
sock (String -> ZMQ z ()) -> String -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ ByteString -> String
B.unpack (ByteString -> String) -> ByteString -> String
forall a b. (a -> b) -> a -> b
$ Endpoint -> ByteString
pEndpoint Endpoint
endpoint
  ZMQ z () -> ZMQ z a
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (ZMQ z () -> ZMQ z a) -> ZMQ z () -> ZMQ z a
forall a b. (a -> b) -> a -> b
$ do
     [ByteString]
input <- Socket z Router -> ZMQ z [ByteString]
forall t z. Receiver t => Socket z t -> ZMQ z [ByteString]
ZMQ.receiveMulti Socket z Router
sock
     UTCTime
now <- IO UTCTime -> ZMQ z UTCTime
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO UTCTime -> ZMQ z UTCTime) -> IO UTCTime -> ZMQ z UTCTime
forall a b. (a -> b) -> a -> b
$ IO UTCTime
getCurrentTime
     case [ByteString] -> Either String ZREMsg
parseZRE [ByteString]
input of
        Left err :: String
err -> IO () -> ZMQ z ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ZMQ z ()) -> IO () -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
forall a. Show a => a -> IO ()
print (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ "Malformed message received: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
err
        Right msg :: ZREMsg
msg -> do
          let updateTime :: ZREMsg -> ZREMsg
updateTime = \x :: ZREMsg
x -> ZREMsg
x { msgTime :: Maybe UTCTime
msgTime = UTCTime -> Maybe UTCTime
forall a. a -> Maybe a
Just UTCTime
now }
          ZMQ z a1 -> ZMQ z ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ZMQ z a1 -> ZMQ z ()) -> ZMQ z a1 -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ IO a1 -> ZMQ z a1
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a1 -> ZMQ z a1) -> IO a1 -> ZMQ z a1
forall a b. (a -> b) -> a -> b
$ ZREMsg -> IO a1
handler (ZREMsg -> ZREMsg
updateTime ZREMsg
msg)
          () -> ZMQ z ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()