{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.ZRE (
    runZre
  , runZreCfg
  , runZreEnvConfig
  , runZreParse
  , readZ
  , writeZ
  , unReadZ
  , defaultConf
  , API(..)
  , Event(..)
  , ZRE
  , Z.Group
  , Z.mkGroup
  , Z.unGroup
  , zjoin
  , zleave
  , zshout
  , zshout'
  , zwhisper
  , zdebug
  , znodebug
  , zquit
  , zfail
  , zrecv
  , pEndpoint
  , toASCIIBytes
  , getApiQueue
  , getEventQueue
  , module Network.ZRE.Lib
  ) where

import Prelude hiding (putStrLn, take)
import Control.Monad hiding (join)
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Exception (SomeException)
import qualified Control.Exception.Lifted

import Data.ByteString (ByteString)
import Data.UUID
import Data.UUID.V1
import Data.Maybe

import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.ByteString.Char8 as B

import qualified Data.ZRE as Z
import Network.ZRE.Beacon
import Network.ZRE.Config
import Network.ZRE.Lib
import Network.ZRE.Options
import Network.ZRE.Peer
import Network.ZRE.Types
import Network.ZRE.Utils
import Network.ZRE.ZMQ

import Network.ZGossip
import System.ZMQ4.Endpoint

import Options.Applicative

getIfaces :: [ByteString]
          -> IO [(ByteString, ByteString, ByteString)]
getIfaces :: [ByteString] -> IO [(ByteString, ByteString, ByteString)]
getIfaces [ByteString]
ifcs = do
  case [ByteString]
ifcs of
    [] -> do
      Maybe (ByteString, ByteString)
dr <- IO (Maybe (ByteString, ByteString))
getDefRoute
      case Maybe (ByteString, ByteString)
dr of
        Maybe (ByteString, ByteString)
Nothing -> ByteString -> IO [(ByteString, ByteString, ByteString)]
forall b. ByteString -> IO b
exitFail ByteString
"Unable to get default route"
        Just (ByteString
_route, ByteString
iface) -> do
          (ByteString, ByteString, ByteString)
i <- ByteString -> IO (ByteString, ByteString, ByteString)
getIfaceReport ByteString
iface
          [(ByteString, ByteString, ByteString)]
-> IO [(ByteString, ByteString, ByteString)]
forall (m :: * -> *) a. Monad m => a -> m a
return ([(ByteString, ByteString, ByteString)]
 -> IO [(ByteString, ByteString, ByteString)])
-> [(ByteString, ByteString, ByteString)]
-> IO [(ByteString, ByteString, ByteString)]
forall a b. (a -> b) -> a -> b
$ [(ByteString, ByteString, ByteString)
i]
    [ByteString]
x  -> do
      [ByteString]
-> (ByteString -> IO (ByteString, ByteString, ByteString))
-> IO [(ByteString, ByteString, ByteString)]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [ByteString]
x ByteString -> IO (ByteString, ByteString, ByteString)
getIfaceReport

runIface :: Show a
         => TVar ZREState
         -> Int
         -> (ByteString, ByteString, a)
         -> IO ()
runIface :: TVar ZREState -> Int -> (ByteString, ByteString, a) -> IO ()
runIface TVar ZREState
s Int
port (ByteString
iface, ByteString
ipv4, a
ipv6) = do
   Async ()
r <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ Endpoint -> (ZREMsg -> IO ()) -> IO ()
forall (m :: * -> *) a1 a.
MonadIO m =>
Endpoint -> (ZREMsg -> IO a1) -> m a
zreRouter (ByteString -> Int -> Endpoint
newTCPEndpoint ByteString
ipv4 Int
port) (TVar ZREState -> ZREMsg -> IO ()
inbox TVar ZREState
s)
   STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ZREState -> (ZREState -> ZREState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZREState
s ((ZREState -> ZREState) -> STM ())
-> (ZREState -> ZREState) -> STM ()
forall a b. (a -> b) -> a -> b
$ \ZREState
x ->
     ZREState
x { zreIfaces :: Map ByteString [Async ()]
zreIfaces = ByteString
-> [Async ()]
-> Map ByteString [Async ()]
-> Map ByteString [Async ()]
forall k a. Ord k => k -> a -> Map k a -> Map k a
M.insert ByteString
iface [Async ()
r] (ZREState -> Map ByteString [Async ()]
zreIfaces ZREState
x) }

runZre :: ZRE a -> IO ()
runZre :: ZRE a -> IO ()
runZre ZRE a
app = Parser () -> (() -> ZRE a) -> IO ()
forall extra a. Parser extra -> (extra -> ZRE a) -> IO ()
runZreParse (() -> Parser ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()) (ZRE a -> () -> ZRE a
forall a b. a -> b -> a
const ZRE a
app)

-- | Run with config file loaded from the enviornment variable ENVCFG
-- (@/etc/zre.conf@ or @~/.zre.conf@), possibly overriden by command-line options.
--
-- Accepts another `optparse-applicative` `Parser` for extending
-- built-in one.
runZreParse :: Parser extra -> (extra -> ZRE a) -> IO ()
runZreParse :: Parser extra -> (extra -> ZRE a) -> IO ()
runZreParse Parser extra
parseExtra extra -> ZRE a
app = do
  -- try to get config from the enviornment variable ENVCFG, /etc/zre.conf
  -- or ~/.zre.conf and override with command line options.
  ZRECfg
cfgIni <- String -> IO ZRECfg
envZRECfg String
"zre"
  (ZRECfg
cfgOpts, extra
extras) <- ParserInfo (ZRECfg, extra) -> IO (ZRECfg, extra)
forall a. ParserInfo a -> IO a
execParser ParserInfo (ZRECfg, extra)
opts
  ZRECfg -> ZRE a -> IO ()
forall a. ZRECfg -> ZRE a -> IO ()
runZreCfg (ZRECfg -> ZRECfg -> ZRECfg
overrideNonDefault ZRECfg
cfgIni ZRECfg
cfgOpts) (extra -> ZRE a
app extra
extras)
  where
    opts :: ParserInfo (ZRECfg, extra)
opts = Parser (ZRECfg, extra)
-> InfoMod (ZRECfg, extra) -> ParserInfo (ZRECfg, extra)
forall a. Parser a -> InfoMod a -> ParserInfo a
info (((,) (ZRECfg -> extra -> (ZRECfg, extra))
-> Parser ZRECfg -> Parser (extra -> (ZRECfg, extra))
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Parser ZRECfg
parseOptions Parser (extra -> (ZRECfg, extra))
-> Parser extra -> Parser (ZRECfg, extra)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Parser extra
parseExtra)  Parser (ZRECfg, extra)
-> Parser ((ZRECfg, extra) -> (ZRECfg, extra))
-> Parser (ZRECfg, extra)
forall (f :: * -> *) a b. Applicative f => f a -> f (a -> b) -> f b
<**> Parser ((ZRECfg, extra) -> (ZRECfg, extra))
forall a. Parser (a -> a)
helper)
      ( InfoMod (ZRECfg, extra)
forall a. InfoMod a
fullDesc
     InfoMod (ZRECfg, extra)
-> InfoMod (ZRECfg, extra) -> InfoMod (ZRECfg, extra)
forall a. Semigroup a => a -> a -> a
<> String -> InfoMod (ZRECfg, extra)
forall a. String -> InfoMod a
progDesc String
"ZRE"
     InfoMod (ZRECfg, extra)
-> InfoMod (ZRECfg, extra) -> InfoMod (ZRECfg, extra)
forall a. Semigroup a => a -> a -> a
<> String -> InfoMod (ZRECfg, extra)
forall a. String -> InfoMod a
header String
"zre tools" )

-- | Run with config file loaded from the enviornment variable ENVCFG
-- (@/etc/zre.conf@ or @~/.zre.conf@)
runZreEnvConfig :: ZRE a -> IO ()
runZreEnvConfig :: ZRE a -> IO ()
runZreEnvConfig ZRE a
app = do
  ZRECfg
cfgIni <- String -> IO ZRECfg
envZRECfg String
"zre"
  ZRECfg -> ZRE a -> IO ()
forall a. ZRECfg -> ZRE a -> IO ()
runZreCfg ZRECfg
cfgIni ZRE a
app

runZreCfg :: ZRECfg -> ZRE a -> IO ()
runZreCfg :: ZRECfg -> ZRE a -> IO ()
runZreCfg cfg :: ZRECfg
cfg@ZRECfg{Bool
Float
[ByteString]
Maybe Endpoint
ByteString
Endpoint
zreDbg :: ZRECfg -> Bool
zreZGossip :: ZRECfg -> Maybe Endpoint
zreMCast :: ZRECfg -> Endpoint
zreInterfaces :: ZRECfg -> [ByteString]
zreBeaconPeriod :: ZRECfg -> Float
zreDeadPeriod :: ZRECfg -> Float
zreQuietPingRate :: ZRECfg -> Float
zreQuietPeriod :: ZRECfg -> Float
zreNamed :: ZRECfg -> ByteString
zreDbg :: Bool
zreZGossip :: Maybe Endpoint
zreMCast :: Endpoint
zreInterfaces :: [ByteString]
zreBeaconPeriod :: Float
zreDeadPeriod :: Float
zreQuietPingRate :: Float
zreQuietPeriod :: Float
zreNamed :: ByteString
..} ZRE a
app = do
    [(ByteString, ByteString, ByteString)]
ifcs <- [ByteString] -> IO [(ByteString, ByteString, ByteString)]
getIfaces [ByteString]
zreInterfaces

    UUID
u <- IO UUID -> (UUID -> IO UUID) -> IO (Maybe UUID) -> IO UUID
forall (m :: * -> *) b a.
Monad m =>
m b -> (a -> m b) -> m (Maybe a) -> m b
maybeM (ByteString -> IO UUID
forall b. ByteString -> IO b
exitFail ByteString
"Unable to get UUID") UUID -> IO UUID
forall (m :: * -> *) a. Monad m => a -> m a
return IO (Maybe UUID)
nextUUID
    let uuid :: ByteString
uuid = UUID -> ByteString
uuidByteString UUID
u

    case [(ByteString, ByteString, ByteString)]
ifcs of
      [] -> ByteString -> IO ()
forall b. ByteString -> IO b
exitFail ByteString
"No interfaces found"
      ifaces :: [(ByteString, ByteString, ByteString)]
ifaces@((ByteString
_ifcname, ByteString
ipv4, ByteString
_ipv6):[(ByteString, ByteString, ByteString)]
_) -> do
        Int
zrePort <- ByteString -> IO Int
randPort ByteString
ipv4

        let zreEndpoint :: Endpoint
zreEndpoint = ByteString -> Int -> Endpoint
newTCPEndpoint ByteString
ipv4 Int
zrePort
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when Bool
zreDbg (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> IO ()
B.putStrLn (ByteString -> IO ()) -> ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString
"Starting with " ByteString -> ByteString -> ByteString
forall a. Semigroup a => a -> a -> a
<> (Endpoint -> ByteString
forall a. Show a => a -> ByteString
bshow Endpoint
zreEndpoint)

        ByteString
zreName <- ByteString -> IO ByteString
getName ByteString
zreNamed

        -- 1M events both ways, not sure about this
        TBQueue Event
inQ <- STM (TBQueue Event) -> IO (TBQueue Event)
forall a. STM a -> IO a
atomically (STM (TBQueue Event) -> IO (TBQueue Event))
-> STM (TBQueue Event) -> IO (TBQueue Event)
forall a b. (a -> b) -> a -> b
$ Natural -> STM (TBQueue Event)
forall a. Natural -> STM (TBQueue a)
newTBQueue Natural
1000000
        TBQueue API
outQ <- STM (TBQueue API) -> IO (TBQueue API)
forall a. STM a -> IO a
atomically (STM (TBQueue API) -> IO (TBQueue API))
-> STM (TBQueue API) -> IO (TBQueue API)
forall a b. (a -> b) -> a -> b
$ Natural -> STM (TBQueue API)
forall a. Natural -> STM (TBQueue a)
newTBQueue Natural
1000000

        TVar ZREState
s <- ByteString
-> Endpoint
-> UUID
-> TBQueue Event
-> TBQueue API
-> Bool
-> ZRECfg
-> IO (TVar ZREState)
newZREState ByteString
zreName Endpoint
zreEndpoint UUID
u TBQueue Event
inQ TBQueue API
outQ Bool
zreDbg ZRECfg
cfg

        -- FIXME: support multiple gossip clients
        case Maybe Endpoint
zreZGossip of
          Maybe Endpoint
Nothing -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
          Just Endpoint
end -> IO (Async ()) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (Async ()) -> IO ()) -> IO (Async ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ ByteString -> Endpoint -> Endpoint -> (ZGSMsg -> IO ()) -> IO ()
zgossipClient ByteString
uuid Endpoint
end Endpoint
zreEndpoint (TBQueue API -> ZGSMsg -> IO ()
zgossipZRE TBQueue API
outQ)

        (AddrInfo
mCastAddr:[AddrInfo]
_) <- Endpoint -> IO [AddrInfo]
toAddrInfo Endpoint
zreMCast
        Async ()
_beaconAsync <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ Float -> AddrInfo -> ByteString -> Int -> IO ()
beacon Float
zreBeaconPeriod AddrInfo
mCastAddr ByteString
uuid Int
zrePort
        Async Any
_beaconRecvAsync <- IO Any -> IO (Async Any)
forall a. IO a -> IO (Async a)
async (IO Any -> IO (Async Any)) -> IO Any -> IO (Async Any)
forall a b. (a -> b) -> a -> b
$ TVar ZREState -> Endpoint -> IO Any
forall b. TVar ZREState -> Endpoint -> IO b
beaconRecv TVar ZREState
s Endpoint
zreMCast

        ((ByteString, ByteString, ByteString) -> IO ())
-> [(ByteString, ByteString, ByteString)] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (TVar ZREState
-> Int -> (ByteString, ByteString, ByteString) -> IO ()
forall a.
Show a =>
TVar ZREState -> Int -> (ByteString, ByteString, a) -> IO ()
runIface TVar ZREState
s Int
zrePort) [(ByteString, ByteString, ByteString)]
ifaces

        Async ()
apiAsync <- IO () -> IO (Async ())
forall a. IO a -> IO (Async a)
async (IO () -> IO (Async ())) -> IO () -> IO (Async ())
forall a b. (a -> b) -> a -> b
$ TVar ZREState -> IO ()
api TVar ZREState
s
        Int -> IO ()
threadDelay Int
500000
        Async a
_userAppAsync <- IO a -> IO (Async a)
forall a. IO a -> IO (Async a)
async
          (IO a -> IO (Async a)) -> IO a -> IO (Async a)
forall a b. (a -> b) -> a -> b
$ ZRE a -> TBQueue Event -> TBQueue API -> IO a
forall a. ZRE a -> TBQueue Event -> TBQueue API -> IO a
runZ
              (ZRE a
app
                ZRE a -> (SomeException -> ZRE a) -> ZRE a
forall (m :: * -> *) e a.
(MonadBaseControl IO m, Exception e) =>
m a -> (e -> m a) -> m a
`Control.Exception.Lifted.catch`
                (\SomeException
e -> do let err :: String
err = SomeException -> String
forall a. Show a => a -> String
show (SomeException
e :: SomeException) in String -> ZRE a
forall a. String -> ZRE a
zfail String
err)
                ZRE a -> ZRE () -> ZRE a
forall (m :: * -> *) a b.
MonadBaseControl IO m =>
m a -> m b -> m a
`Control.Exception.Lifted.finally`
                ZRE ()
zquit
              )
              TBQueue Event
inQ
              TBQueue API
outQ

        Async () -> IO ()
forall a. Async a -> IO a
wait Async ()
apiAsync
        () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

api :: TVar ZREState -> IO ()
api :: TVar ZREState -> IO ()
api TVar ZREState
s = do
  API
a <- STM API -> IO API
forall a. STM a -> IO a
atomically (STM API -> IO API) -> STM API -> IO API
forall a b. (a -> b) -> a -> b
$ TVar ZREState -> STM ZREState
forall a. TVar a -> STM a
readTVar TVar ZREState
s STM ZREState -> (ZREState -> STM API) -> STM API
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= TBQueue API -> STM API
forall a. TBQueue a -> STM a
readTBQueue (TBQueue API -> STM API)
-> (ZREState -> TBQueue API) -> ZREState -> STM API
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ZREState -> TBQueue API
zreOut
  TVar ZREState -> API -> IO ()
handleApi TVar ZREState
s API
a
  case API
a of
    API
DoQuit -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    API
_ -> TVar ZREState -> IO ()
api TVar ZREState
s

handleApi :: TVar ZREState -> API -> IO ()
handleApi :: TVar ZREState -> API -> IO ()
handleApi TVar ZREState
s API
act = do
  case API
act of
    DoJoin Group
group -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      STM ()
incGroupSeq
      TVar ZREState -> (ZREState -> ZREState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZREState
s ((ZREState -> ZREState) -> STM ())
-> (ZREState -> ZREState) -> STM ()
forall a b. (a -> b) -> a -> b
$ \ZREState
x -> ZREState
x { zreGroups :: Groups
zreGroups = Group -> Groups -> Groups
forall a. Ord a => a -> Set a -> Set a
S.insert Group
group (ZREState -> Groups
zreGroups ZREState
x) }
      ZREState
st <- TVar ZREState -> STM ZREState
forall a. TVar a -> STM a
readTVar TVar ZREState
s
      TVar ZREState -> Group -> Int -> STM ()
msgAllJoin TVar ZREState
s Group
group (ZREState -> Int
zreGroupSeq ZREState
st)

    DoLeave Group
group -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      STM ()
incGroupSeq
      TVar ZREState -> (ZREState -> ZREState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZREState
s ((ZREState -> ZREState) -> STM ())
-> (ZREState -> ZREState) -> STM ()
forall a b. (a -> b) -> a -> b
$ \ZREState
x -> ZREState
x { zreGroups :: Groups
zreGroups = Group -> Groups -> Groups
forall a. Ord a => a -> Set a -> Set a
S.delete Group
group (ZREState -> Groups
zreGroups ZREState
x) }
      ZREState
st <- TVar ZREState -> STM ZREState
forall a. TVar a -> STM a
readTVar TVar ZREState
s
      TVar ZREState -> Group -> Int -> STM ()
msgAllLeave TVar ZREState
s Group
group (ZREState -> Int
zreGroupSeq ZREState
st)

    DoShout Group
group ByteString
msg -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ZREState -> Group -> ByteString -> STM ()
shoutGroup TVar ZREState
s Group
group ByteString
msg
    DoShoutMulti Group
group [ByteString]
mmsg -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ZREState -> Group -> [ByteString] -> STM ()
shoutGroupMulti TVar ZREState
s Group
group [ByteString]
mmsg
    DoWhisper UUID
uuid ByteString
msg -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ZREState -> UUID -> ByteString -> STM ()
whisperPeerUUID TVar ZREState
s UUID
uuid ByteString
msg

    DoDiscover UUID
uuid Endpoint
endpoint -> do
      Maybe (TVar Peer)
mp <- STM (Maybe (TVar Peer)) -> IO (Maybe (TVar Peer))
forall a. STM a -> IO a
atomically (STM (Maybe (TVar Peer)) -> IO (Maybe (TVar Peer)))
-> STM (Maybe (TVar Peer)) -> IO (Maybe (TVar Peer))
forall a b. (a -> b) -> a -> b
$ TVar ZREState -> UUID -> STM (Maybe (TVar Peer))
lookupPeer TVar ZREState
s UUID
uuid
      case Maybe (TVar Peer)
mp of
        Just TVar Peer
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Maybe (TVar Peer)
Nothing -> do
          IO (TVar Peer) -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO (TVar Peer) -> IO ()) -> IO (TVar Peer) -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ZREState
-> UUID
-> (UTCTime
    -> UUID
    -> TVar ZREState
    -> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
-> IO (TVar Peer)
makePeer TVar ZREState
s UUID
uuid ((UTCTime
  -> UUID
  -> TVar ZREState
  -> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
 -> IO (TVar Peer))
-> (UTCTime
    -> UUID
    -> TVar ZREState
    -> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
-> IO (TVar Peer)
forall a b. (a -> b) -> a -> b
$ Endpoint
-> UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (IO ()), Maybe (IO ()))
forall (m :: * -> *) a b.
MonadIO m =>
Endpoint
-> UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromEndpoint Endpoint
endpoint

    DoDebug Bool
bool -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ZREState -> (ZREState -> ZREState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZREState
s ((ZREState -> ZREState) -> STM ())
-> (ZREState -> ZREState) -> STM ()
forall a b. (a -> b) -> a -> b
$ \ZREState
x -> ZREState
x { zreDebug :: Bool
zreDebug = Bool
bool }

    API
DoQuit -> do
      let chk :: IO Bool
chk = STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ do
                   ZREState
s' <- TVar ZREState -> STM ZREState
forall a. TVar a -> STM a
readTVar TVar ZREState
s
                   [Bool]
pqs <- [(UUID, TVar Peer)]
-> ((UUID, TVar Peer) -> STM Bool) -> STM [Bool]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM (Map UUID (TVar Peer) -> [(UUID, TVar Peer)]
forall k a. Map k a -> [(k, a)]
M.toList (Map UUID (TVar Peer) -> [(UUID, TVar Peer)])
-> Map UUID (TVar Peer) -> [(UUID, TVar Peer)]
forall a b. (a -> b) -> a -> b
$ ZREState -> Map UUID (TVar Peer)
zrePeers ZREState
s') (((UUID, TVar Peer) -> STM Bool) -> STM [Bool])
-> ((UUID, TVar Peer) -> STM Bool) -> STM [Bool]
forall a b. (a -> b) -> a -> b
$ \(UUID
_, TVar Peer
tp) -> TVar Peer -> STM Peer
forall a. TVar a -> STM a
readTVar TVar Peer
tp STM Peer -> (Peer -> STM Bool) -> STM Bool
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= TBQueue ZRECmd -> STM Bool
forall a. TBQueue a -> STM Bool
isEmptyTBQueue (TBQueue ZRECmd -> STM Bool)
-> (Peer -> TBQueue ZRECmd) -> Peer -> STM Bool
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Peer -> TBQueue ZRECmd
peerQueue
                   Bool -> STM Bool
forall (m :: * -> *) a. Monad m => a -> m a
return (Bool -> STM Bool) -> Bool -> STM Bool
forall a b. (a -> b) -> a -> b
$ [Bool] -> Bool
forall (t :: * -> *). Foldable t => t Bool -> Bool
and [Bool]
pqs

      let loop :: IO ()
loop = do
              Bool
res <- IO Bool
chk
              Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless Bool
res (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> IO ()
threadDelay (Float -> Int
forall a. RealFrac a => a -> Int
sec (Float
0.1 :: Float)) IO () -> IO () -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> IO ()
loop

      IO ()
loop
  where
    incGroupSeq :: STM ()
incGroupSeq = TVar ZREState -> (ZREState -> ZREState) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar TVar ZREState
s ((ZREState -> ZREState) -> STM ())
-> (ZREState -> ZREState) -> STM ()
forall a b. (a -> b) -> a -> b
$ \ZREState
x -> ZREState
x { zreGroupSeq :: Int
zreGroupSeq = (ZREState -> Int
zreGroupSeq ZREState
x) Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 }

-- handles incoming ZRE messages
-- creates peers, updates state
inbox :: TVar ZREState -> Z.ZREMsg -> IO ()
inbox :: TVar ZREState -> ZREMsg -> IO ()
inbox TVar ZREState
s msg :: ZREMsg
msg@Z.ZREMsg{Int
Maybe UTCTime
Maybe UUID
ZRECmd
msgCmd :: ZREMsg -> ZRECmd
msgTime :: ZREMsg -> Maybe UTCTime
msgSeq :: ZREMsg -> Int
msgFrom :: ZREMsg -> Maybe UUID
msgCmd :: ZRECmd
msgTime :: Maybe UTCTime
msgSeq :: Int
msgFrom :: Maybe UUID
..} = do
  let uuid :: UUID
uuid = Maybe UUID -> UUID
forall a. HasCallStack => Maybe a -> a
fromJust Maybe UUID
msgFrom

  -- print msg , "state pre-msg", printAll s

  Maybe (TVar Peer)
mpt <- STM (Maybe (TVar Peer)) -> IO (Maybe (TVar Peer))
forall a. STM a -> IO a
atomically (STM (Maybe (TVar Peer)) -> IO (Maybe (TVar Peer)))
-> STM (Maybe (TVar Peer)) -> IO (Maybe (TVar Peer))
forall a b. (a -> b) -> a -> b
$ TVar ZREState -> UUID -> STM (Maybe (TVar Peer))
lookupPeer TVar ZREState
s UUID
uuid
  case Maybe (TVar Peer)
mpt of
    Maybe (TVar Peer)
Nothing -> do
      case ZRECmd
msgCmd of
        -- if the peer is not known but a message is HELLO we create a new
        -- peer, for other messages we don't know the endpoint to connect to
        h :: ZRECmd
h@(Z.Hello Endpoint
_endpoint Groups
_groups Int
_groupSeq ByteString
_name Headers
_headers) -> do
          TVar Peer
peer <- TVar ZREState
-> UUID
-> (UTCTime
    -> UUID
    -> TVar ZREState
    -> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
-> IO (TVar Peer)
makePeer TVar ZREState
s UUID
uuid ((UTCTime
  -> UUID
  -> TVar ZREState
  -> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
 -> IO (TVar Peer))
-> (UTCTime
    -> UUID
    -> TVar ZREState
    -> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
-> IO (TVar Peer)
forall a b. (a -> b) -> a -> b
$ ZRECmd
-> UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (IO ()), Maybe (IO ()))
forall (m :: * -> *) a b.
MonadIO m =>
ZRECmd
-> UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromHello ZRECmd
h
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Peer -> (Peer -> Peer) -> STM ()
updatePeer TVar Peer
peer ((Peer -> Peer) -> STM ()) -> (Peer -> Peer) -> STM ()
forall a b. (a -> b) -> a -> b
$ \Peer
x -> Peer
x { peerSeq :: Int
peerSeq = (Peer -> Int
peerSeq Peer
x) Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 }
        -- silently drop any other messages
        ZRECmd
_ -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

    (Just TVar Peer
peer) -> do
      STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Peer -> UTCTime -> STM ()
updateLastHeard TVar Peer
peer (UTCTime -> STM ()) -> UTCTime -> STM ()
forall a b. (a -> b) -> a -> b
$ Maybe UTCTime -> UTCTime
forall a. HasCallStack => Maybe a -> a
fromJust Maybe UTCTime
msgTime

      -- destroy/re-start peer when this doesn't match
      Peer
p <- STM Peer -> IO Peer
forall a. STM a -> IO a
atomically (STM Peer -> IO Peer) -> STM Peer -> IO Peer
forall a b. (a -> b) -> a -> b
$ TVar Peer -> STM Peer
forall a. TVar a -> STM a
readTVar TVar Peer
peer
      case Peer -> Int
peerSeq Peer
p Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
msgSeq of
        Bool
True -> do
          -- rename to peerExpectSeq, need to update at line 127 too
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Peer -> (Peer -> Peer) -> STM ()
updatePeer TVar Peer
peer ((Peer -> Peer) -> STM ()) -> (Peer -> Peer) -> STM ()
forall a b. (a -> b) -> a -> b
$ \Peer
x -> Peer
x { peerSeq :: Int
peerSeq = (Peer -> Int
peerSeq Peer
x) Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 }
          TVar ZREState -> ZREMsg -> TVar Peer -> IO ()
handleCmd TVar ZREState
s ZREMsg
msg TVar Peer
peer
        Bool
_ -> do
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s ByteString
"sequence mismatch, recreating peer"
          UUID -> ZRECmd -> IO ()
recreatePeer (Peer -> UUID
peerUUID Peer
p) ZRECmd
msgCmd

  -- "state post-msg", printAll s
  where
    recreatePeer :: UUID -> ZRECmd -> IO ()
recreatePeer UUID
uuid h :: ZRECmd
h@(Z.Hello Endpoint
_ Groups
_ Int
_ ByteString
_ Headers
_) = do
          TVar ZREState -> UUID -> IO ()
destroyPeer TVar ZREState
s UUID
uuid
          TVar Peer
peer <- TVar ZREState
-> UUID
-> (UTCTime
    -> UUID
    -> TVar ZREState
    -> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
-> IO (TVar Peer)
makePeer TVar ZREState
s UUID
uuid ((UTCTime
  -> UUID
  -> TVar ZREState
  -> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
 -> IO (TVar Peer))
-> (UTCTime
    -> UUID
    -> TVar ZREState
    -> STM (TVar Peer, Maybe (IO ()), Maybe (IO ())))
-> IO (TVar Peer)
forall a b. (a -> b) -> a -> b
$ ZRECmd
-> UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (IO ()), Maybe (IO ()))
forall (m :: * -> *) a b.
MonadIO m =>
ZRECmd
-> UTCTime
-> UUID
-> TVar ZREState
-> STM (TVar Peer, Maybe (m a), Maybe (IO b))
newPeerFromHello ZRECmd
h
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar Peer -> (Peer -> Peer) -> STM ()
updatePeer TVar Peer
peer ((Peer -> Peer) -> STM ()) -> (Peer -> Peer) -> STM ()
forall a b. (a -> b) -> a -> b
$ \Peer
x -> Peer
x { peerSeq :: Int
peerSeq = (Peer -> Int
peerSeq Peer
x) Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1 }
    recreatePeer UUID
uuid ZRECmd
_ = TVar ZREState -> UUID -> IO ()
destroyPeer TVar ZREState
s UUID
uuid

handleCmd :: TVar ZREState -> Z.ZREMsg -> TVar Peer -> IO ()
handleCmd :: TVar ZREState -> ZREMsg -> TVar Peer -> IO ()
handleCmd TVar ZREState
s Z.ZREMsg{msgFrom :: ZREMsg -> Maybe UUID
msgFrom=(Just UUID
from), msgTime :: ZREMsg -> Maybe UTCTime
msgTime=(Just UTCTime
time), msgCmd :: ZREMsg -> ZRECmd
msgCmd=ZRECmd
cmd} TVar Peer
peer = do
      case ZRECmd
cmd of
        (Z.Whisper [ByteString]
content) -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          TVar ZREState -> Event -> STM ()
emit TVar ZREState
s (Event -> STM ()) -> Event -> STM ()
forall a b. (a -> b) -> a -> b
$ UUID -> [ByteString] -> UTCTime -> Event
Whisper UUID
from [ByteString]
content UTCTime
time
          TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s (ByteString -> STM ()) -> ByteString -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> [ByteString] -> ByteString
B.intercalate ByteString
" " [ByteString
"whisper", [ByteString] -> ByteString
B.concat [ByteString]
content]

        Z.Shout Group
group [ByteString]
content -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          TVar ZREState -> Event -> STM ()
emit TVar ZREState
s (Event -> STM ()) -> Event -> STM ()
forall a b. (a -> b) -> a -> b
$ UUID -> Group -> [ByteString] -> UTCTime -> Event
Shout UUID
from Group
group [ByteString]
content UTCTime
time
          TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s (ByteString -> STM ()) -> ByteString -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> [ByteString] -> ByteString
B.intercalate ByteString
" " [ByteString
"shout for group", Group -> ByteString
Z.unGroup Group
group, ByteString
">", [ByteString] -> ByteString
B.concat [ByteString]
content]

        Z.Join Group
group Int
groupSeq -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          TVar ZREState -> TVar Peer -> Group -> Int -> STM ()
joinGroup TVar ZREState
s TVar Peer
peer Group
group Int
groupSeq
          TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s (ByteString -> STM ()) -> ByteString -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> [ByteString] -> ByteString
B.intercalate ByteString
" " [ByteString
"join", Group -> ByteString
Z.unGroup Group
group, Int -> ByteString
forall a. Show a => a -> ByteString
bshow Int
groupSeq]

        Z.Leave Group
group Int
groupSeq -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          TVar ZREState -> TVar Peer -> Group -> Int -> STM ()
leaveGroup TVar ZREState
s TVar Peer
peer Group
group Int
groupSeq
          TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s (ByteString -> STM ()) -> ByteString -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString -> [ByteString] -> ByteString
B.intercalate ByteString
" " [ByteString
"leave", Group -> ByteString
Z.unGroup Group
group, Int -> ByteString
forall a. Show a => a -> ByteString
bshow Int
groupSeq]

        ZRECmd
Z.Ping -> STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
          TVar Peer -> ZRECmd -> STM ()
msgPeer TVar Peer
peer ZRECmd
Z.PingOk
          Peer
p <- TVar Peer -> STM Peer
forall a. TVar a -> STM a
readTVar TVar Peer
peer
          TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s (ByteString -> STM ()) -> ByteString -> STM ()
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
B.unwords [ByteString
"sending pings to ", Peer -> ByteString
forall a. Show a => a -> ByteString
bshow Peer
p]
        ZRECmd
Z.PingOk -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Z.Hello Endpoint
endpoint Groups
groups Int
groupSeq ByteString
name Headers
headers -> do
          -- if this peer was already registered
          -- (e.g. from beacon) update appropriate data
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            TVar ZREState -> TVar Peer -> Groups -> Int -> STM ()
joinGroups TVar ZREState
s TVar Peer
peer Groups
groups Int
groupSeq
            TVar Peer -> (Peer -> Peer) -> STM ()
updatePeer TVar Peer
peer ((Peer -> Peer) -> STM ()) -> (Peer -> Peer) -> STM ()
forall a b. (a -> b) -> a -> b
$ \Peer
x -> Peer
x {
                         peerName :: Maybe ByteString
peerName = ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
name
                       , peerHeaders :: Headers
peerHeaders = Headers
headers
                       }
            Peer
p <- TVar Peer -> STM Peer
forall a. TVar a -> STM a
readTVar TVar Peer
peer
            TVar ZREState -> Event -> STM ()
emit TVar ZREState
s (Event -> STM ()) -> Event -> STM ()
forall a b. (a -> b) -> a -> b
$ UUID -> ByteString -> Groups -> Headers -> Endpoint -> Event
Ready (Peer -> UUID
peerUUID Peer
p) ByteString
name Groups
groups Headers
headers Endpoint
endpoint
            TVar ZREState -> ByteString -> STM ()
emitdbg TVar ZREState
s (ByteString -> STM ()) -> ByteString -> STM ()
forall a b. (a -> b) -> a -> b
$ ByteString
"update peer"
          () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
handleCmd TVar ZREState
_ ZREMsg
_ TVar Peer
_ = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()