{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.ZRE (
runZre
, runZreCfg
, runZreOpts
, readZ
, writeZ
, unReadZ
, defaultConf
, API(..)
, Event(..)
, ZRE
, Z.Group
, zjoin
, zleave
, zshout
, zshout'
, zwhisper
, zdebug
, znodebug
, zquit
, zrecv
, pEndpoint
, toASCIIBytes
, getApiQueue
, getEventQueue
, module Network.ZRE.Lib
) where
import Prelude hiding (putStrLn, take)
import Control.Monad hiding (join)
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Data.UUID
import Data.UUID.V1
import Data.Maybe
import qualified Data.Set as S
import qualified Data.Map as M
import qualified Data.ByteString.Char8 as B
import qualified Data.ZRE as Z
import Network.ZRE.Beacon
import Network.ZRE.Config
import Network.ZRE.Lib
import Network.ZRE.Options
import Network.ZRE.Peer
import Network.ZRE.Types
import Network.ZRE.Utils
import Network.ZRE.ZMQ
import Network.ZGossip
import System.ZMQ4.Endpoint
import Options.Applicative
import Data.Semigroup ((<>))
-- | Run a 'ZRE' application using a configuration parsed from the
-- command line.
--
-- The command line is parsed with 'parseOptions' (extended with
-- 'helper') and the resulting 'ZRECfg' is passed to 'runZreCfg'
-- together with the application.
runZreOpts :: ZRE a -> IO ()
runZreOpts app = execParser cliParser >>= \cfg -> runZreCfg cfg app
  where
    -- Option parser plus the program description shown in usage output.
    cliParser = info (parseOptions <**> helper) description

    -- Same modifiers, in the same order, as chaining them with '<>'.
    description = mconcat
      [ fullDesc
      , progDesc "ZRE"
      , header "zre tools"
      ]
-- | Produce one interface report per interface to run on.
--
-- * With an empty list, the default route is queried via 'getDefRoute'
--   and only the interface it names is reported. If there is no default
--   route the program terminates through 'exitFail'.
-- * Otherwise every listed interface name is passed to 'getIfaceReport',
--   in order.
--
-- Each report is a triple; callers in this module destructure it as
-- (interface name, IPv4 address, IPv6 address).
getIfaces :: [B.ByteString]
          -> IO [(B.ByteString, B.ByteString, B.ByteString)]
getIfaces [] = getDefRoute >>= maybe noDefaultRoute reportDefault
  where
    noDefaultRoute = exitFail "Unable to get default route"

    -- Only the interface part of the default route is needed.
    reportDefault (_route, iface) = fmap (: []) (getIfaceReport iface)
getIfaces names = mapM getIfaceReport names
-- | Start a ZRE router for a single interface and register it in the
-- shared state.
--
-- A 'zreRouter' is spawned asynchronously on the TCP endpoint built from
-- the interface's IPv4 address and the given port; every message it
-- receives is passed to 'inbox'. The resulting 'Async' handle is then
-- stored in 'zreIfaces' under the interface name, replacing any entry
-- already present for that name.
--
-- The third component of the interface triple is not used here.
runIface :: Show a
         => TVar ZREState
         -> Int
         -> (B.ByteString, B.ByteString, a)
         -> IO ()
runIface s port (iface, ipv4, _ipv6) = do
  let endpoint = newTCPEndpoint ipv4 port
  router <- async (zreRouter endpoint (inbox s))
  atomically . modifyTVar s $ \st ->
    st { zreIfaces = M.insert iface [router] (zreIfaces st) }
-- | Run a 'ZRE' application with the configuration obtained from
-- 'envZRECfg', delegating to 'runZreCfg'.
runZre :: ZRE a -> IO ()
runZre a = envZRECfg >>= \cfg -> runZreCfg cfg a
-- | Run a 'ZRE' application with an explicit configuration.
--
-- Start-up sequence:
--
-- 1. Resolve interfaces with 'getIfaces' and obtain a node UUID from
--    'nextUUID'; either failing terminates the program via 'exitFail'.
-- 2. Take the IPv4 address of the first interface, obtain a port for it
--    with 'randPort' and build the node endpoint from the two.
-- 3. Create the event and API queues (capacity 1000000 each) and the
--    shared 'ZREState'.
-- 4. If a gossip endpoint is configured, start 'zgossipClient'.
-- 5. Start the beacon sender and receiver on the multicast address,
--    one router per interface ('runIface'), and the 'api' loop.
-- 6. Start the user application followed by 'zquit', then block until
--    the 'api' loop terminates.
--
-- The user application thread is linked to the calling thread: if the
-- application throws, the exception is re-raised here instead of being
-- lost (which would leave this function waiting forever for a quit
-- request that is never sent).
--
-- NOTE(review): the gossip, beacon and router threads are not cancelled
-- when this function returns.
runZreCfg :: ZRECfg -> ZRE a -> IO ()
runZreCfg ZRECfg{..} app = do
  ifcs <- getIfaces zreInterfaces

  u <- maybeM (exitFail "Unable to get UUID") return nextUUID
  let uuid = uuidByteString u

  case ifcs of
    [] -> exitFail "No interfaces found"
    ifaces@((_ifcname, ipv4, _ipv6):_) -> do
      zrePort <- randPort ipv4
      let zreEndpoint = newTCPEndpoint ipv4 zrePort

      when zreDbg $ B.putStrLn $ "Starting with " <> bshow zreEndpoint

      zreName <- getName zreNamed

      inQ  <- atomically $ newTBQueue 1000000
      outQ <- atomically $ newTBQueue 1000000

      s <- newZREState zreName zreEndpoint u inQ outQ zreDbg

      case zreZGossip of
        Nothing  -> return ()
        Just end -> void $ async $
          zgossipClient uuid end zreEndpoint (zgossipZRE outQ)

      -- Fail with a clear message rather than a pattern-match error when
      -- the multicast endpoint yields no address.
      mcastAddrs <- toAddrInfo zreMCast
      mCastAddr <- case mcastAddrs of
        (addr:_) -> return addr
        []       -> exitFail "Unable to resolve multicast address"

      _beaconAsync     <- async $ beacon mCastAddr uuid zrePort
      _beaconRecvAsync <- async $ beaconRecv s zreMCast

      mapM_ (runIface s zrePort) ifaces

      apiAsync <- async $ api s

      -- presumably gives the network threads time to come up before the
      -- application starts issuing requests — TODO confirm
      threadDelay 500000

      userAppAsync <- async $ runZ (app >> zquit) inQ outQ
      -- Propagate application failures to this thread; without this an
      -- exception in the app is swallowed and the wait below never ends.
      link userAppAsync

      wait apiAsync
-- | API request loop.
--
-- Repeatedly takes the next request from the state's outgoing API queue
-- ('zreOut'), blocking while the queue is empty, and passes it to
-- 'handleApi'. The loop ends after a 'DoQuit' request has been handled;
-- any other request is followed by another iteration.
api :: TVar ZREState -> IO ()
api s = loop
  where
    loop = do
      request <- atomically $ do
        st <- readTVar s
        readTBQueue (zreOut st)
      handleApi s request
      case request of
        DoQuit -> return ()
        _      -> loop
-- | Execute a single 'API' request against the shared node state.
--
-- * 'DoJoin' / 'DoLeave': bump the group sequence number, add or remove
--   the group in 'zreGroups' and announce the change to all peers, all
--   in one transaction.
-- * 'DoShout' / 'DoShoutMulti' / 'DoWhisper': forward the payload to
--   the group or peer.
-- * 'DoDiscover': create a peer for the endpoint unless one with that
--   UUID is already known.
-- * 'DoDebug': set the 'zreDebug' flag.
-- * 'DoQuit': block until the outgoing queue of every known peer is
--   empty.
handleApi :: TVar ZREState -> API -> IO ()
handleApi s act =
  case act of
    DoJoin group -> atomically $ do
      incGroupSeq
      modifyTVar' s $ \x -> x { zreGroups = S.insert group (zreGroups x) }
      st <- readTVar s
      msgAllJoin s group (zreGroupSeq st)

    DoLeave group -> atomically $ do
      incGroupSeq
      modifyTVar' s $ \x -> x { zreGroups = S.delete group (zreGroups x) }
      st <- readTVar s
      msgAllLeave s group (zreGroupSeq st)

    DoShout group msg -> atomically $ shoutGroup s group msg

    DoShoutMulti group mmsg -> atomically $ shoutGroupMulti s group mmsg

    DoWhisper uuid msg -> atomically $ whisperPeerUUID s uuid msg

    DoDiscover uuid endpoint -> do
      mp <- atomically $ lookupPeer s uuid
      case mp of
        Just _  -> return ()
        Nothing -> void $ makePeer s uuid $ newPeerFromEndpoint endpoint

    DoDebug bool -> atomically $ modifyTVar' s $ \x -> x { zreDebug = bool }

    -- Block inside STM until every peer queue is drained. The transaction
    -- is re-run only when one of the variables it read changes, so no
    -- periodic polling is needed.
    DoQuit -> atomically $ peerQueuesDrained >>= check
  where
    incGroupSeq = modifyTVar' s $ \x -> x { zreGroupSeq = zreGroupSeq x + 1 }

    -- True when the outgoing queue of every known peer is empty
    -- (trivially true when there are no peers).
    peerQueuesDrained = do
      st <- readTVar s
      emptiness <- mapM (\tp -> readTVar tp >>= isEmptyTBQueue . peerQueue)
                        (M.elems (zrePeers st))
      return (and emptiness)
-- | Handle a message received by a router.
--
-- * A message without a sender UUID is ignored (a debug message is
--   emitted) instead of crashing the router thread.
-- * From an unknown sender, only a @Hello@ is acted upon: a peer is
--   created from it and its expected sequence number is advanced.
-- * From a known peer, the last-heard time is updated (when the message
--   carries a timestamp) and the message sequence number is compared
--   with the peer's expected one. On a match the expected number is
--   advanced and the message goes to 'handleCmd'; on a mismatch the peer
--   is destroyed and, if the message is a @Hello@, created again.
--
-- The sequence check and increment happen in a single transaction so
-- that concurrent routers cannot both accept the same sequence number.
inbox :: TVar ZREState -> Z.ZREMsg -> IO ()
inbox s msg@Z.ZREMsg{..} =
  case msgFrom of
    Nothing ->
      atomically $ emitdbg s "message without sender, ignoring"

    Just uuid -> do
      mpt <- atomically $ lookupPeer s uuid
      case mpt of
        Nothing ->
          case msgCmd of
            h@Z.Hello{} -> peerFromHello uuid h
            _           -> return ()

        Just peer -> do
          stale <- atomically $ do
            mapM_ (updateLastHeard peer) msgTime
            p <- readTVar peer
            if peerSeq p == msgSeq
              then do
                updatePeer peer bumpSeq
                return Nothing
              else do
                emitdbg s "sequence mismatch, recreating peer"
                return (Just (peerUUID p))
          case stale of
            Nothing      -> handleCmd s msg peer
            Just oldUuid -> recreatePeer oldUuid msgCmd
  where
    bumpSeq x = x { peerSeq = peerSeq x + 1 }

    -- Create a peer from a Hello and account for that Hello in the
    -- peer's expected sequence number.
    peerFromHello uuid h = do
      peer <- makePeer s uuid $ newPeerFromHello h
      atomically $ updatePeer peer bumpSeq

    -- Drop the out-of-sync peer; only a Hello can bring it back.
    recreatePeer uuid h@Z.Hello{} = do
      destroyPeer s uuid
      peerFromHello uuid h
    recreatePeer uuid _ = destroyPeer s uuid
-- | Act on a command received from a known peer.
--
-- Only messages carrying both a sender and a timestamp are processed;
-- anything else is silently ignored.
--
-- * @Whisper@ / @Shout@: emit the corresponding 'Event'.
-- * @Join@ / @Leave@: update the peer's group membership.
-- * @Ping@: answer the peer with @PingOk@.
-- * @PingOk@: nothing to do.
-- * @Hello@: join the announced groups, record the peer's name and
--   headers, and emit a 'Ready' event.
--
-- Every branch except @PingOk@ also emits a debug message.
handleCmd :: TVar ZREState -> Z.ZREMsg -> TVar Peer -> IO ()
handleCmd s Z.ZREMsg{msgFrom = Just sender, msgTime = Just stamp, msgCmd = command} peer =
  case command of
    Z.Whisper content -> atomically $ do
      emit s $ Whisper sender content stamp
      debugWords ["whisper", B.concat content]

    Z.Shout group content -> atomically $ do
      emit s $ Shout sender group content stamp
      debugWords ["shout for group", group, ">", B.concat content]

    Z.Join group groupSeq -> atomically $ do
      joinGroup s peer group groupSeq
      debugWords ["join", group, bshow groupSeq]

    Z.Leave group groupSeq -> atomically $ do
      leaveGroup s peer group groupSeq
      debugWords ["leave", group, bshow groupSeq]

    Z.Ping -> atomically $ do
      msgPeer peer Z.PingOk
      current <- readTVar peer
      debugWords ["sending pings to ", bshow current]

    Z.PingOk -> return ()

    Z.Hello endpoint groups groupSeq name headers -> atomically $ do
      joinGroups s peer groups groupSeq
      updatePeer peer $ \x ->
        x { peerName    = Just name
          , peerHeaders = headers
          }
      current <- readTVar peer
      emit s $ Ready (peerUUID current) name groups headers endpoint
      emitdbg s "update peer"
  where
    -- Emit a debug message made of the given parts joined by one space.
    debugWords = emitdbg s . B.intercalate " "
handleCmd _ _ _ = return ()