{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

{-# LANGUAGE FlexibleContexts #-}
module Network.ZGossip.ZMQ (zgossipDealer, zgossipRouter) where

import Control.Monad
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Monad.IO.Class

import Data.ByteString (ByteString)
import qualified System.ZMQ4.Monadic as ZMQ
import qualified Data.ByteString.Char8 as B
import qualified Data.List.NonEmpty as NE

import Data.Maybe

import Data.ZGossip
import System.ZMQ4.Endpoint

-- | Run a gossip client on a ZeroMQ DEALER socket connected to @endpoint@.
--
-- Two loops run concurrently inside one ZMQ context:
--
-- * the publish loop takes commands from @peerQ@, wraps each with 'newZGS',
--   encodes it with 'encodeZGS' and sends it as a single-frame message;
--
-- * the receive loop parses every incoming multipart message with
--   'parseZGS' and passes successfully parsed messages to @handler@;
--   unparseable input is printed and skipped.
--
-- The call blocks on the publish loop, which runs 'forever', so it only
-- ends if that loop throws. The receive loop is started but never waited on.
zgossipDealer :: MonadIO m
              => Endpoint            -- ^ endpoint to connect to
              -> ByteString          -- ^ our identity on the socket
              -> TBQueue ZGSCmd      -- ^ commands to gossip out
              -> (ZGSMsg -> IO ())   -- ^ callback for received messages
              -> m a
zgossipDealer endpoint ourUUID peerQ handler = ZMQ.runZMQ $ do
  sock <- ZMQ.socket ZMQ.Dealer
  ZMQ.setLinger (ZMQ.restrict (1 :: Int)) sock
  -- Never block on sending; we use an infinite HWM and buffer as many
  -- messages as needed in outgoing pipes. Note that the maximum number
  -- is the overall tuple set size
  ZMQ.setSendHighWM (ZMQ.restrict (0 :: Int)) sock
  ZMQ.setSendTimeout (ZMQ.restrict (0 :: Int)) sock
  ZMQ.setIdentity (ZMQ.restrict ourUUID) sock
  ZMQ.connect sock (B.unpack (pEndpoint endpoint))

  publisher <- ZMQ.async (publishLoop sock)
  _receiver <- ZMQ.async (receiveLoop sock)
  liftIO (wait publisher)
  where
    -- Drain the command queue onto the socket, one message per command.
    publishLoop :: ZMQ.Socket z ZMQ.Dealer -> ZMQ.ZMQ z b
    publishLoop sock = forever $ do
      cmd <- liftIO (atomically (readTBQueue peerQ))
      ZMQ.sendMulti sock (encodeZGS (newZGS cmd) NE.:| [])

    -- Read multipart messages and dispatch them to the handler.
    receiveLoop :: ZMQ.Socket z ZMQ.Dealer -> ZMQ.ZMQ z b
    receiveLoop sock = forever $ do
      frames <- ZMQ.receiveMulti sock
      either (reportMalformed frames) (liftIO . handler) (parseZGS frames)

    -- Print the parse error followed by the raw frames that caused it.
    reportMalformed :: [ByteString] -> String -> ZMQ.ZMQ z ()
    reportMalformed frames err = liftIO $ do
      print ("Malformed gossip message received: " ++ err)
      print frames

-- | Run a gossip server on a ZeroMQ ROUTER socket bound to @endpoint@.
--
-- Every incoming multipart message is parsed with 'parseZGS'. For a
-- well-formed message carrying a sender, @handler@ is called with the
-- sender and the command; each @(recipient, command)@ pair it returns is
-- wrapped with 'newZGS', encoded with 'encodeZGS' and sent to that
-- recipient.
--
-- Messages that fail to parse, or that parse without a sender
-- ('zgsFrom' is 'Nothing'), are logged and dropped so that a single bad
-- message cannot terminate the router loop.
--
-- The loop runs 'forever'; the call only ends if an exception is thrown.
zgossipRouter :: (Foldable t, MonadIO m)
              => Endpoint
              -- ^ endpoint to bind to
              -> (ByteString -> ZGSCmd -> IO (t (ByteString, ZGSCmd)))
              -- ^ given sender and command, produce the messages to forward
              -> m a
zgossipRouter endpoint handler = ZMQ.runZMQ $ do
  sock <- ZMQ.socket ZMQ.Router
  ZMQ.bind sock $ B.unpack $ pEndpoint endpoint

  forever $ do
    input <- ZMQ.receiveMulti sock
    case parseZGS input of
      Left err ->
        liftIO $ print $ "Malformed gossip message received: " ++ err
      -- Previously this case hit 'fromJust' and crashed the whole router.
      Right ZGSMsg { zgsFrom = Nothing } ->
        liftIO $ print ("Gossip message without sender dropped" :: String)
      Right ZGSMsg { zgsFrom = Just sender, zgsCmd = cmd } -> do
        --liftIO $ print msg
        res <- liftIO $ handler sender cmd
        forM_ res $ \(to, reply) ->
          --liftIO $ print "FWDing" >> print (to, reply)
          -- The recipient id is sent twice. NOTE(review): presumably the
          -- first frame is consumed by the ROUTER socket for routing and the
          -- second reaches the peer as part of the message -- confirm against
          -- 'parseZGS'.
          ZMQ.sendMulti sock (to NE.:| [to, encodeZGS (newZGS reply)])