module Database.EventStore.Internal.Discovery
( Discovery(..)
, GossipSeed
, DnsDiscoveryException(..)
, ClusterSettings(..)
, DnsServer(..)
, EndPoint(..)
, staticEndPointDiscovery
, clusterDnsEndPointDiscovery
, gossipSeedClusterSettings
, simpleDnsEndPointDiscovery
, dnsClusterSettings
, gossipSeed
, gossipSeedWithHeader
, gossipSeedHeader
, gossipSeedHost
, gossipSeedPort
) where
import Data.Maybe
import ClassyPrelude
import Data.Aeson
import Data.Aeson.Types
import Data.Array.IO
import Data.DotNet.TimeSpan
import Data.List.NonEmpty (NonEmpty)
import Data.UUID
import Network.HTTP.Client
import Network.DNS hiding (decode)
import System.Random
data DnsDiscoveryException
= MaxDiscoveryAttemptReached ByteString
| DNSDiscoveryError DNSError
deriving (Show, Typeable)
instance Exception DnsDiscoveryException
data EndPoint =
EndPoint
{ endPointIp :: !String
, endPointPort :: !Int
} deriving Show
emptyEndPoint :: EndPoint
emptyEndPoint = EndPoint "" 0
httpRequest :: EndPoint -> String -> IO Request
httpRequest (EndPoint ip p) path = parseUrlThrow url
where
url = "http://" ++ ip ++ ":" ++ show p ++ path
data GossipSeed =
GossipSeed
{ gossipEndpoint :: !EndPoint
, gossipSeedHeader :: !String
} deriving Show
gossipSeed :: String -> Int -> GossipSeed
gossipSeed h p = GossipSeed (EndPoint h p) ""
gossipSeedWithHeader :: String -> Int -> String -> GossipSeed
gossipSeedWithHeader h p hd = GossipSeed (EndPoint h p) hd
gossipSeedHost :: GossipSeed -> String
gossipSeedHost = endPointIp . gossipEndpoint
gossipSeedPort :: GossipSeed -> Int
gossipSeedPort = endPointPort . gossipEndpoint
emptyGossipSeed :: GossipSeed
emptyGossipSeed = GossipSeed emptyEndPoint ""
newtype Discovery =
Discovery { runDiscovery :: Maybe EndPoint -> IO (Maybe EndPoint) }
staticEndPointDiscovery :: String -> Int -> Discovery
staticEndPointDiscovery host port =
Discovery $ \_ -> return $ Just $ EndPoint host port
simpleDnsEndPointDiscovery :: ByteString -> Maybe DnsServer -> Int -> Discovery
simpleDnsEndPointDiscovery domain srv port = Discovery $ \_ -> do
let conf =
case srv of
Nothing -> defaultResolvConf
Just tpe ->
let rc =
case tpe of
DnsFilePath p -> RCFilePath p
DnsHostName h -> RCHostName h
DnsHostPort h p -> RCHostPort h (fromIntegral p)
in defaultResolvConf { resolvInfo = rc }
dnsSeed <- makeResolvSeed conf
res <- withResolver dnsSeed $ \resv -> lookupA resv domain
case res of
Left e -> throwIO $ DNSDiscoveryError e
Right ips -> do
let pts = [ EndPoint (show ip) port | ip <- ips ]
case pts of
[] -> return Nothing
pt:_ -> return $ Just pt
data DnsServer
= DnsFilePath String
| DnsHostName String
| DnsHostPort String Int
data ClusterSettings =
ClusterSettings
{ clusterDns :: !ByteString
, clusterMaxDiscoverAttempts :: !Int
, clusterExternalGossipPort :: !Int
, clusterGossipSeeds :: (Maybe (NonEmpty GossipSeed))
, clusterGossipTimeout :: !TimeSpan
, clusterDnsServer :: !(Maybe DnsServer)
}
gossipSeedClusterSettings :: NonEmpty GossipSeed -> ClusterSettings
gossipSeedClusterSettings xs =
ClusterSettings
{ clusterDns = ""
, clusterMaxDiscoverAttempts = 10
, clusterExternalGossipPort = 0
, clusterGossipSeeds = Just xs
, clusterGossipTimeout = fromSeconds 1
, clusterDnsServer = Nothing
}
dnsClusterSettings :: ByteString -> ClusterSettings
dnsClusterSettings clusterDns = ClusterSettings{..}
where
clusterMaxDiscoverAttempts = 10
clusterExternalGossipPort = 0
clusterGossipSeeds = Nothing
clusterGossipTimeout = fromSeconds 1
clusterDnsServer = Nothing
clusterDnsEndPointDiscovery :: ClusterSettings -> IO Discovery
clusterDnsEndPointDiscovery settings = do
ref <- newIORef Nothing
manager <- newManager defaultManagerSettings
return $ Discovery $ \fend -> discoverEndPoint manager ref fend settings
data VNodeState
= Initializing
| Unknown
| PreReplica
| CatchingUp
| Clone
| Slave
| PreMaster
| Master
| Manager
| ShuttingDown
| Shutdown
deriving (Eq, Ord, Generic, Show)
instance FromJSON VNodeState
newtype GUUID = GUUID UUID deriving Show
instance FromJSON GUUID where
parseJSON (String txt) =
case fromText txt of
Just uuid -> return $ GUUID uuid
_ -> fail $ "Wrong UUID format " ++ show txt
parseJSON invalid = typeMismatch "UUID" invalid
data MemberInfo =
MemberInfo
{ _instanceId :: !GUUID
, _state :: !VNodeState
, _isAlive :: !Bool
, _internalTcpIp :: !String
, _internalTcpPort :: !Int
, _externalTcpIp :: !String
, _externalTcpPort :: !Int
, _internalHttpIp :: !String
, _internalHttpPort :: !Int
, _externalHttpIp :: !String
, _externalHttpPort :: !Int
, _lastCommitPosition :: !Int64
, _writerCheckpoint :: !Int64
, _chaserCheckpoint :: !Int64
, _epochPosition :: !Int64
, _epochNumber :: !Int
, _epochId :: !GUUID
, _nodePriority :: !Int
} deriving Show
instance FromJSON MemberInfo where
parseJSON (Object m) =
MemberInfo
<$> m .: "instanceId"
<*> m .: "state"
<*> m .: "isAlive"
<*> m .: "internalTcpIp"
<*> m .: "internalTcpPort"
<*> m .: "externalTcpIp"
<*> m .: "externalTcpPort"
<*> m .: "internalHttpIp"
<*> m .: "internalHttpPort"
<*> m .: "externalHttpIp"
<*> m .: "externalHttpPort"
<*> m .: "lastCommitPosition"
<*> m .: "writerCheckpoint"
<*> m .: "chaserCheckpoint"
<*> m .: "epochPosition"
<*> m .: "epochNumber"
<*> m .: "epochId"
<*> m .: "nodePriority"
parseJSON invalid = typeMismatch "MemberInfo" invalid
data ClusterInfo =
ClusterInfo { members :: [MemberInfo] }
deriving (Show, Generic)
instance FromJSON ClusterInfo
discoverEndPoint :: Manager
-> IORef (Maybe [MemberInfo])
-> Maybe EndPoint
-> ClusterSettings
-> IO (Maybe EndPoint)
discoverEndPoint mgr ref fend settings = do
old_m <- readIORef ref
writeIORef ref Nothing
candidates <- case old_m of
Nothing -> gossipCandidatesFromDns settings
Just old -> gossipCandidatesFromOldGossip fend old
forArrayFirst candidates $ \i -> do
c <- readArray candidates i
res <- tryGetGossipFrom settings mgr c
print (i, res)
let fin_end = do
info <- res
best <- tryDetermineBestNode $ members info
return (info, best)
case fin_end of
Nothing -> return Nothing
Just (info, best) -> do
writeIORef ref (Just $ members info)
return $ Just best
tryGetGossipFrom :: ClusterSettings
-> Manager
-> GossipSeed
-> IO (Maybe ClusterInfo)
tryGetGossipFrom ClusterSettings{..} mgr seed = do
init_req <- httpRequest (gossipEndpoint seed) "/gossip?format=json"
let timeout = truncate (totalMillis clusterGossipTimeout * 1000)
req = init_req { responseTimeout = responseTimeoutMicro timeout }
resp <- httpLbs req mgr
return $ decode $ responseBody resp
tryDetermineBestNode :: [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode members = node_m
where
nodes = [m | m <- members
, _isAlive m
, allowedState $ _state m
]
node_m =
case sortOn (Down . _state) nodes of
[] -> Nothing
n:_ -> Just $ EndPoint (_externalTcpIp n) (_externalTcpPort n)
allowedState Manager = False
allowedState ShuttingDown = False
allowedState Shutdown = False
allowedState _ = True
gossipCandidatesFromOldGossip :: Maybe EndPoint
-> [MemberInfo]
-> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip fend_m oldGossip =
arrangeGossipCandidates candidates
where
candidates =
case fend_m of
Nothing -> oldGossip
Just fend -> [ c | c <- oldGossip
, _externalTcpPort c == endPointPort fend
, _externalTcpIp c == endPointIp fend
]
data AState = AState !Int !Int
arrangeGossipCandidates :: [MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates members = do
arr <- newArray (0, len) emptyGossipSeed
AState i j <- foldM (go arr) (AState (1) len) members
shuffle arr 0 i
shuffle arr j (len 1)
return arr
where
len = length members
go :: IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
go arr (AState i j) m =
case _state m of
Manager -> do
let new_j = j 1
writeArray arr new_j seed
return (AState i new_j)
_ -> do
let new_i = i + 1
writeArray arr new_i seed
return (AState new_i j)
where
end = EndPoint (_externalHttpIp m) (_externalHttpPort m)
seed = GossipSeed end ""
gossipCandidatesFromDns :: ClusterSettings -> IO (IOArray Int GossipSeed)
gossipCandidatesFromDns settings@ClusterSettings{..} = do
arr <- endpoints
shuffleAll arr
return arr
where
endpoints =
case clusterGossipSeeds of
Nothing -> resolveDns settings
Just ss -> let ls = toList ss
len = length ls
in newListArray (0, len 1) ls
resolveDns :: ClusterSettings -> IO (IOArray Int GossipSeed)
resolveDns ClusterSettings{..} = do
let timeoutMicros = totalMillis clusterGossipTimeout * 1000
conf =
case clusterDnsServer of
Nothing -> defaultResolvConf
Just tpe ->
let rc =
case tpe of
DnsFilePath p -> RCFilePath p
DnsHostName h -> RCHostName h
DnsHostPort h p -> RCHostPort h (fromIntegral p)
in defaultResolvConf { resolvInfo = rc }
dnsSeed <- makeResolvSeed conf
{ resolvTimeout = truncate timeoutMicros
, resolvRetry = clusterMaxDiscoverAttempts
}
withResolver dnsSeed $ \resv -> do
result <- lookupA resv clusterDns
case result of
Left e -> throwIO $ DNSDiscoveryError e
Right ips -> do
let len = length ips 1
arr <- newArray_ (0, len)
forM_ (zip [0..] ips) $ \(idx, ip) -> do
let end = EndPoint (show ip) clusterExternalGossipPort
seed = GossipSeed end ""
writeArray arr idx seed
return arr
shuffleAll :: IOArray Int a -> IO ()
shuffleAll arr = do
(low, hig) <- getBounds arr
shuffle arr low hig
shuffle :: IOArray Int a -> Int -> Int -> IO ()
shuffle arr from to = forRange_ from to $ \cur -> do
idx <- randomRIO (cur, to)
tmp <- readArray arr idx
value <- readArray arr cur
writeArray arr idx value
writeArray arr cur tmp
forRange_ :: Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ from to k = do
when (from <= to) $ loop (to + 1) from
where
loop len cur
| len == cur = return ()
| otherwise = do
k cur
loop len (cur + 1)
forArrayFirst :: IOArray Int a
-> (Int -> IO (Maybe b))
-> IO (Maybe b)
forArrayFirst arr k = do
(low, hig) <- getBounds arr
forRangeFirst low hig k
forRangeFirst :: Int
-> Int
-> (Int -> IO (Maybe b))
-> IO (Maybe b)
forRangeFirst from to k = do
if from <= to then loop (to + 1) from else return Nothing
where
loop len cur
| len == cur = return Nothing
| otherwise = do
res <- k cur
if isJust res then return res else loop len (cur + 1)