{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

{-# LANGUAGE FlexibleContexts #-}
module Network.ZGossip.ZMQ (zgossipDealer, zgossipRouter) where

import Control.Monad
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.IO.Class

import Data.ByteString (ByteString)
import qualified System.ZMQ4.Monadic as ZMQ
import qualified Data.ByteString.Char8 as B
import qualified Data.List.NonEmpty as NE

import Data.Maybe

import Data.ZGossip
import System.ZMQ4.Endpoint

zgossipDealer :: Control.Monad.IO.Class.MonadIO m
              => Endpoint
              -> ByteString
              -> TBQueue ZGSCmd
              -> (ZGSMsg -> IO ())
              -> m a
zgossipDealer :: Endpoint
-> ByteString -> TBQueue ZGSCmd -> (ZGSMsg -> IO ()) -> m a
zgossipDealer Endpoint
endpoint ByteString
ourUUID TBQueue ZGSCmd
peerQ ZGSMsg -> IO ()
handler = (forall z. ZMQ z a) -> m a
forall (m :: * -> *) a. MonadIO m => (forall z. ZMQ z a) -> m a
ZMQ.runZMQ ((forall z. ZMQ z a) -> m a) -> (forall z. ZMQ z a) -> m a
forall a b. (a -> b) -> a -> b
$ do
  Socket z Dealer
d <- Dealer -> ZMQ z (Socket z Dealer)
forall t z. SocketType t => t -> ZMQ z (Socket z t)
ZMQ.socket Dealer
ZMQ.Dealer
  Restricted (Nneg1, Int32) Int -> Socket z Dealer -> ZMQ z ()
forall i z t.
Integral i =>
Restricted (Nneg1, Int32) i -> Socket z t -> ZMQ z ()
ZMQ.setLinger (Int -> Restricted (Nneg1, Int32) Int
forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict (Int
1 :: Int)) Socket z Dealer
d
  -- Never block on sending; we use an infinite HWM and buffer as many
  -- messages as needed in outgoing pipes. Note that the maximum number
  -- is the overall tuple set size
  Restricted (N0, Int32) Int -> Socket z Dealer -> ZMQ z ()
forall i z t.
Integral i =>
Restricted (N0, Int32) i -> Socket z t -> ZMQ z ()
ZMQ.setSendHighWM (Int -> Restricted (N0, Int32) Int
forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict (Int
0 :: Int)) Socket z Dealer
d
  Restricted (Nneg1, Int32) Int -> Socket z Dealer -> ZMQ z ()
forall i z t.
Integral i =>
Restricted (Nneg1, Int32) i -> Socket z t -> ZMQ z ()
ZMQ.setSendTimeout (Int -> Restricted (Nneg1, Int32) Int
forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict (Int
0 :: Int)) Socket z Dealer
d
  Restricted (N1, N254) ByteString -> Socket z Dealer -> ZMQ z ()
forall z t.
Restricted (N1, N254) ByteString -> Socket z t -> ZMQ z ()
ZMQ.setIdentity (ByteString -> Restricted (N1, N254) ByteString
forall r v. Restriction r v => v -> Restricted r v
ZMQ.restrict (ByteString -> Restricted (N1, N254) ByteString)
-> ByteString -> Restricted (N1, N254) ByteString
forall a b. (a -> b) -> a -> b
$ ByteString
ourUUID) Socket z Dealer
d
  Socket z Dealer -> String -> ZMQ z ()
forall z t. Socket z t -> String -> ZMQ z ()
ZMQ.connect Socket z Dealer
d (String -> ZMQ z ()) -> String -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ ByteString -> String
B.unpack (ByteString -> String) -> ByteString -> String
forall a b. (a -> b) -> a -> b
$ Endpoint -> ByteString
pEndpoint Endpoint
endpoint
  let spam :: ZMQ z b
spam = ZMQ z () -> ZMQ z b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (ZMQ z () -> ZMQ z b) -> ZMQ z () -> ZMQ z b
forall a b. (a -> b) -> a -> b
$ do
        ZGSCmd
cmd <- IO ZGSCmd -> ZMQ z ZGSCmd
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ZGSCmd -> ZMQ z ZGSCmd) -> IO ZGSCmd -> ZMQ z ZGSCmd
forall a b. (a -> b) -> a -> b
$ STM ZGSCmd -> IO ZGSCmd
forall a. STM a -> IO a
atomically (STM ZGSCmd -> IO ZGSCmd) -> STM ZGSCmd -> IO ZGSCmd
forall a b. (a -> b) -> a -> b
$ TBQueue ZGSCmd -> STM ZGSCmd
forall a. TBQueue a -> STM a
readTBQueue TBQueue ZGSCmd
peerQ
        --liftIO $ print "Spreading gossip" >> (print $ newZGS cmd)
        Socket z Dealer -> NonEmpty ByteString -> ZMQ z ()
forall t z.
Sender t =>
Socket z t -> NonEmpty ByteString -> ZMQ z ()
ZMQ.sendMulti Socket z Dealer
d (NonEmpty ByteString -> ZMQ z ())
-> NonEmpty ByteString -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ ([ByteString] -> NonEmpty ByteString
forall a. [a] -> NonEmpty a
NE.fromList [ZGSMsg -> ByteString
encodeZGS (ZGSMsg -> ByteString) -> ZGSMsg -> ByteString
forall a b. (a -> b) -> a -> b
$ ZGSCmd -> ZGSMsg
newZGS ZGSCmd
cmd] :: NE.NonEmpty ByteString)

  let recv :: ZMQ z b
recv = ZMQ z () -> ZMQ z b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (ZMQ z () -> ZMQ z b) -> ZMQ z () -> ZMQ z b
forall a b. (a -> b) -> a -> b
$ do
        [ByteString]
input <- Socket z Dealer -> ZMQ z [ByteString]
forall t z. Receiver t => Socket z t -> ZMQ z [ByteString]
ZMQ.receiveMulti Socket z Dealer
d
        case [ByteString] -> Either String ZGSMsg
parseZGS [ByteString]
input of
           Left String
err -> do
             IO () -> ZMQ z ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ZMQ z ()) -> IO () -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
forall a. Show a => a -> IO ()
print (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Malformed gossip message received: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
err
             IO () -> ZMQ z ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ZMQ z ()) -> IO () -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ [ByteString] -> IO ()
forall a. Show a => a -> IO ()
print [ByteString]
input
           Right ZGSMsg
msg -> do
             IO () -> ZMQ z ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ZMQ z ()) -> IO () -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ ZGSMsg -> IO ()
handler ZGSMsg
msg

  Async a
sa <- ZMQ z a -> ZMQ z (Async a)
forall z a. ZMQ z a -> ZMQ z (Async a)
ZMQ.async ZMQ z a
forall b. ZMQ z b
spam
  ZMQ z (Async Any) -> ZMQ z ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (ZMQ z (Async Any) -> ZMQ z ()) -> ZMQ z (Async Any) -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ ZMQ z Any -> ZMQ z (Async Any)
forall z a. ZMQ z a -> ZMQ z (Async a)
ZMQ.async ZMQ z Any
forall b. ZMQ z b
recv
  IO a -> ZMQ z a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO a -> ZMQ z a) -> IO a -> ZMQ z a
forall a b. (a -> b) -> a -> b
$ Async a -> IO a
forall a. Async a -> IO a
wait Async a
sa

zgossipRouter :: (Foldable t, Control.Monad.IO.Class.MonadIO m)
              => Endpoint
              -> (ByteString
              -> ZGSCmd
              -> IO (t (ByteString, ZGSCmd)))
              -> m a
zgossipRouter :: Endpoint
-> (ByteString -> ZGSCmd -> IO (t (ByteString, ZGSCmd))) -> m a
zgossipRouter Endpoint
endpoint ByteString -> ZGSCmd -> IO (t (ByteString, ZGSCmd))
handler = (forall z. ZMQ z a) -> m a
forall (m :: * -> *) a. MonadIO m => (forall z. ZMQ z a) -> m a
ZMQ.runZMQ ((forall z. ZMQ z a) -> m a) -> (forall z. ZMQ z a) -> m a
forall a b. (a -> b) -> a -> b
$ do
  Socket z Router
sock <- Router -> ZMQ z (Socket z Router)
forall t z. SocketType t => t -> ZMQ z (Socket z t)
ZMQ.socket Router
ZMQ.Router
  Socket z Router -> String -> ZMQ z ()
forall z t. Socket z t -> String -> ZMQ z ()
ZMQ.bind Socket z Router
sock (String -> ZMQ z ()) -> String -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ ByteString -> String
B.unpack (ByteString -> String) -> ByteString -> String
forall a b. (a -> b) -> a -> b
$ Endpoint -> ByteString
pEndpoint Endpoint
endpoint

  ZMQ z () -> ZMQ z a
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (ZMQ z () -> ZMQ z a) -> ZMQ z () -> ZMQ z a
forall a b. (a -> b) -> a -> b
$ do
     [ByteString]
input <- Socket z Router -> ZMQ z [ByteString]
forall t z. Receiver t => Socket z t -> ZMQ z [ByteString]
ZMQ.receiveMulti Socket z Router
sock
     case [ByteString] -> Either String ZGSMsg
parseZGS [ByteString]
input of
        Left String
err -> IO () -> ZMQ z ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> ZMQ z ()) -> IO () -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ String -> IO ()
forall a. Show a => a -> IO ()
print (String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ String
"Malformed gossip message received: " String -> String -> String
forall a. [a] -> [a] -> [a]
++ String
err
        Right ZGSMsg{Maybe ByteString
ZGSCmd
zgsCmd :: ZGSMsg -> ZGSCmd
zgsFrom :: ZGSMsg -> Maybe ByteString
zgsCmd :: ZGSCmd
zgsFrom :: Maybe ByteString
..} -> do
            --liftIO $ print msg
            t (ByteString, ZGSCmd)
res <- IO (t (ByteString, ZGSCmd)) -> ZMQ z (t (ByteString, ZGSCmd))
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (t (ByteString, ZGSCmd)) -> ZMQ z (t (ByteString, ZGSCmd)))
-> IO (t (ByteString, ZGSCmd)) -> ZMQ z (t (ByteString, ZGSCmd))
forall a b. (a -> b) -> a -> b
$ ByteString -> ZGSCmd -> IO (t (ByteString, ZGSCmd))
handler (Maybe ByteString -> ByteString
forall a. HasCallStack => Maybe a -> a
fromJust Maybe ByteString
zgsFrom) ZGSCmd
zgsCmd
            (((ByteString, ZGSCmd) -> ZMQ z ())
 -> t (ByteString, ZGSCmd) -> ZMQ z ())
-> t (ByteString, ZGSCmd)
-> ((ByteString, ZGSCmd) -> ZMQ z ())
-> ZMQ z ()
forall a b c. (a -> b -> c) -> b -> a -> c
flip ((ByteString, ZGSCmd) -> ZMQ z ())
-> t (ByteString, ZGSCmd) -> ZMQ z ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ t (ByteString, ZGSCmd)
res (((ByteString, ZGSCmd) -> ZMQ z ()) -> ZMQ z ())
-> ((ByteString, ZGSCmd) -> ZMQ z ()) -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ \(ByteString
to, ZGSCmd
cmd) -> do
              --liftIO $ print "FWDing" >> print (to, cmd)
              Socket z Router -> NonEmpty ByteString -> ZMQ z ()
forall t z.
Sender t =>
Socket z t -> NonEmpty ByteString -> ZMQ z ()
ZMQ.sendMulti Socket z Router
sock (NonEmpty ByteString -> ZMQ z ())
-> NonEmpty ByteString -> ZMQ z ()
forall a b. (a -> b) -> a -> b
$ ([ByteString] -> NonEmpty ByteString
forall a. [a] -> NonEmpty a
NE.fromList [ByteString
to, ByteString
to, ZGSMsg -> ByteString
encodeZGS (ZGSMsg -> ByteString) -> ZGSMsg -> ByteString
forall a b. (a -> b) -> a -> b
$ ZGSCmd -> ZGSMsg
newZGS (ZGSCmd -> ZGSMsg) -> ZGSCmd -> ZGSMsg
forall a b. (a -> b) -> a -> b
$ ZGSCmd
cmd ] :: NE.NonEmpty ByteString)