{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.ZGossip (
    zgossipServer
  , zgossipClient
  , zgossipZRE) where


import Prelude hiding (putStrLn, take)
import Control.Monad hiding (join)
import Control.Monad.IO.Class
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM

import Data.ByteString (ByteString)
import Data.UUID
import qualified Data.Map as M
import qualified Data.Set as S
import qualified Data.ByteString.Char8 as B
import qualified Data.ByteString.Lazy as BL

import Data.ZGossip
import Network.ZRE.Types (API(DoDiscover))
import Network.ZRE.Utils (bshow)

import Network.ZGossip.ZMQ
import Network.ZGossip.Types

import System.ZMQ4.Endpoint

-- | Run a gossip client.
--
-- Queues a 'Hello', starts a background thread that periodically queues a
-- 'Publish' of @ourEndpoint@ under the key @uuid@, and then hands the command
-- queue and @handler@ to 'zgossipDealer' connected to @endpoint@.
--
-- The publisher thread is scoped with 'withAsync', so it is cancelled when
-- 'zgossipDealer' exits (normally or by exception) instead of being leaked.
-- It is also 'link'ed, so a crash in the publisher is re-thrown here.
zgossipClient :: Key -> Endpoint -> Endpoint -> (ZGSMsg -> IO ()) -> IO ()
zgossipClient uuid endpoint ourEndpoint handler = do
  gossipQ <- atomically $ newTBQueue 10
  atomically $ writeTBQueue gossipQ Hello
  withAsync (publisher gossipQ) $ \pa -> do
    link pa
    zgossipDealer endpoint uuid gossipQ handler
  where
    -- TTL attached to every published pair.
    publishTTL = 600
    -- Re-publish interval in seconds; well below 'publishTTL' so the pair is
    -- refreshed before the server expires it.
    publishEverySeconds = 50

    publisher gossipQ = forever $ do
      atomically $ writeTBQueue gossipQ $
        Publish uuid (pEndpoint ourEndpoint) publishTTL
      -- publish every 50s ('threadDelay' takes microseconds)
      threadDelay $ 1000000 * publishEverySeconds

-- | Run a gossip server on @endpoint@.
--
-- Keeps the shared 'ZGossipState' in a 'TVar', runs a background thread that
-- ages the published pairs once per second, and serves incoming commands via
-- 'zgossipRouter' using 'serverHandle'.
--
-- The expiry thread is scoped with 'withAsync', so it is cancelled when the
-- router exits (normally or by exception) instead of being leaked. It is also
-- 'link'ed, so a crash in the expiry thread is re-thrown here.
zgossipServer :: Endpoint -> IO ()
zgossipServer endpoint = do
  gossipS <- atomically $ newTVar emptyGossipState
  withAsync (expire gossipS) $ \ea -> do
    link ea
    zgossipRouter endpoint (serverHandle gossipS)
  where
    -- Once per second: decrement every TTL and drop the pairs that ran out.
    expire :: TVar ZGossipState -> IO b
    expire gossipS = forever $ do
      atomically $ modifyTVar' gossipS $ \x ->
        -- Force the new map before storing it; otherwise an idle server
        -- accumulates one unevaluated update per tick.
        let live = M.mapMaybe ttlUpdate (gossipPairs x)
        in  live `seq` x { gossipPairs = live }
      threadDelay 1000000

    -- A pair whose TTL has reached zero is removed. Non-positive TTLs are
    -- treated the same way, so a negative TTL cannot keep a pair alive
    -- indefinitely.
    ttlUpdate :: (Ord b, Num b) => (a, b) -> Maybe (a, b)
    ttlUpdate (v, ttl)
      | ttl <= 0  = Nothing
      | otherwise = Just (v, ttl - 1)

-- | Handle one command received by the server from peer @from@ and return
-- the list of @(recipient, command)@ replies to send.
--
-- * 'Hello': registers @from@ in 'gossipPeers' and replies with every pair
--   currently in 'gossipPairs' (converted with 'cvtPub').
-- * 'Publish': stores the pair, registers @from@ as a peer, and forwards the
--   same command to every other registered peer.
-- * 'Ping': replies with 'PingOk'.
-- * 'PingOk', 'Invalid': ignored.
--
-- State updates and the snapshot used to build the reply happen in a single
-- STM transaction, so the reply always reflects the state right after this
-- command was applied.
serverHandle :: TVar ZGossipState -> Peer -> ZGSCmd -> IO [(Peer, ZGSCmd)]
serverHandle s from Hello = do
  st <- atomically $ do
    modifyTVar' s $ \x -> x { gossipPeers = S.insert from (gossipPeers x) }
    readTVar s
  dbg ["Hello from", tryUUID from]
  -- send all the k,v pairs to this client
  return [ (from, cvtPub pub) | pub <- M.toList $ gossipPairs st ]
serverHandle s from pub@(Publish k v ttl) = do
  st <- atomically $ do
    modifyTVar' s $ \x -> x
      { gossipPairs = M.insert k (v, ttl) (gossipPairs x)
        -- A publishing peer is a connected client even if it never said Hello.
      , gossipPeers = S.insert from (gossipPeers x)
      }
    readTVar s
  dbg ["Publish from", tryUUID from, tryUUID k, "=", v, "( ttl", bshow ttl, ")"]

  -- republish this to all other clients
  --
  -- Recipients come from the registered peers, not from the keys of
  -- 'gossipPairs': a published key is not necessarily a peer identity, and a
  -- peer that only said Hello must still receive updates.
  return [ (to, pub) | to <- S.toList $ gossipPeers st, to /= from ]
serverHandle _ from Ping = do
  dbg ["Ping from", tryUUID from]
  return [(from, PingOk)]
serverHandle _ _ PingOk = return []
serverHandle _ _ Invalid = return []

-- | Render @x@ as an ASCII UUID when its bytes decode as a 'UUID';
-- otherwise return @x@ unchanged. Used to make peer ids and keys readable
-- in debug output.
tryUUID :: ByteString -> ByteString
tryUUID x =
  case fromByteString (BL.fromStrict x) of
    Just parsed -> toASCIIBytes parsed
    Nothing     -> x

-- | Print the given fragments on one line of stdout, separated by single
-- spaces.
dbg :: [ByteString] -> IO ()
dbg parts = B.putStrLn (B.intercalate " " parts)

-- | Gossip message handler that feeds the ZRE API queue.
--
-- For every 'Publish' whose key decodes as a 'UUID' and whose value parses as
-- an 'Endpoint', a 'DoDiscover' request is written to @q@. A key or value that
-- fails to parse is reported on stdout and dropped. All other commands are
-- ignored.
zgossipZRE :: TBQueue API -> ZGSMsg -> IO ()
zgossipZRE q msg =
  case zgsCmd msg of
    Publish k v _ -> discover k v
    _             -> return ()
  where
    warn :: ByteString -> IO ()
    warn = liftIO . B.putStrLn

    -- The endpoint is only parsed once the key is known to be a valid UUID
    -- (the tuple is matched lazily, left to right).
    discover :: ByteString -> ByteString -> IO ()
    discover k v =
      case (fromByteString (BL.fromStrict k), parseAttoEndpoint v) of
        (Nothing, _) ->
          warn "Can't parse zgossip uuid"
        (Just _, Left _err) ->
          warn "Can't parse zgossip endpoint"
        (Just uuid, Right endpoint) ->
          atomically $ writeTBQueue q (DoDiscover uuid endpoint)