{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.ZRE (
    runZre
  , runZreCfg
  , runZreOpts
  , readZ
  , writeZ
  , unReadZ
  , defaultConf
  , API(..)
  , Event(..)
  , ZRE
  , Z.Group
  , zjoin
  , zleave
  , zshout
  , zshout'
  , zwhisper
  , zdebug
  , znodebug
  , zquit
  , zrecv
  , pEndpoint
  , toASCIIBytes
  , getApiQueue
  , getEventQueue
  , module Network.ZRE.Lib
  ) where

import Prelude hiding (putStrLn, take)
import Control.Monad hiding (join)
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM

import Data.UUID
import Data.UUID.V1
import Data.Maybe

import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.ByteString.Char8 as B

import qualified Data.ZRE as Z
import Network.ZRE.Beacon
import Network.ZRE.Config
import Network.ZRE.Lib
import Network.ZRE.Options
import Network.ZRE.Peer
import Network.ZRE.Types
import Network.ZRE.Utils
import Network.ZRE.ZMQ

import Network.ZGossip
import System.ZMQ4.Endpoint

import Options.Applicative
import Data.Semigroup ((<>))

-- | Run a ZRE application configured from the command line.
--
-- The process arguments are parsed with 'parseOptions' (extended with the
-- standard 'helper' option) into a 'ZRECfg', which is then passed together
-- with the application to 'runZreCfg'.
runZreOpts :: ZRE a -> IO ()
runZreOpts app = do
  cfg <- execParser opts
  runZreCfg cfg app
  where
    -- Parser description used for the generated usage/help text.
    opts :: ParserInfo ZRECfg
    opts =
      info
        (parseOptions <**> helper)
        ( fullDesc
            <> progDesc "ZRE"
            <> header "zre tools"
        )

-- | Resolve the interfaces to run on into interface reports.
--
-- With an empty list the interface of the default route (as returned by
-- 'getDefRoute') is used; the program exits via 'exitFail' when there is no
-- default route.  Otherwise every requested interface is passed to
-- 'getIfaceReport', in order.
--
-- The triples are whatever 'getIfaceReport' produces; the callers in this
-- module destructure them as (interface name, IPv4 address, IPv6 address).
getIfaces :: [B.ByteString]
          -> IO [(B.ByteString, B.ByteString, B.ByteString)]
getIfaces [] = do
  defRoute <- getDefRoute
  case defRoute of
    Nothing -> exitFail "Unable to get default route"
    Just (_route, iface) -> do
      report <- getIfaceReport iface
      return [report]
getIfaces ifcs = forM ifcs getIfaceReport

-- | Start a ZRE router for one interface and register it in the state.
--
-- A 'zreRouter' is spawned asynchronously on the TCP endpoint built from the
-- interface's IPv4 address and the given port, with 'inbox' as its message
-- handler.  The resulting 'Async' is stored in 'zreIfaces' under the
-- interface name, replacing any entry already stored for that name.
--
-- The third tuple component (bound as IPv6 by the callers) is not used here.
runIface :: Show a
         => TVar ZREState
         -> Int
         -> (B.ByteString, B.ByteString, a)
         -> IO ()
runIface s port (iface, ipv4, _ipv6) = do
  router <- async $ zreRouter (newTCPEndpoint ipv4 port) (inbox s)
  -- Strict modify so that updates do not pile up as thunks inside the TVar.
  atomically $ modifyTVar' s $ \st ->
    st { zreIfaces = M.insert iface [router] (zreIfaces st) }

-- | Run a ZRE application with the configuration obtained from 'envZRECfg'.
--
-- Thin wrapper around 'runZreCfg'.
runZre :: ZRE a -> IO ()
runZre app = envZRECfg >>= \cfg -> runZreCfg cfg app

-- | Run a ZRE application with an explicit configuration.
--
-- Start-up sequence, as performed below:
--
--   1. resolve the interfaces via 'getIfaces' and obtain a node UUID from
--      'nextUUID';
--   2. pick a port with 'randPort' on the first interface's IPv4 address and
--      build the node endpoint from it;
--   3. create the event and API queues and the shared 'ZREState';
--   4. spawn the optional gossip client, the beacon sender and receiver and
--      one router per interface ('runIface');
--   5. spawn the 'api' loop and, after a short delay, the user application
--      followed by 'zquit'.
--
-- Returns once the 'api' loop has finished.  Exits via 'exitFail' when no
-- UUID, no interface or no multicast address can be obtained.
runZreCfg :: ZRECfg -> ZRE a -> IO ()
runZreCfg ZRECfg{..} app = do
  ifcs <- getIfaces zreInterfaces

  u <- maybeM (exitFail "Unable to get UUID") return nextUUID
  let uuid = uuidByteString u

  case ifcs of
    [] -> exitFail "No interfaces found"
    -- The first interface's IPv4 address determines the advertised endpoint.
    ifaces@((_ifcname, ipv4, _ipv6):_) -> do
      zrePort <- randPort ipv4

      let zreEndpoint = newTCPEndpoint ipv4 zrePort
      when zreDbg $ B.putStrLn $ "Starting with " <> bshow zreEndpoint

      zreName <- getName zreNamed

      -- 1M events both ways, not sure about this
      inQ <- atomically $ newTBQueue 1000000
      outQ <- atomically $ newTBQueue 1000000

      s <- newZREState zreName zreEndpoint u inQ outQ zreDbg

      -- FIXME: support multiple gossip clients
      forM_ zreZGossip $ \end ->
        async $ zgossipClient uuid end zreEndpoint (zgossipZRE outQ)

      -- Resolve the multicast endpoint explicitly instead of relying on a
      -- partial pattern bind, so that an empty result is reported the same
      -- way as the other start-up failures.
      mCastAddrs <- toAddrInfo zreMCast
      mCastAddr <- case mCastAddrs of
        (addr:_) -> return addr
        []       -> exitFail "Unable to resolve multicast endpoint"

      _beaconAsync <- async $ beacon mCastAddr uuid zrePort
      _beaconRecvAsync <- async $ beaconRecv s zreMCast

      mapM_ (runIface s zrePort) ifaces

      apiAsync <- async $ api s
      -- Give the background threads half a second before starting the app.
      threadDelay 500000
      -- NOTE(review): the result of this Async is never inspected; if the
      -- application throws, 'zquit' is presumably never issued and the wait
      -- below would not return -- confirm against 'runZ'.
      _userAppAsync <- async $ runZ (app >> zquit) inQ outQ

      wait apiAsync

-- | API loop: serve requests from the state's 'zreOut' queue.
--
-- Blocks until a request is available, passes it to 'handleApi', and
-- repeats.  The loop ends after a 'DoQuit' request has been handled.
api :: TVar ZREState -> IO ()
api s = loop
  where
    loop = do
      action <- atomically $ do
        st <- readTVar s
        readTBQueue (zreOut st)
      handleApi s action
      case action of
        DoQuit -> return ()
        _      -> loop

-- | Execute a single 'API' request against the shared state.
--
--   * 'DoJoin' \/ 'DoLeave' bump the group sequence number, update our own
--     group set and announce the change with 'msgAllJoin' \/ 'msgAllLeave',
--     all within one transaction.
--   * 'DoShout', 'DoShoutMulti' and 'DoWhisper' delegate to 'shoutGroup',
--     'shoutGroupMulti' and 'whisperPeerUUID'.
--   * 'DoDiscover' creates a peer from the endpoint unless one with that
--     UUID is already known.
--   * 'DoDebug' sets the 'zreDebug' flag.
--   * 'DoQuit' blocks until the queue of every known peer is empty,
--     re-checking every 0.1 seconds.
handleApi :: TVar ZREState -> API -> IO ()
handleApi s act =
  case act of
    DoJoin group -> atomically $ do
      groupSeq <- changeGroups (S.insert group)
      msgAllJoin s group groupSeq

    DoLeave group -> atomically $ do
      groupSeq <- changeGroups (S.delete group)
      msgAllLeave s group groupSeq

    DoShout group msg -> atomically $ shoutGroup s group msg
    DoShoutMulti group mmsg -> atomically $ shoutGroupMulti s group mmsg
    DoWhisper uuid msg -> atomically $ whisperPeerUUID s uuid msg

    DoDiscover uuid endpoint -> do
      known <- atomically $ lookupPeer s uuid
      when (isNothing known) $
        void $ makePeer s uuid $ newPeerFromEndpoint endpoint

    DoDebug bool -> atomically $ modifyTVar' s $ \st -> st { zreDebug = bool }

    DoQuit -> waitDrained
  where
    -- Bump the group sequence number, apply @f@ to our group set and return
    -- the new sequence number.
    changeGroups f = do
      modifyTVar' s $ \st ->
        st { zreGroupSeq = zreGroupSeq st + 1
           , zreGroups   = f (zreGroups st)
           }
      zreGroupSeq <$> readTVar s

    -- True when the queue of every currently known peer is empty.
    allQueuesEmpty = atomically $ do
      st <- readTVar s
      empties <- forM (M.elems (zrePeers st)) $ \tp ->
        readTVar tp >>= isEmptyTBQueue . peerQueue
      return (and empties)

    -- Poll until all peer queues have been drained.
    waitDrained = do
      done <- allQueuesEmpty
      unless done $ threadDelay (sec (0.1 :: Float)) >> waitDrained

-- | Handle one incoming ZRE message: create peers and update state.
--
--   * A message without a sender is dropped (with a debug note), since it
--     cannot be attributed to a peer.
--   * From an unknown sender only HELLO is accepted: it creates the peer.
--     For other messages we don't know the endpoint to connect to, so they
--     are silently dropped.
--   * From a known peer the last-heard time is refreshed (when the message
--     carries a timestamp) and the sequence number is checked.  On a match
--     the expected sequence is advanced and the message is passed to
--     'handleCmd'; on a mismatch the peer is destroyed and, if the message
--     is a HELLO, re-created from it.
inbox :: TVar ZREState -> Z.ZREMsg -> IO ()
inbox s msg@Z.ZREMsg{..} =
  case msgFrom of
    Nothing -> atomically $ emitdbg s "message without sender, dropping"
    Just uuid -> do
      mpt <- atomically $ lookupPeer s uuid
      case mpt of
        Nothing   -> unknownPeer uuid
        Just peer -> knownPeer peer
  where
    unknownPeer uuid =
      case msgCmd of
        h@(Z.Hello _ _ _ _ _) -> peerFromHello uuid h
        -- silently drop any other messages
        _ -> return ()

    knownPeer peer = do
      -- Skip the refresh rather than crash when there is no timestamp.
      forM_ msgTime $ atomically . updateLastHeard peer

      -- destroy/re-start peer when this doesn't match
      p <- atomically $ readTVar peer
      if peerSeq p == msgSeq
        then do
          -- TODO: rename peerSeq to peerExpectSeq (everywhere it is bumped)
          atomically $ bumpSeq peer
          handleCmd s msg peer
        else do
          atomically $ emitdbg s "sequence mismatch, recreating peer"
          recreatePeer (peerUUID p) msgCmd

    -- Destroy the peer; a HELLO additionally re-creates it right away.
    recreatePeer uuid cmd = do
      destroyPeer s uuid
      case cmd of
        h@(Z.Hello _ _ _ _ _) -> peerFromHello uuid h
        _ -> return ()

    -- Create a peer from a HELLO and account for that message in its
    -- sequence counter.
    peerFromHello uuid h = do
      peer <- makePeer s uuid $ newPeerFromHello h
      atomically $ bumpSeq peer

    bumpSeq peer = updatePeer peer $ \x -> x { peerSeq = peerSeq x + 1 }

-- | Act on the command carried by a message from an already known peer.
--
-- Only messages that carry both a sender and a timestamp are processed;
-- anything else falls through to the final equation and is ignored.
--
--   * WHISPER \/ SHOUT are forwarded to the application as 'Whisper' \/
--     'Shout' events.
--   * JOIN \/ LEAVE update the peer's group membership via 'joinGroup' \/
--     'leaveGroup'.
--   * PING is answered with PING-OK; PING-OK itself needs no action.
--   * HELLO refreshes the peer's groups, name and headers and emits 'Ready'.
--
-- Each command is handled in a single STM transaction together with its
-- debug message.
handleCmd :: TVar ZREState -> Z.ZREMsg -> TVar Peer -> IO ()
handleCmd s Z.ZREMsg{msgFrom = Just from, msgTime = Just time, msgCmd = cmd} peer =
  case cmd of
    Z.Whisper content -> atomically $ do
      emit s $ Whisper from content time
      emitdbg s $ B.intercalate " " ["whisper", B.concat content]

    Z.Shout group content -> atomically $ do
      emit s $ Shout from group content time
      emitdbg s $
        B.intercalate " " ["shout for group", group, ">", B.concat content]

    Z.Join group groupSeq -> atomically $ do
      joinGroup s peer group groupSeq
      emitdbg s $ B.intercalate " " ["join", group, bshow groupSeq]

    Z.Leave group groupSeq -> atomically $ do
      leaveGroup s peer group groupSeq
      emitdbg s $ B.intercalate " " ["leave", group, bshow groupSeq]

    Z.Ping -> atomically $ do
      msgPeer peer Z.PingOk
      p <- readTVar peer
      emitdbg s $ B.unwords ["sending pings to ", bshow p]

    Z.PingOk -> return ()

    Z.Hello endpoint groups groupSeq name headers ->
      -- if this peer was already registered
      -- (e.g. from beacon) update appropriate data
      atomically $ do
        joinGroups s peer groups groupSeq
        updatePeer peer $ \x ->
          x { peerName    = Just name
            , peerHeaders = headers
            }
        p <- readTVar peer
        emit s $ Ready (peerUUID p) name groups headers endpoint
        emitdbg s "update peer"
handleCmd _ _ _ = return ()