{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Database.EventStore.Internal.Discovery
( Discovery(..)
, GossipSeed
, DnsDiscoveryException(..)
, ClusterSettings(..)
, DnsServer(..)
, EndPoint(..)
, staticEndPointDiscovery
, clusterDnsEndPointDiscovery
, gossipSeedClusterSettings
, simpleDnsEndPointDiscovery
, dnsClusterSettings
, gossipSeed
, gossipSeedWithHeader
, gossipSeedHeader
, gossipSeedHost
, gossipSeedPort
) where
import Prelude (String, fail)
import Data.Maybe
import Control.Exception.Safe (tryAny)
import Data.Aeson
import Data.Aeson.Types
import Data.Array.IO
import Data.DotNet.TimeSpan
import Data.List.NonEmpty (NonEmpty)
import Data.UUID
import Network.HTTP.Client
import Network.DNS hiding (decode)
import System.Random
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
data DnsDiscoveryException
= MaxDiscoveryAttemptReached ByteString
| DNSDiscoveryError DNSError
deriving (Int -> DnsDiscoveryException -> ShowS
[DnsDiscoveryException] -> ShowS
DnsDiscoveryException -> String
(Int -> DnsDiscoveryException -> ShowS)
-> (DnsDiscoveryException -> String)
-> ([DnsDiscoveryException] -> ShowS)
-> Show DnsDiscoveryException
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [DnsDiscoveryException] -> ShowS
$cshowList :: [DnsDiscoveryException] -> ShowS
show :: DnsDiscoveryException -> String
$cshow :: DnsDiscoveryException -> String
showsPrec :: Int -> DnsDiscoveryException -> ShowS
$cshowsPrec :: Int -> DnsDiscoveryException -> ShowS
Show, Typeable)
instance Exception DnsDiscoveryException
httpRequest :: EndPoint -> String -> IO Request
httpRequest :: EndPoint -> String -> IO Request
httpRequest (EndPoint String
ip Int
p) String
path = String -> IO Request
forall (m :: * -> *). MonadThrow m => String -> m Request
parseUrlThrow String
url
where
url :: String
url = String
"http://" String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
ip String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
":" String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
p String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
path
data GossipSeed =
GossipSeed
{ GossipSeed -> EndPoint
gossipEndpoint :: !EndPoint
, :: !String
} deriving Int -> GossipSeed -> ShowS
[GossipSeed] -> ShowS
GossipSeed -> String
(Int -> GossipSeed -> ShowS)
-> (GossipSeed -> String)
-> ([GossipSeed] -> ShowS)
-> Show GossipSeed
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [GossipSeed] -> ShowS
$cshowList :: [GossipSeed] -> ShowS
show :: GossipSeed -> String
$cshow :: GossipSeed -> String
showsPrec :: Int -> GossipSeed -> ShowS
$cshowsPrec :: Int -> GossipSeed -> ShowS
Show
gossipSeed :: String -> Int -> GossipSeed
gossipSeed :: String -> Int -> GossipSeed
gossipSeed String
h Int
p = EndPoint -> String -> GossipSeed
GossipSeed (String -> Int -> EndPoint
EndPoint String
h Int
p) String
""
gossipSeedWithHeader :: String -> Int -> String -> GossipSeed
String
h Int
p String
hd = EndPoint -> String -> GossipSeed
GossipSeed (String -> Int -> EndPoint
EndPoint String
h Int
p) String
hd
gossipSeedHost :: GossipSeed -> String
gossipSeedHost :: GossipSeed -> String
gossipSeedHost = EndPoint -> String
endPointIp (EndPoint -> String)
-> (GossipSeed -> EndPoint) -> GossipSeed -> String
forall b c a. (b -> c) -> (a -> b) -> a -> c
. GossipSeed -> EndPoint
gossipEndpoint
gossipSeedPort :: GossipSeed -> Int
gossipSeedPort :: GossipSeed -> Int
gossipSeedPort = EndPoint -> Int
endPointPort (EndPoint -> Int) -> (GossipSeed -> EndPoint) -> GossipSeed -> Int
forall b c a. (b -> c) -> (a -> b) -> a -> c
. GossipSeed -> EndPoint
gossipEndpoint
emptyGossipSeed :: GossipSeed
emptyGossipSeed :: GossipSeed
emptyGossipSeed = EndPoint -> String -> GossipSeed
GossipSeed EndPoint
emptyEndPoint String
""
newtype Discovery =
Discovery { Discovery -> Maybe EndPoint -> EventStore (Maybe EndPoint)
runDiscovery :: Maybe EndPoint -> EventStore (Maybe EndPoint) }
staticEndPointDiscovery :: String -> Int -> Discovery
staticEndPointDiscovery :: String -> Int -> Discovery
staticEndPointDiscovery String
host Int
port =
(Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery
Discovery ((Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery)
-> (Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery
forall a b. (a -> b) -> a -> b
$ \Maybe EndPoint
_ -> Maybe EndPoint -> EventStore (Maybe EndPoint)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe EndPoint -> EventStore (Maybe EndPoint))
-> Maybe EndPoint -> EventStore (Maybe EndPoint)
forall a b. (a -> b) -> a -> b
$ EndPoint -> Maybe EndPoint
forall a. a -> Maybe a
Just (EndPoint -> Maybe EndPoint) -> EndPoint -> Maybe EndPoint
forall a b. (a -> b) -> a -> b
$ String -> Int -> EndPoint
EndPoint String
host Int
port
simpleDnsEndPointDiscovery :: ByteString -> Maybe DnsServer -> Int -> Discovery
simpleDnsEndPointDiscovery :: ByteString -> Maybe DnsServer -> Int -> Discovery
simpleDnsEndPointDiscovery ByteString
domain Maybe DnsServer
srv Int
port = (Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery
Discovery ((Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery)
-> (Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery
forall a b. (a -> b) -> a -> b
$ \Maybe EndPoint
_ -> do
let conf :: ResolvConf
conf =
case Maybe DnsServer
srv of
Maybe DnsServer
Nothing -> ResolvConf
defaultResolvConf
Just DnsServer
tpe ->
let rc :: FileOrNumericHost
rc =
case DnsServer
tpe of
DnsFilePath String
p -> String -> FileOrNumericHost
RCFilePath String
p
DnsHostName String
h -> String -> FileOrNumericHost
RCHostName String
h
DnsHostPort String
h Int
p -> String -> PortNumber -> FileOrNumericHost
RCHostPort String
h (Int -> PortNumber
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
p)
in ResolvConf
defaultResolvConf { resolvInfo :: FileOrNumericHost
resolvInfo = FileOrNumericHost
rc }
ResolvSeed
dnsSeed <- IO ResolvSeed -> EventStore ResolvSeed
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ResolvSeed -> EventStore ResolvSeed)
-> IO ResolvSeed -> EventStore ResolvSeed
forall a b. (a -> b) -> a -> b
$ ResolvConf -> IO ResolvSeed
makeResolvSeed ResolvConf
conf
Either DNSError [IPv4]
res <- IO (Either DNSError [IPv4]) -> EventStore (Either DNSError [IPv4])
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Either DNSError [IPv4])
-> EventStore (Either DNSError [IPv4]))
-> IO (Either DNSError [IPv4])
-> EventStore (Either DNSError [IPv4])
forall a b. (a -> b) -> a -> b
$ ResolvSeed
-> (Resolver -> IO (Either DNSError [IPv4]))
-> IO (Either DNSError [IPv4])
forall a. ResolvSeed -> (Resolver -> IO a) -> IO a
withResolver ResolvSeed
dnsSeed ((Resolver -> IO (Either DNSError [IPv4]))
-> IO (Either DNSError [IPv4]))
-> (Resolver -> IO (Either DNSError [IPv4]))
-> IO (Either DNSError [IPv4])
forall a b. (a -> b) -> a -> b
$ \Resolver
resv -> Resolver -> ByteString -> IO (Either DNSError [IPv4])
lookupA Resolver
resv ByteString
domain
case Either DNSError [IPv4]
res of
Left DNSError
e -> DnsDiscoveryException -> EventStore (Maybe EndPoint)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (DnsDiscoveryException -> EventStore (Maybe EndPoint))
-> DnsDiscoveryException -> EventStore (Maybe EndPoint)
forall a b. (a -> b) -> a -> b
$ DNSError -> DnsDiscoveryException
DNSDiscoveryError DNSError
e
Right [IPv4]
ips -> do
let pts :: [EndPoint]
pts = [ String -> Int -> EndPoint
EndPoint (IPv4 -> String
forall a. Show a => a -> String
show IPv4
ip) Int
port | IPv4
ip <- [IPv4]
ips ]
case [EndPoint]
pts of
[] -> Maybe EndPoint -> EventStore (Maybe EndPoint)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe EndPoint
forall a. Maybe a
Nothing
EndPoint
pt:[EndPoint]
_ -> Maybe EndPoint -> EventStore (Maybe EndPoint)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe EndPoint -> EventStore (Maybe EndPoint))
-> Maybe EndPoint -> EventStore (Maybe EndPoint)
forall a b. (a -> b) -> a -> b
$ EndPoint -> Maybe EndPoint
forall a. a -> Maybe a
Just EndPoint
pt
data DnsServer
= DnsFilePath String
| DnsHostName String
| DnsHostPort String Int
data ClusterSettings =
ClusterSettings
{ ClusterSettings -> ByteString
clusterDns :: !ByteString
, ClusterSettings -> Int
clusterMaxDiscoverAttempts :: !Int
, ClusterSettings -> Int
clusterExternalGossipPort :: !Int
, ClusterSettings -> Maybe (NonEmpty GossipSeed)
clusterGossipSeeds :: (Maybe (NonEmpty GossipSeed))
, ClusterSettings -> TimeSpan
clusterGossipTimeout :: !TimeSpan
, ClusterSettings -> Maybe DnsServer
clusterDnsServer :: !(Maybe DnsServer)
}
gossipSeedClusterSettings :: NonEmpty GossipSeed -> ClusterSettings
gossipSeedClusterSettings :: NonEmpty GossipSeed -> ClusterSettings
gossipSeedClusterSettings NonEmpty GossipSeed
xs =
ClusterSettings :: ByteString
-> Int
-> Int
-> Maybe (NonEmpty GossipSeed)
-> TimeSpan
-> Maybe DnsServer
-> ClusterSettings
ClusterSettings
{ clusterDns :: ByteString
clusterDns = ByteString
""
, clusterMaxDiscoverAttempts :: Int
clusterMaxDiscoverAttempts = Int
10
, clusterExternalGossipPort :: Int
clusterExternalGossipPort = Int
0
, clusterGossipSeeds :: Maybe (NonEmpty GossipSeed)
clusterGossipSeeds = NonEmpty GossipSeed -> Maybe (NonEmpty GossipSeed)
forall a. a -> Maybe a
Just NonEmpty GossipSeed
xs
, clusterGossipTimeout :: TimeSpan
clusterGossipTimeout = Double -> TimeSpan
fromSeconds Double
1
, clusterDnsServer :: Maybe DnsServer
clusterDnsServer = Maybe DnsServer
forall a. Maybe a
Nothing
}
dnsClusterSettings :: ByteString -> ClusterSettings
dnsClusterSettings :: ByteString -> ClusterSettings
dnsClusterSettings ByteString
clusterDns = ClusterSettings :: ByteString
-> Int
-> Int
-> Maybe (NonEmpty GossipSeed)
-> TimeSpan
-> Maybe DnsServer
-> ClusterSettings
ClusterSettings{Int
Maybe (NonEmpty GossipSeed)
Maybe DnsServer
ByteString
TimeSpan
forall a. Maybe a
clusterDnsServer :: forall a. Maybe a
clusterGossipTimeout :: TimeSpan
clusterGossipSeeds :: forall a. Maybe a
clusterExternalGossipPort :: Int
clusterMaxDiscoverAttempts :: Int
clusterDns :: ByteString
clusterDnsServer :: Maybe DnsServer
clusterGossipTimeout :: TimeSpan
clusterGossipSeeds :: Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: Int
clusterMaxDiscoverAttempts :: Int
clusterDns :: ByteString
..}
where
clusterMaxDiscoverAttempts :: Int
clusterMaxDiscoverAttempts = Int
10
clusterExternalGossipPort :: Int
clusterExternalGossipPort = Int
0
clusterGossipSeeds :: Maybe a
clusterGossipSeeds = Maybe a
forall a. Maybe a
Nothing
clusterGossipTimeout :: TimeSpan
clusterGossipTimeout = Double -> TimeSpan
fromSeconds Double
1
clusterDnsServer :: Maybe a
clusterDnsServer = Maybe a
forall a. Maybe a
Nothing
clusterDnsEndPointDiscovery :: ClusterSettings -> IO Discovery
clusterDnsEndPointDiscovery :: ClusterSettings -> IO Discovery
clusterDnsEndPointDiscovery ClusterSettings
settings = do
IORef (Maybe [MemberInfo])
ref <- Maybe [MemberInfo] -> IO (IORef (Maybe [MemberInfo]))
forall (m :: * -> *) a. MonadBase IO m => a -> m (IORef a)
newIORef Maybe [MemberInfo]
forall a. Maybe a
Nothing
Manager
manager <- ManagerSettings -> IO Manager
newManager ManagerSettings
defaultManagerSettings
Discovery -> IO Discovery
forall (m :: * -> *) a. Monad m => a -> m a
return (Discovery -> IO Discovery) -> Discovery -> IO Discovery
forall a b. (a -> b) -> a -> b
$ (Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery
Discovery ((Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery)
-> (Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery
forall a b. (a -> b) -> a -> b
$ \Maybe EndPoint
fend -> Manager
-> IORef (Maybe [MemberInfo])
-> Maybe EndPoint
-> ClusterSettings
-> EventStore (Maybe EndPoint)
discoverEndPoint Manager
manager IORef (Maybe [MemberInfo])
ref Maybe EndPoint
fend ClusterSettings
settings
data VNodeState
= Initializing
| Unknown
| PreReplica
| CatchingUp
| Clone
| Slave
| PreMaster
| Master
| Manager
| ShuttingDown
| Shutdown
deriving (VNodeState -> VNodeState -> Bool
(VNodeState -> VNodeState -> Bool)
-> (VNodeState -> VNodeState -> Bool) -> Eq VNodeState
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: VNodeState -> VNodeState -> Bool
$c/= :: VNodeState -> VNodeState -> Bool
== :: VNodeState -> VNodeState -> Bool
$c== :: VNodeState -> VNodeState -> Bool
Eq, Eq VNodeState
Eq VNodeState
-> (VNodeState -> VNodeState -> Ordering)
-> (VNodeState -> VNodeState -> Bool)
-> (VNodeState -> VNodeState -> Bool)
-> (VNodeState -> VNodeState -> Bool)
-> (VNodeState -> VNodeState -> Bool)
-> (VNodeState -> VNodeState -> VNodeState)
-> (VNodeState -> VNodeState -> VNodeState)
-> Ord VNodeState
VNodeState -> VNodeState -> Bool
VNodeState -> VNodeState -> Ordering
VNodeState -> VNodeState -> VNodeState
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: VNodeState -> VNodeState -> VNodeState
$cmin :: VNodeState -> VNodeState -> VNodeState
max :: VNodeState -> VNodeState -> VNodeState
$cmax :: VNodeState -> VNodeState -> VNodeState
>= :: VNodeState -> VNodeState -> Bool
$c>= :: VNodeState -> VNodeState -> Bool
> :: VNodeState -> VNodeState -> Bool
$c> :: VNodeState -> VNodeState -> Bool
<= :: VNodeState -> VNodeState -> Bool
$c<= :: VNodeState -> VNodeState -> Bool
< :: VNodeState -> VNodeState -> Bool
$c< :: VNodeState -> VNodeState -> Bool
compare :: VNodeState -> VNodeState -> Ordering
$ccompare :: VNodeState -> VNodeState -> Ordering
$cp1Ord :: Eq VNodeState
Ord, (forall x. VNodeState -> Rep VNodeState x)
-> (forall x. Rep VNodeState x -> VNodeState) -> Generic VNodeState
forall x. Rep VNodeState x -> VNodeState
forall x. VNodeState -> Rep VNodeState x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep VNodeState x -> VNodeState
$cfrom :: forall x. VNodeState -> Rep VNodeState x
Generic, Int -> VNodeState -> ShowS
[VNodeState] -> ShowS
VNodeState -> String
(Int -> VNodeState -> ShowS)
-> (VNodeState -> String)
-> ([VNodeState] -> ShowS)
-> Show VNodeState
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [VNodeState] -> ShowS
$cshowList :: [VNodeState] -> ShowS
show :: VNodeState -> String
$cshow :: VNodeState -> String
showsPrec :: Int -> VNodeState -> ShowS
$cshowsPrec :: Int -> VNodeState -> ShowS
Show)
instance FromJSON VNodeState
newtype GUUID = GUUID UUID deriving Int -> GUUID -> ShowS
[GUUID] -> ShowS
GUUID -> String
(Int -> GUUID -> ShowS)
-> (GUUID -> String) -> ([GUUID] -> ShowS) -> Show GUUID
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [GUUID] -> ShowS
$cshowList :: [GUUID] -> ShowS
show :: GUUID -> String
$cshow :: GUUID -> String
showsPrec :: Int -> GUUID -> ShowS
$cshowsPrec :: Int -> GUUID -> ShowS
Show
instance FromJSON GUUID where
parseJSON :: Value -> Parser GUUID
parseJSON (String Text
txt) =
case Text -> Maybe UUID
fromText Text
txt of
Just UUID
uuid -> GUUID -> Parser GUUID
forall (m :: * -> *) a. Monad m => a -> m a
return (GUUID -> Parser GUUID) -> GUUID -> Parser GUUID
forall a b. (a -> b) -> a -> b
$ UUID -> GUUID
GUUID UUID
uuid
Maybe UUID
_ -> String -> Parser GUUID
forall (m :: * -> *) a. MonadFail m => String -> m a
fail (String -> Parser GUUID) -> String -> Parser GUUID
forall a b. (a -> b) -> a -> b
$ String
"Wrong UUID format " String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Text -> String
forall a. Show a => a -> String
show Text
txt
parseJSON Value
invalid = String -> Value -> Parser GUUID
forall a. String -> Value -> Parser a
typeMismatch String
"UUID" Value
invalid
data MemberInfo =
MemberInfo
{ MemberInfo -> GUUID
_instanceId :: !GUUID
, MemberInfo -> VNodeState
_state :: !VNodeState
, MemberInfo -> Bool
_isAlive :: !Bool
, MemberInfo -> String
_internalTcpIp :: !String
, MemberInfo -> Int
_internalTcpPort :: !Int
, MemberInfo -> String
_externalTcpIp :: !String
, MemberInfo -> Int
_externalTcpPort :: !Int
, MemberInfo -> String
_internalHttpIp :: !String
, MemberInfo -> Int
_internalHttpPort :: !Int
, MemberInfo -> String
_externalHttpIp :: !String
, MemberInfo -> Int
_externalHttpPort :: !Int
, MemberInfo -> Int64
_lastCommitPosition :: !Int64
, MemberInfo -> Int64
_writerCheckpoint :: !Int64
, MemberInfo -> Int64
_chaserCheckpoint :: !Int64
, MemberInfo -> Int64
_epochPosition :: !Int64
, MemberInfo -> Int
_epochNumber :: !Int
, MemberInfo -> GUUID
_epochId :: !GUUID
, MemberInfo -> Int
_nodePriority :: !Int
} deriving Int -> MemberInfo -> ShowS
[MemberInfo] -> ShowS
MemberInfo -> String
(Int -> MemberInfo -> ShowS)
-> (MemberInfo -> String)
-> ([MemberInfo] -> ShowS)
-> Show MemberInfo
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [MemberInfo] -> ShowS
$cshowList :: [MemberInfo] -> ShowS
show :: MemberInfo -> String
$cshow :: MemberInfo -> String
showsPrec :: Int -> MemberInfo -> ShowS
$cshowsPrec :: Int -> MemberInfo -> ShowS
Show
instance FromJSON MemberInfo where
parseJSON :: Value -> Parser MemberInfo
parseJSON (Object Object
m) =
GUUID
-> VNodeState
-> Bool
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo
MemberInfo
(GUUID
-> VNodeState
-> Bool
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
-> Parser GUUID
-> Parser
(VNodeState
-> Bool
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
m Object -> Key -> Parser GUUID
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"instanceId"
Parser
(VNodeState
-> Bool
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
-> Parser VNodeState
-> Parser
(Bool
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser VNodeState
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"state"
Parser
(Bool
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
-> Parser Bool
-> Parser
(String
-> Int
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser Bool
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"isAlive"
Parser
(String
-> Int
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
-> Parser String
-> Parser
(Int
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser String
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"internalTcpIp"
Parser
(Int
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
-> Parser Int
-> Parser
(String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser Int
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"internalTcpPort"
Parser
(String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
-> Parser String
-> Parser
(Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser String
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"externalTcpIp"
Parser
(Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
-> Parser Int
-> Parser
(String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser Int
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"externalTcpPort"
Parser
(String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
-> Parser String
-> Parser
(Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser String
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"internalHttpIp"
Parser
(Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
-> Parser Int
-> Parser
(String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser Int
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"internalHttpPort"
Parser
(String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
-> Parser String
-> Parser
(Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser String
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"externalHttpIp"
Parser
(Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo)
-> Parser Int
-> Parser
(Int64
-> Int64 -> Int64 -> Int64 -> Int -> GUUID -> Int -> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser Int
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"externalHttpPort"
Parser
(Int64
-> Int64 -> Int64 -> Int64 -> Int -> GUUID -> Int -> MemberInfo)
-> Parser Int64
-> Parser
(Int64 -> Int64 -> Int64 -> Int -> GUUID -> Int -> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser Int64
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"lastCommitPosition"
Parser
(Int64 -> Int64 -> Int64 -> Int -> GUUID -> Int -> MemberInfo)
-> Parser Int64
-> Parser (Int64 -> Int64 -> Int -> GUUID -> Int -> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser Int64
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"writerCheckpoint"
Parser (Int64 -> Int64 -> Int -> GUUID -> Int -> MemberInfo)
-> Parser Int64
-> Parser (Int64 -> Int -> GUUID -> Int -> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser Int64
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"chaserCheckpoint"
Parser (Int64 -> Int -> GUUID -> Int -> MemberInfo)
-> Parser Int64 -> Parser (Int -> GUUID -> Int -> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser Int64
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"epochPosition"
Parser (Int -> GUUID -> Int -> MemberInfo)
-> Parser Int -> Parser (GUUID -> Int -> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser Int
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"epochNumber"
Parser (GUUID -> Int -> MemberInfo)
-> Parser GUUID -> Parser (Int -> MemberInfo)
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser GUUID
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"epochId"
Parser (Int -> MemberInfo) -> Parser Int -> Parser MemberInfo
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m Object -> Key -> Parser Int
forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"nodePriority"
parseJSON Value
invalid = String -> Value -> Parser MemberInfo
forall a. String -> Value -> Parser a
typeMismatch String
"MemberInfo" Value
invalid
data ClusterInfo =
ClusterInfo { ClusterInfo -> [MemberInfo]
members :: [MemberInfo] }
deriving (Int -> ClusterInfo -> ShowS
[ClusterInfo] -> ShowS
ClusterInfo -> String
(Int -> ClusterInfo -> ShowS)
-> (ClusterInfo -> String)
-> ([ClusterInfo] -> ShowS)
-> Show ClusterInfo
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ClusterInfo] -> ShowS
$cshowList :: [ClusterInfo] -> ShowS
show :: ClusterInfo -> String
$cshow :: ClusterInfo -> String
showsPrec :: Int -> ClusterInfo -> ShowS
$cshowsPrec :: Int -> ClusterInfo -> ShowS
Show, (forall x. ClusterInfo -> Rep ClusterInfo x)
-> (forall x. Rep ClusterInfo x -> ClusterInfo)
-> Generic ClusterInfo
forall x. Rep ClusterInfo x -> ClusterInfo
forall x. ClusterInfo -> Rep ClusterInfo x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ClusterInfo x -> ClusterInfo
$cfrom :: forall x. ClusterInfo -> Rep ClusterInfo x
Generic)
instance FromJSON ClusterInfo
discoverEndPoint :: Manager
-> IORef (Maybe [MemberInfo])
-> Maybe EndPoint
-> ClusterSettings
-> EventStore (Maybe EndPoint)
discoverEndPoint :: Manager
-> IORef (Maybe [MemberInfo])
-> Maybe EndPoint
-> ClusterSettings
-> EventStore (Maybe EndPoint)
discoverEndPoint Manager
mgr IORef (Maybe [MemberInfo])
ref Maybe EndPoint
fend ClusterSettings
settings = do
Maybe [MemberInfo]
old_m <- IORef (Maybe [MemberInfo]) -> EventStore (Maybe [MemberInfo])
forall (m :: * -> *) a. MonadBase IO m => IORef a -> m a
readIORef IORef (Maybe [MemberInfo])
ref
IORef (Maybe [MemberInfo]) -> Maybe [MemberInfo] -> EventStore ()
forall (m :: * -> *) a. MonadBase IO m => IORef a -> a -> m ()
writeIORef IORef (Maybe [MemberInfo])
ref Maybe [MemberInfo]
forall a. Maybe a
Nothing
IOArray Int GossipSeed
candidates <- case Maybe [MemberInfo]
old_m of
Maybe [MemberInfo]
Nothing -> ClusterSettings -> EventStore (IOArray Int GossipSeed)
gossipCandidatesFromDns ClusterSettings
settings
Just [MemberInfo]
old -> IO (IOArray Int GossipSeed) -> EventStore (IOArray Int GossipSeed)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IOArray Int GossipSeed)
-> EventStore (IOArray Int GossipSeed))
-> IO (IOArray Int GossipSeed)
-> EventStore (IOArray Int GossipSeed)
forall a b. (a -> b) -> a -> b
$ Maybe EndPoint -> [MemberInfo] -> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip Maybe EndPoint
fend [MemberInfo]
old
IOArray Int GossipSeed
-> (Int -> EventStore (Maybe EndPoint))
-> EventStore (Maybe EndPoint)
forall a b.
IOArray Int a
-> (Int -> EventStore (Maybe b)) -> EventStore (Maybe b)
forArrayFirst IOArray Int GossipSeed
candidates ((Int -> EventStore (Maybe EndPoint))
-> EventStore (Maybe EndPoint))
-> (Int -> EventStore (Maybe EndPoint))
-> EventStore (Maybe EndPoint)
forall a b. (a -> b) -> a -> b
$ \Int
idx -> do
GossipSeed
c <- IO GossipSeed -> EventStore GossipSeed
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO GossipSeed -> EventStore GossipSeed)
-> IO GossipSeed -> EventStore GossipSeed
forall a b. (a -> b) -> a -> b
$ IOArray Int GossipSeed -> Int -> IO GossipSeed
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> m e
readArray IOArray Int GossipSeed
candidates Int
idx
Maybe ClusterInfo
res <- ClusterSettings
-> Manager -> GossipSeed -> EventStore (Maybe ClusterInfo)
tryGetGossipFrom ClusterSettings
settings Manager
mgr GossipSeed
c
let fin_end :: Maybe (ClusterInfo, EndPoint)
fin_end = do
ClusterInfo
info <- Maybe ClusterInfo
res
EndPoint
best <- [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode ([MemberInfo] -> Maybe EndPoint) -> [MemberInfo] -> Maybe EndPoint
forall a b. (a -> b) -> a -> b
$ ClusterInfo -> [MemberInfo]
members ClusterInfo
info
(ClusterInfo, EndPoint) -> Maybe (ClusterInfo, EndPoint)
forall (m :: * -> *) a. Monad m => a -> m a
return (ClusterInfo
info, EndPoint
best)
case Maybe (ClusterInfo, EndPoint)
fin_end of
Maybe (ClusterInfo, EndPoint)
Nothing -> Maybe EndPoint -> EventStore (Maybe EndPoint)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe EndPoint
forall a. Maybe a
Nothing
Just (ClusterInfo
info, EndPoint
best) -> do
IORef (Maybe [MemberInfo]) -> Maybe [MemberInfo] -> EventStore ()
forall (m :: * -> *) a. MonadBase IO m => IORef a -> a -> m ()
writeIORef IORef (Maybe [MemberInfo])
ref ([MemberInfo] -> Maybe [MemberInfo]
forall a. a -> Maybe a
Just ([MemberInfo] -> Maybe [MemberInfo])
-> [MemberInfo] -> Maybe [MemberInfo]
forall a b. (a -> b) -> a -> b
$ ClusterInfo -> [MemberInfo]
members ClusterInfo
info)
Maybe EndPoint -> EventStore (Maybe EndPoint)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe EndPoint -> EventStore (Maybe EndPoint))
-> Maybe EndPoint -> EventStore (Maybe EndPoint)
forall a b. (a -> b) -> a -> b
$ EndPoint -> Maybe EndPoint
forall a. a -> Maybe a
Just EndPoint
best
tryGetGossipFrom :: ClusterSettings
-> Manager
-> GossipSeed
-> EventStore (Maybe ClusterInfo)
tryGetGossipFrom :: ClusterSettings
-> Manager -> GossipSeed -> EventStore (Maybe ClusterInfo)
tryGetGossipFrom ClusterSettings{Int
Maybe (NonEmpty GossipSeed)
Maybe DnsServer
ByteString
TimeSpan
clusterDnsServer :: Maybe DnsServer
clusterGossipTimeout :: TimeSpan
clusterGossipSeeds :: Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: Int
clusterMaxDiscoverAttempts :: Int
clusterDns :: ByteString
clusterDnsServer :: ClusterSettings -> Maybe DnsServer
clusterGossipTimeout :: ClusterSettings -> TimeSpan
clusterGossipSeeds :: ClusterSettings -> Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: ClusterSettings -> Int
clusterMaxDiscoverAttempts :: ClusterSettings -> Int
clusterDns :: ClusterSettings -> ByteString
..} Manager
mgr GossipSeed
seed = do
Request
init_req <- IO Request -> EventStore Request
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO Request -> EventStore Request)
-> IO Request -> EventStore Request
forall a b. (a -> b) -> a -> b
$ EndPoint -> String -> IO Request
httpRequest (GossipSeed -> EndPoint
gossipEndpoint GossipSeed
seed) String
"/gossip?format=json"
let timeout :: Int
timeout = Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
truncate (TimeSpan -> Double
totalMillis TimeSpan
clusterGossipTimeout Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000)
req :: Request
req = Request
init_req { responseTimeout :: ResponseTimeout
responseTimeout = Int -> ResponseTimeout
responseTimeoutMicro Int
timeout }
Either SomeException (Response ByteString)
eithResp <- EventStore (Response ByteString)
-> EventStore (Either SomeException (Response ByteString))
forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny (EventStore (Response ByteString)
-> EventStore (Either SomeException (Response ByteString)))
-> EventStore (Response ByteString)
-> EventStore (Either SomeException (Response ByteString))
forall a b. (a -> b) -> a -> b
$ IO (Response ByteString) -> EventStore (Response ByteString)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (Response ByteString) -> EventStore (Response ByteString))
-> IO (Response ByteString) -> EventStore (Response ByteString)
forall a b. (a -> b) -> a -> b
$ Request -> Manager -> IO (Response ByteString)
httpLbs Request
req Manager
mgr
case Either SomeException (Response ByteString)
eithResp of
Right Response ByteString
resp -> Maybe ClusterInfo -> EventStore (Maybe ClusterInfo)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ClusterInfo -> EventStore (Maybe ClusterInfo))
-> Maybe ClusterInfo -> EventStore (Maybe ClusterInfo)
forall a b. (a -> b) -> a -> b
$ ByteString -> Maybe ClusterInfo
forall a. FromJSON a => ByteString -> Maybe a
decode (ByteString -> Maybe ClusterInfo)
-> ByteString -> Maybe ClusterInfo
forall a b. (a -> b) -> a -> b
$ Response ByteString -> ByteString
forall body. Response body -> body
responseBody Response ByteString
resp
Left SomeException
err -> do
Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> Text
Loc -> Text -> LogLevel -> Text -> EventStore ()
(Text -> EventStore ()) -> (Text -> Text) -> Text -> EventStore ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
$logInfo [i|Failed to get cluster info from [#{seed}], error: #{err}.|]
Maybe ClusterInfo -> EventStore (Maybe ClusterInfo)
forall (f :: * -> *) a. Applicative f => a -> f a
pure Maybe ClusterInfo
forall a. Maybe a
Nothing
tryDetermineBestNode :: [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode :: [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode [MemberInfo]
members = Maybe EndPoint
node_m
where
nodes :: [MemberInfo]
nodes = [MemberInfo
m | MemberInfo
m <- [MemberInfo]
members
, MemberInfo -> Bool
_isAlive MemberInfo
m
, VNodeState -> Bool
allowedState (VNodeState -> Bool) -> VNodeState -> Bool
forall a b. (a -> b) -> a -> b
$ MemberInfo -> VNodeState
_state MemberInfo
m
]
node_m :: Maybe EndPoint
node_m =
case (Element [MemberInfo] -> Down VNodeState)
-> [MemberInfo] -> [MemberInfo]
forall o seq.
(Ord o, SemiSequence seq) =>
(Element seq -> o) -> seq -> seq
sortOn (VNodeState -> Down VNodeState
forall a. a -> Down a
Down (VNodeState -> Down VNodeState)
-> (MemberInfo -> VNodeState) -> MemberInfo -> Down VNodeState
forall b c a. (b -> c) -> (a -> b) -> a -> c
. MemberInfo -> VNodeState
_state) [MemberInfo]
nodes of
[] -> Maybe EndPoint
forall a. Maybe a
Nothing
MemberInfo
n:[MemberInfo]
_ -> EndPoint -> Maybe EndPoint
forall a. a -> Maybe a
Just (EndPoint -> Maybe EndPoint) -> EndPoint -> Maybe EndPoint
forall a b. (a -> b) -> a -> b
$ String -> Int -> EndPoint
EndPoint (MemberInfo -> String
_externalTcpIp MemberInfo
n) (MemberInfo -> Int
_externalTcpPort MemberInfo
n)
allowedState :: VNodeState -> Bool
allowedState VNodeState
Manager = Bool
False
allowedState VNodeState
ShuttingDown = Bool
False
allowedState VNodeState
Shutdown = Bool
False
allowedState VNodeState
_ = Bool
True
gossipCandidatesFromOldGossip :: Maybe EndPoint
-> [MemberInfo]
-> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip :: Maybe EndPoint -> [MemberInfo] -> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip Maybe EndPoint
fend_m [MemberInfo]
oldGossip =
[MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates [MemberInfo]
candidates
where
candidates :: [MemberInfo]
candidates =
case Maybe EndPoint
fend_m of
Maybe EndPoint
Nothing -> [MemberInfo]
oldGossip
Just EndPoint
fend -> [ MemberInfo
c | MemberInfo
c <- [MemberInfo]
oldGossip
, String -> Int -> EndPoint
EndPoint (MemberInfo -> String
_externalTcpIp MemberInfo
c) (MemberInfo -> Int
_externalTcpPort MemberInfo
c) EndPoint -> EndPoint -> Bool
forall a. Eq a => a -> a -> Bool
/= EndPoint
fend
]
data AState = AState !Int !Int
arrangeGossipCandidates :: [MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates :: [MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates [MemberInfo]
members = do
IOArray Int GossipSeed
arr <- CharPos -> GossipSeed -> IO (IOArray Int GossipSeed)
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
(i, i) -> e -> m (a i e)
newArray (Int
0, Int
len) GossipSeed
emptyGossipSeed
AState Int
idx Int
j <- (AState -> MemberInfo -> IO AState)
-> AState -> [MemberInfo] -> IO AState
forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM (IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
go IOArray Int GossipSeed
arr) (Int -> Int -> AState
AState (-Int
1) Int
len) [MemberInfo]
members
IOArray Int GossipSeed -> Int -> Int -> IO ()
forall a. IOArray Int a -> Int -> Int -> IO ()
shuffle IOArray Int GossipSeed
arr Int
0 Int
idx
IOArray Int GossipSeed -> Int -> Int -> IO ()
forall a. IOArray Int a -> Int -> Int -> IO ()
shuffle IOArray Int GossipSeed
arr Int
j (Int
len Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1)
IOArray Int GossipSeed -> IO (IOArray Int GossipSeed)
forall (m :: * -> *) a. Monad m => a -> m a
return IOArray Int GossipSeed
arr
where
len :: Int
len = [MemberInfo] -> Int
forall mono. MonoFoldable mono => mono -> Int
length [MemberInfo]
members
go :: IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
go :: IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
go IOArray Int GossipSeed
arr (AState Int
idx Int
j) MemberInfo
m =
case MemberInfo -> VNodeState
_state MemberInfo
m of
VNodeState
Manager -> do
let new_j :: Int
new_j = Int
j Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1
IOArray Int GossipSeed -> Int -> GossipSeed -> IO ()
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> e -> m ()
writeArray IOArray Int GossipSeed
arr Int
new_j GossipSeed
seed
AState -> IO AState
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> Int -> AState
AState Int
idx Int
new_j)
VNodeState
_ -> do
let new_i :: Int
new_i = Int
idx Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1
IOArray Int GossipSeed -> Int -> GossipSeed -> IO ()
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> e -> m ()
writeArray IOArray Int GossipSeed
arr Int
new_i GossipSeed
seed
AState -> IO AState
forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> Int -> AState
AState Int
new_i Int
j)
where
end :: EndPoint
end = String -> Int -> EndPoint
EndPoint (MemberInfo -> String
_externalHttpIp MemberInfo
m) (MemberInfo -> Int
_externalHttpPort MemberInfo
m)
seed :: GossipSeed
seed = EndPoint -> String -> GossipSeed
GossipSeed EndPoint
end String
""
gossipCandidatesFromDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
gossipCandidatesFromDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
gossipCandidatesFromDns settings :: ClusterSettings
settings@ClusterSettings{Int
Maybe (NonEmpty GossipSeed)
Maybe DnsServer
ByteString
TimeSpan
clusterDnsServer :: Maybe DnsServer
clusterGossipTimeout :: TimeSpan
clusterGossipSeeds :: Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: Int
clusterMaxDiscoverAttempts :: Int
clusterDns :: ByteString
clusterDnsServer :: ClusterSettings -> Maybe DnsServer
clusterGossipTimeout :: ClusterSettings -> TimeSpan
clusterGossipSeeds :: ClusterSettings -> Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: ClusterSettings -> Int
clusterMaxDiscoverAttempts :: ClusterSettings -> Int
clusterDns :: ClusterSettings -> ByteString
..} = do
IOArray Int GossipSeed
arr <- EventStore (IOArray Int GossipSeed)
endpoints
IO () -> EventStore ()
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> EventStore ()) -> IO () -> EventStore ()
forall a b. (a -> b) -> a -> b
$ IOArray Int GossipSeed -> IO ()
forall a. IOArray Int a -> IO ()
shuffleAll IOArray Int GossipSeed
arr
IOArray Int GossipSeed -> EventStore (IOArray Int GossipSeed)
forall (m :: * -> *) a. Monad m => a -> m a
return IOArray Int GossipSeed
arr
where
endpoints :: EventStore (IOArray Int GossipSeed)
endpoints =
case Maybe (NonEmpty GossipSeed)
clusterGossipSeeds of
Maybe (NonEmpty GossipSeed)
Nothing -> ClusterSettings -> EventStore (IOArray Int GossipSeed)
resolveDns ClusterSettings
settings
Just NonEmpty GossipSeed
ss -> let ls :: [GossipSeed]
ls = NonEmpty GossipSeed -> [GossipSeed]
forall (t :: * -> *) a. Foldable t => t a -> [a]
toList NonEmpty GossipSeed
ss
len :: Int
len = [GossipSeed] -> Int
forall mono. MonoFoldable mono => mono -> Int
length [GossipSeed]
ls
in IO (IOArray Int GossipSeed) -> EventStore (IOArray Int GossipSeed)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IOArray Int GossipSeed)
-> EventStore (IOArray Int GossipSeed))
-> IO (IOArray Int GossipSeed)
-> EventStore (IOArray Int GossipSeed)
forall a b. (a -> b) -> a -> b
$ CharPos -> [GossipSeed] -> IO (IOArray Int GossipSeed)
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
(i, i) -> [e] -> m (a i e)
newListArray (Int
0, Int
len Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1) [GossipSeed]
ls
resolveDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
resolveDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
resolveDns ClusterSettings{Int
Maybe (NonEmpty GossipSeed)
Maybe DnsServer
ByteString
TimeSpan
clusterDnsServer :: Maybe DnsServer
clusterGossipTimeout :: TimeSpan
clusterGossipSeeds :: Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: Int
clusterMaxDiscoverAttempts :: Int
clusterDns :: ByteString
clusterDnsServer :: ClusterSettings -> Maybe DnsServer
clusterGossipTimeout :: ClusterSettings -> TimeSpan
clusterGossipSeeds :: ClusterSettings -> Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: ClusterSettings -> Int
clusterMaxDiscoverAttempts :: ClusterSettings -> Int
clusterDns :: ClusterSettings -> ByteString
..} = do
let timeoutMicros :: Double
timeoutMicros = TimeSpan -> Double
totalMillis TimeSpan
clusterGossipTimeout Double -> Double -> Double
forall a. Num a => a -> a -> a
* Double
1000
conf :: ResolvConf
conf =
case Maybe DnsServer
clusterDnsServer of
Maybe DnsServer
Nothing -> ResolvConf
defaultResolvConf
Just DnsServer
tpe ->
let rc :: FileOrNumericHost
rc =
case DnsServer
tpe of
DnsFilePath String
p -> String -> FileOrNumericHost
RCFilePath String
p
DnsHostName String
h -> String -> FileOrNumericHost
RCHostName String
h
DnsHostPort String
h Int
p -> String -> PortNumber -> FileOrNumericHost
RCHostPort String
h (Int -> PortNumber
forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
p)
in ResolvConf
defaultResolvConf { resolvInfo :: FileOrNumericHost
resolvInfo = FileOrNumericHost
rc }
ResolvSeed
dnsSeed <- IO ResolvSeed -> EventStore ResolvSeed
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO ResolvSeed -> EventStore ResolvSeed)
-> IO ResolvSeed -> EventStore ResolvSeed
forall a b. (a -> b) -> a -> b
$ ResolvConf -> IO ResolvSeed
makeResolvSeed ResolvConf
conf
{ resolvTimeout :: Int
resolvTimeout = Double -> Int
forall a b. (RealFrac a, Integral b) => a -> b
truncate Double
timeoutMicros
, resolvRetry :: Int
resolvRetry = Int
clusterMaxDiscoverAttempts
}
IO (IOArray Int GossipSeed) -> EventStore (IOArray Int GossipSeed)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO (IOArray Int GossipSeed)
-> EventStore (IOArray Int GossipSeed))
-> IO (IOArray Int GossipSeed)
-> EventStore (IOArray Int GossipSeed)
forall a b. (a -> b) -> a -> b
$ ResolvSeed
-> (Resolver -> IO (IOArray Int GossipSeed))
-> IO (IOArray Int GossipSeed)
forall a. ResolvSeed -> (Resolver -> IO a) -> IO a
withResolver ResolvSeed
dnsSeed ((Resolver -> IO (IOArray Int GossipSeed))
-> IO (IOArray Int GossipSeed))
-> (Resolver -> IO (IOArray Int GossipSeed))
-> IO (IOArray Int GossipSeed)
forall a b. (a -> b) -> a -> b
$ \Resolver
resv -> do
Either DNSError [IPv4]
result <- Resolver -> ByteString -> IO (Either DNSError [IPv4])
lookupA Resolver
resv ByteString
clusterDns
case Either DNSError [IPv4]
result of
Left DNSError
e -> DnsDiscoveryException -> IO (IOArray Int GossipSeed)
forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO (DnsDiscoveryException -> IO (IOArray Int GossipSeed))
-> DnsDiscoveryException -> IO (IOArray Int GossipSeed)
forall a b. (a -> b) -> a -> b
$ DNSError -> DnsDiscoveryException
DNSDiscoveryError DNSError
e
Right [IPv4]
ips -> do
let len :: Int
len = [IPv4] -> Int
forall mono. MonoFoldable mono => mono -> Int
length [IPv4]
ips Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1
IOArray Int GossipSeed
arr <- CharPos -> IO (IOArray Int GossipSeed)
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
(i, i) -> m (a i e)
newArray_ (Int
0, Int
len)
[(Int, IPv4)] -> ((Int, IPv4) -> IO ()) -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ ([Int] -> [IPv4] -> [(Int, IPv4)]
forall a b. [a] -> [b] -> [(a, b)]
zip [Int
0..] [IPv4]
ips) (((Int, IPv4) -> IO ()) -> IO ())
-> ((Int, IPv4) -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \(Int
idx, IPv4
ip) -> do
let end :: EndPoint
end = String -> Int -> EndPoint
EndPoint (IPv4 -> String
forall a. Show a => a -> String
show IPv4
ip) Int
clusterExternalGossipPort
seed :: GossipSeed
seed = EndPoint -> String -> GossipSeed
GossipSeed EndPoint
end String
""
IOArray Int GossipSeed -> Int -> GossipSeed -> IO ()
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> e -> m ()
writeArray IOArray Int GossipSeed
arr Int
idx GossipSeed
seed
IOArray Int GossipSeed -> IO (IOArray Int GossipSeed)
forall (m :: * -> *) a. Monad m => a -> m a
return IOArray Int GossipSeed
arr
shuffleAll :: IOArray Int a -> IO ()
shuffleAll :: IOArray Int a -> IO ()
shuffleAll IOArray Int a
arr = do
(Int
low, Int
hig) <- IOArray Int a -> IO CharPos
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> m (i, i)
getBounds IOArray Int a
arr
IOArray Int a -> Int -> Int -> IO ()
forall a. IOArray Int a -> Int -> Int -> IO ()
shuffle IOArray Int a
arr Int
low Int
hig
shuffle :: IOArray Int a -> Int -> Int -> IO ()
shuffle :: IOArray Int a -> Int -> Int -> IO ()
shuffle IOArray Int a
arr Int
from Int
to = Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ Int
from Int
to ((Int -> IO ()) -> IO ()) -> (Int -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \Int
cur -> do
Int
idx <- CharPos -> IO Int
forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
cur, Int
to)
a
tmp <- IOArray Int a -> Int -> IO a
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> m e
readArray IOArray Int a
arr Int
idx
a
value <- IOArray Int a -> Int -> IO a
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> m e
readArray IOArray Int a
arr Int
cur
IOArray Int a -> Int -> a -> IO ()
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> e -> m ()
writeArray IOArray Int a
arr Int
idx a
value
IOArray Int a -> Int -> a -> IO ()
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> e -> m ()
writeArray IOArray Int a
arr Int
cur a
tmp
forRange_ :: Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ :: Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ Int
from Int
to Int -> IO ()
k = do
Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
from Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
to) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ Int -> Int -> IO ()
loop (Int
to Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int
from
where
loop :: Int -> Int -> IO ()
loop Int
len Int
cur
| Int
len Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
cur = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
| Bool
otherwise = do
Int -> IO ()
k Int
cur
Int -> Int -> IO ()
loop Int
len (Int
cur Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
forArrayFirst :: IOArray Int a
-> (Int -> EventStore (Maybe b))
-> EventStore (Maybe b)
forArrayFirst :: IOArray Int a
-> (Int -> EventStore (Maybe b)) -> EventStore (Maybe b)
forArrayFirst IOArray Int a
arr Int -> EventStore (Maybe b)
k = do
(Int
low, Int
hig) <- IO CharPos -> EventStore CharPos
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO CharPos -> EventStore CharPos)
-> IO CharPos -> EventStore CharPos
forall a b. (a -> b) -> a -> b
$ IOArray Int a -> IO CharPos
forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> m (i, i)
getBounds IOArray Int a
arr
Int -> Int -> (Int -> EventStore (Maybe b)) -> EventStore (Maybe b)
forall b.
Int -> Int -> (Int -> EventStore (Maybe b)) -> EventStore (Maybe b)
forRangeFirst Int
low Int
hig Int -> EventStore (Maybe b)
k
forRangeFirst :: Int
-> Int
-> (Int -> EventStore (Maybe b))
-> EventStore (Maybe b)
forRangeFirst :: Int -> Int -> (Int -> EventStore (Maybe b)) -> EventStore (Maybe b)
forRangeFirst Int
from Int
to Int -> EventStore (Maybe b)
k = do
if Int
from Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
<= Int
to then Int -> Int -> EventStore (Maybe b)
loop (Int
to Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1) Int
from else Maybe b -> EventStore (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
where
loop :: Int -> Int -> EventStore (Maybe b)
loop Int
len Int
cur
| Int
len Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
cur = Maybe b -> EventStore (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
| Bool
otherwise = do
Maybe b
res <- Int -> EventStore (Maybe b)
k Int
cur
if Maybe b -> Bool
forall a. Maybe a -> Bool
isJust Maybe b
res then Maybe b -> EventStore (Maybe b)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
res else Int -> Int -> EventStore (Maybe b)
loop Int
len (Int
cur Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)