{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE RecordWildCards            #-}
module Database.EventStore.Internal.Discovery
    ( Discovery(..)
    , GossipSeed
    , DnsDiscoveryException(..)
    , ClusterSettings(..)
    , DnsServer(..)
    , EndPoint(..)
    , staticEndPointDiscovery
    , clusterDnsEndPointDiscovery
    , gossipSeedClusterSettings
    , simpleDnsEndPointDiscovery
    , dnsClusterSettings
    , gossipSeed
    , gossipSeedWithHeader
    , gossipSeedHeader
    , gossipSeedHost
    , gossipSeedPort
    ) where
import Prelude (String, fail)
import Data.Maybe
import Control.Exception.Safe (tryAny)
import Data.Aeson
import Data.Aeson.Types
import Data.Array.IO
import Data.DotNet.TimeSpan
import Data.List.NonEmpty (NonEmpty)
import Data.UUID
import Network.HTTP.Client
import Network.DNS hiding (decode)
import System.Random
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
data DnsDiscoveryException
    = MaxDiscoveryAttemptReached ByteString
    | DNSDiscoveryError DNSError
    deriving (Int -> DnsDiscoveryException -> ShowS
[DnsDiscoveryException] -> ShowS
DnsDiscoveryException -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [DnsDiscoveryException] -> ShowS
$cshowList :: [DnsDiscoveryException] -> ShowS
show :: DnsDiscoveryException -> String
$cshow :: DnsDiscoveryException -> String
showsPrec :: Int -> DnsDiscoveryException -> ShowS
$cshowsPrec :: Int -> DnsDiscoveryException -> ShowS
Show, Typeable)
instance Exception DnsDiscoveryException
httpRequest :: EndPoint -> String -> IO Request
httpRequest :: EndPoint -> String -> IO Request
httpRequest (EndPoint String
ip Int
p) String
path = forall (m :: * -> *). MonadThrow m => String -> m Request
parseUrlThrow String
url
  where
    url :: String
url = String
"http://" forall a. Semigroup a => a -> a -> a
<> String
ip forall a. Semigroup a => a -> a -> a
<> String
":" forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Int
p forall a. Semigroup a => a -> a -> a
<> String
path
data GossipSeed =
    GossipSeed
    { GossipSeed -> EndPoint
gossipEndpoint :: !EndPoint
      
      
      
      
    ,  :: !String
      
    } deriving Int -> GossipSeed -> ShowS
[GossipSeed] -> ShowS
GossipSeed -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [GossipSeed] -> ShowS
$cshowList :: [GossipSeed] -> ShowS
show :: GossipSeed -> String
$cshow :: GossipSeed -> String
showsPrec :: Int -> GossipSeed -> ShowS
$cshowsPrec :: Int -> GossipSeed -> ShowS
Show
gossipSeed :: String -> Int -> GossipSeed
gossipSeed :: String -> Int -> GossipSeed
gossipSeed String
h Int
p = EndPoint -> String -> GossipSeed
GossipSeed (String -> Int -> EndPoint
EndPoint String
h Int
p) String
""
gossipSeedWithHeader :: String -> Int -> String -> GossipSeed
 String
h Int
p String
hd = EndPoint -> String -> GossipSeed
GossipSeed (String -> Int -> EndPoint
EndPoint String
h Int
p) String
hd
gossipSeedHost :: GossipSeed -> String
gossipSeedHost :: GossipSeed -> String
gossipSeedHost = EndPoint -> String
endPointIp forall b c a. (b -> c) -> (a -> b) -> a -> c
. GossipSeed -> EndPoint
gossipEndpoint
gossipSeedPort :: GossipSeed -> Int
gossipSeedPort :: GossipSeed -> Int
gossipSeedPort = EndPoint -> Int
endPointPort forall b c a. (b -> c) -> (a -> b) -> a -> c
. GossipSeed -> EndPoint
gossipEndpoint
emptyGossipSeed :: GossipSeed
emptyGossipSeed :: GossipSeed
emptyGossipSeed = EndPoint -> String -> GossipSeed
GossipSeed EndPoint
emptyEndPoint String
""
newtype Discovery =
    Discovery { Discovery -> Maybe EndPoint -> EventStore (Maybe EndPoint)
runDiscovery :: Maybe EndPoint -> EventStore (Maybe EndPoint) }
staticEndPointDiscovery :: String -> Int -> Discovery
staticEndPointDiscovery :: String -> Int -> Discovery
staticEndPointDiscovery String
host Int
port =
    (Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery
Discovery forall a b. (a -> b) -> a -> b
$ \Maybe EndPoint
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ String -> Int -> EndPoint
EndPoint String
host Int
port
simpleDnsEndPointDiscovery :: ByteString -> Maybe DnsServer -> Int -> Discovery
simpleDnsEndPointDiscovery :: ByteString -> Maybe DnsServer -> Int -> Discovery
simpleDnsEndPointDiscovery ByteString
domain Maybe DnsServer
srv Int
port = (Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery
Discovery forall a b. (a -> b) -> a -> b
$ \Maybe EndPoint
_ -> do
    let conf :: ResolvConf
conf =
            case Maybe DnsServer
srv of
                Maybe DnsServer
Nothing  -> ResolvConf
defaultResolvConf
                Just DnsServer
tpe ->
                    let rc :: FileOrNumericHost
rc =
                            case DnsServer
tpe of
                                DnsFilePath String
p   -> String -> FileOrNumericHost
RCFilePath String
p
                                DnsHostName String
h   -> String -> FileOrNumericHost
RCHostName String
h
                                DnsHostPort String
h Int
p -> String -> PortNumber -> FileOrNumericHost
RCHostPort String
h (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
p)
                    in ResolvConf
defaultResolvConf { resolvInfo :: FileOrNumericHost
resolvInfo = FileOrNumericHost
rc }
    ResolvSeed
dnsSeed <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ ResolvConf -> IO ResolvSeed
makeResolvSeed ResolvConf
conf
    Either DNSError [IPv4]
res     <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. ResolvSeed -> (Resolver -> IO a) -> IO a
withResolver ResolvSeed
dnsSeed forall a b. (a -> b) -> a -> b
$ \Resolver
resv -> Resolver -> ByteString -> IO (Either DNSError [IPv4])
lookupA Resolver
resv ByteString
domain
    case Either DNSError [IPv4]
res of
        Left DNSError
e    -> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO forall a b. (a -> b) -> a -> b
$ DNSError -> DnsDiscoveryException
DNSDiscoveryError DNSError
e
        Right [IPv4]
ips -> do
            let pts :: [EndPoint]
pts = [ String -> Int -> EndPoint
EndPoint (forall a. Show a => a -> String
show IPv4
ip) Int
port | IPv4
ip <- [IPv4]
ips ]
            case [EndPoint]
pts of
                []   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
                EndPoint
pt:[EndPoint]
_ -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just EndPoint
pt
data DnsServer
    = DnsFilePath String
    | DnsHostName String
    | DnsHostPort String Int
data ClusterSettings =
    ClusterSettings
    { ClusterSettings -> ByteString
clusterDns :: !ByteString
      
    , ClusterSettings -> Int
clusterMaxDiscoverAttempts :: !Int
      
    , ClusterSettings -> Int
clusterExternalGossipPort :: !Int
      
    , ClusterSettings -> Maybe (NonEmpty GossipSeed)
clusterGossipSeeds :: (Maybe (NonEmpty GossipSeed))
      
    , ClusterSettings -> TimeSpan
clusterGossipTimeout :: !TimeSpan
      
    , ClusterSettings -> Maybe DnsServer
clusterDnsServer :: !(Maybe DnsServer)
      
    }
gossipSeedClusterSettings :: NonEmpty GossipSeed -> ClusterSettings
gossipSeedClusterSettings :: NonEmpty GossipSeed -> ClusterSettings
gossipSeedClusterSettings NonEmpty GossipSeed
xs =
    ClusterSettings
    { clusterDns :: ByteString
clusterDns                 = ByteString
""
    , clusterMaxDiscoverAttempts :: Int
clusterMaxDiscoverAttempts = Int
10
    , clusterExternalGossipPort :: Int
clusterExternalGossipPort  = Int
0
    , clusterGossipSeeds :: Maybe (NonEmpty GossipSeed)
clusterGossipSeeds         = forall a. a -> Maybe a
Just NonEmpty GossipSeed
xs
    , clusterGossipTimeout :: TimeSpan
clusterGossipTimeout       = Double -> TimeSpan
fromSeconds Double
1
    , clusterDnsServer :: Maybe DnsServer
clusterDnsServer           = forall a. Maybe a
Nothing
    }
dnsClusterSettings :: ByteString -> ClusterSettings
dnsClusterSettings :: ByteString -> ClusterSettings
dnsClusterSettings ByteString
clusterDns = ClusterSettings{Int
ByteString
TimeSpan
forall a. Maybe a
clusterDnsServer :: forall a. Maybe a
clusterGossipTimeout :: TimeSpan
clusterGossipSeeds :: forall a. Maybe a
clusterExternalGossipPort :: Int
clusterMaxDiscoverAttempts :: Int
clusterDns :: ByteString
clusterDnsServer :: Maybe DnsServer
clusterGossipTimeout :: TimeSpan
clusterGossipSeeds :: Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: Int
clusterMaxDiscoverAttempts :: Int
clusterDns :: ByteString
..}
  where
    clusterMaxDiscoverAttempts :: Int
clusterMaxDiscoverAttempts = Int
10
    clusterExternalGossipPort :: Int
clusterExternalGossipPort  = Int
0
    clusterGossipSeeds :: Maybe a
clusterGossipSeeds         = forall a. Maybe a
Nothing
    clusterGossipTimeout :: TimeSpan
clusterGossipTimeout       = Double -> TimeSpan
fromSeconds Double
1
    clusterDnsServer :: Maybe a
clusterDnsServer           = forall a. Maybe a
Nothing
clusterDnsEndPointDiscovery :: ClusterSettings -> IO Discovery
clusterDnsEndPointDiscovery :: ClusterSettings -> IO Discovery
clusterDnsEndPointDiscovery ClusterSettings
settings = do
    IORef (Maybe [MemberInfo])
ref     <- forall (m :: * -> *) a. MonadBase IO m => a -> m (IORef a)
newIORef forall a. Maybe a
Nothing
    Manager
manager <- ManagerSettings -> IO Manager
newManager ManagerSettings
defaultManagerSettings
    forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ (Maybe EndPoint -> EventStore (Maybe EndPoint)) -> Discovery
Discovery forall a b. (a -> b) -> a -> b
$ \Maybe EndPoint
fend -> Manager
-> IORef (Maybe [MemberInfo])
-> Maybe EndPoint
-> ClusterSettings
-> EventStore (Maybe EndPoint)
discoverEndPoint Manager
manager IORef (Maybe [MemberInfo])
ref Maybe EndPoint
fend ClusterSettings
settings
data VNodeState
    = Initializing
    | Unknown
    | PreReplica
    | CatchingUp
    | Clone
    | Slave
    | PreMaster
    | Master
    | Manager
    | ShuttingDown
    | Shutdown
    deriving (VNodeState -> VNodeState -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: VNodeState -> VNodeState -> Bool
$c/= :: VNodeState -> VNodeState -> Bool
== :: VNodeState -> VNodeState -> Bool
$c== :: VNodeState -> VNodeState -> Bool
Eq, Eq VNodeState
VNodeState -> VNodeState -> Bool
VNodeState -> VNodeState -> Ordering
VNodeState -> VNodeState -> VNodeState
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: VNodeState -> VNodeState -> VNodeState
$cmin :: VNodeState -> VNodeState -> VNodeState
max :: VNodeState -> VNodeState -> VNodeState
$cmax :: VNodeState -> VNodeState -> VNodeState
>= :: VNodeState -> VNodeState -> Bool
$c>= :: VNodeState -> VNodeState -> Bool
> :: VNodeState -> VNodeState -> Bool
$c> :: VNodeState -> VNodeState -> Bool
<= :: VNodeState -> VNodeState -> Bool
$c<= :: VNodeState -> VNodeState -> Bool
< :: VNodeState -> VNodeState -> Bool
$c< :: VNodeState -> VNodeState -> Bool
compare :: VNodeState -> VNodeState -> Ordering
$ccompare :: VNodeState -> VNodeState -> Ordering
Ord, forall x. Rep VNodeState x -> VNodeState
forall x. VNodeState -> Rep VNodeState x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep VNodeState x -> VNodeState
$cfrom :: forall x. VNodeState -> Rep VNodeState x
Generic, Int -> VNodeState -> ShowS
[VNodeState] -> ShowS
VNodeState -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [VNodeState] -> ShowS
$cshowList :: [VNodeState] -> ShowS
show :: VNodeState -> String
$cshow :: VNodeState -> String
showsPrec :: Int -> VNodeState -> ShowS
$cshowsPrec :: Int -> VNodeState -> ShowS
Show)
instance FromJSON VNodeState
newtype GUUID = GUUID UUID deriving Int -> GUUID -> ShowS
[GUUID] -> ShowS
GUUID -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [GUUID] -> ShowS
$cshowList :: [GUUID] -> ShowS
show :: GUUID -> String
$cshow :: GUUID -> String
showsPrec :: Int -> GUUID -> ShowS
$cshowsPrec :: Int -> GUUID -> ShowS
Show
instance FromJSON GUUID where
    parseJSON :: Value -> Parser GUUID
parseJSON (String Text
txt) =
        case Text -> Maybe UUID
fromText Text
txt of
            Just UUID
uuid -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ UUID -> GUUID
GUUID UUID
uuid
            Maybe UUID
_         -> forall (m :: * -> *) a. MonadFail m => String -> m a
fail forall a b. (a -> b) -> a -> b
$ String
"Wrong UUID format " forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Text
txt
    parseJSON Value
invalid = forall a. String -> Value -> Parser a
typeMismatch String
"UUID" Value
invalid
data MemberInfo =
    MemberInfo
    { MemberInfo -> GUUID
_instanceId :: !GUUID
    , MemberInfo -> VNodeState
_state :: !VNodeState
    , MemberInfo -> Bool
_isAlive :: !Bool
    , MemberInfo -> String
_internalTcpIp :: !String
    , MemberInfo -> Int
_internalTcpPort :: !Int
    , MemberInfo -> String
_externalTcpIp :: !String
    , MemberInfo -> Int
_externalTcpPort :: !Int
    , MemberInfo -> String
_internalHttpIp :: !String
    , MemberInfo -> Int
_internalHttpPort :: !Int
    , MemberInfo -> String
_externalHttpIp :: !String
    , MemberInfo -> Int
_externalHttpPort :: !Int
    , MemberInfo -> Int64
_lastCommitPosition :: !Int64
    , MemberInfo -> Int64
_writerCheckpoint :: !Int64
    , MemberInfo -> Int64
_chaserCheckpoint :: !Int64
    , MemberInfo -> Int64
_epochPosition :: !Int64
    , MemberInfo -> Int
_epochNumber :: !Int
    , MemberInfo -> GUUID
_epochId :: !GUUID
    , MemberInfo -> Int
_nodePriority :: !Int
    } deriving Int -> MemberInfo -> ShowS
[MemberInfo] -> ShowS
MemberInfo -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [MemberInfo] -> ShowS
$cshowList :: [MemberInfo] -> ShowS
show :: MemberInfo -> String
$cshow :: MemberInfo -> String
showsPrec :: Int -> MemberInfo -> ShowS
$cshowsPrec :: Int -> MemberInfo -> ShowS
Show
instance FromJSON MemberInfo where
    parseJSON :: Value -> Parser MemberInfo
parseJSON (Object Object
m) =
        GUUID
-> VNodeState
-> Bool
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> String
-> Int
-> Int64
-> Int64
-> Int64
-> Int64
-> Int
-> GUUID
-> Int
-> MemberInfo
MemberInfo
        forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"instanceId"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"state"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"isAlive"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"internalTcpIp"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"internalTcpPort"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"externalTcpIp"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"externalTcpPort"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"internalHttpIp"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"internalHttpPort"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"externalHttpIp"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"externalHttpPort"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"lastCommitPosition"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"writerCheckpoint"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"chaserCheckpoint"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"epochPosition"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"epochNumber"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"epochId"
        forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Object
m forall a. FromJSON a => Object -> Key -> Parser a
.: Key
"nodePriority"
    parseJSON Value
invalid = forall a. String -> Value -> Parser a
typeMismatch String
"MemberInfo" Value
invalid
data ClusterInfo =
    ClusterInfo { ClusterInfo -> [MemberInfo]
members :: [MemberInfo] }
    deriving (Int -> ClusterInfo -> ShowS
[ClusterInfo] -> ShowS
ClusterInfo -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ClusterInfo] -> ShowS
$cshowList :: [ClusterInfo] -> ShowS
show :: ClusterInfo -> String
$cshow :: ClusterInfo -> String
showsPrec :: Int -> ClusterInfo -> ShowS
$cshowsPrec :: Int -> ClusterInfo -> ShowS
Show, forall x. Rep ClusterInfo x -> ClusterInfo
forall x. ClusterInfo -> Rep ClusterInfo x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ClusterInfo x -> ClusterInfo
$cfrom :: forall x. ClusterInfo -> Rep ClusterInfo x
Generic)
instance FromJSON ClusterInfo
discoverEndPoint :: Manager
                 -> IORef (Maybe [MemberInfo])
                 -> Maybe EndPoint
                 -> ClusterSettings
                 -> EventStore (Maybe EndPoint)
discoverEndPoint :: Manager
-> IORef (Maybe [MemberInfo])
-> Maybe EndPoint
-> ClusterSettings
-> EventStore (Maybe EndPoint)
discoverEndPoint Manager
mgr IORef (Maybe [MemberInfo])
ref Maybe EndPoint
fend ClusterSettings
settings = do
    Maybe [MemberInfo]
old_m <- forall (m :: * -> *) a. MonadBase IO m => IORef a -> m a
readIORef IORef (Maybe [MemberInfo])
ref
    forall (m :: * -> *) a. MonadBase IO m => IORef a -> a -> m ()
writeIORef IORef (Maybe [MemberInfo])
ref forall a. Maybe a
Nothing
    IOArray Int GossipSeed
candidates <- case Maybe [MemberInfo]
old_m of
        Maybe [MemberInfo]
Nothing  -> ClusterSettings -> EventStore (IOArray Int GossipSeed)
gossipCandidatesFromDns ClusterSettings
settings
        Just [MemberInfo]
old -> forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Maybe EndPoint -> [MemberInfo] -> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip Maybe EndPoint
fend [MemberInfo]
old
    forall a b.
IOArray Int a
-> (Int -> EventStore (Maybe b)) -> EventStore (Maybe b)
forArrayFirst IOArray Int GossipSeed
candidates forall a b. (a -> b) -> a -> b
$ \Int
idx -> do
        GossipSeed
c   <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> m e
readArray IOArray Int GossipSeed
candidates Int
idx
        Maybe ClusterInfo
res <- ClusterSettings
-> Manager -> GossipSeed -> EventStore (Maybe ClusterInfo)
tryGetGossipFrom ClusterSettings
settings Manager
mgr GossipSeed
c
        let fin_end :: Maybe (ClusterInfo, EndPoint)
fin_end = do
                ClusterInfo
info <- Maybe ClusterInfo
res
                EndPoint
best <- [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode forall a b. (a -> b) -> a -> b
$ ClusterInfo -> [MemberInfo]
members ClusterInfo
info
                forall (m :: * -> *) a. Monad m => a -> m a
return (ClusterInfo
info, EndPoint
best)
        case Maybe (ClusterInfo, EndPoint)
fin_end of
            Maybe (ClusterInfo, EndPoint)
Nothing   -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
            Just (ClusterInfo
info, EndPoint
best) -> do
                forall (m :: * -> *) a. MonadBase IO m => IORef a -> a -> m ()
writeIORef  IORef (Maybe [MemberInfo])
ref (forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ ClusterInfo -> [MemberInfo]
members ClusterInfo
info)
                forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. a -> Maybe a
Just EndPoint
best
tryGetGossipFrom :: ClusterSettings
                 -> Manager
                 -> GossipSeed
                 -> EventStore (Maybe ClusterInfo)
tryGetGossipFrom :: ClusterSettings
-> Manager -> GossipSeed -> EventStore (Maybe ClusterInfo)
tryGetGossipFrom ClusterSettings{Int
Maybe (NonEmpty GossipSeed)
Maybe DnsServer
ByteString
TimeSpan
clusterDnsServer :: Maybe DnsServer
clusterGossipTimeout :: TimeSpan
clusterGossipSeeds :: Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: Int
clusterMaxDiscoverAttempts :: Int
clusterDns :: ByteString
clusterDnsServer :: ClusterSettings -> Maybe DnsServer
clusterGossipTimeout :: ClusterSettings -> TimeSpan
clusterGossipSeeds :: ClusterSettings -> Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: ClusterSettings -> Int
clusterMaxDiscoverAttempts :: ClusterSettings -> Int
clusterDns :: ClusterSettings -> ByteString
..} Manager
mgr GossipSeed
seed = do
    Request
init_req <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ EndPoint -> String -> IO Request
httpRequest (GossipSeed -> EndPoint
gossipEndpoint GossipSeed
seed) String
"/gossip?format=json"
    let timeout :: Int
timeout = forall a b. (RealFrac a, Integral b) => a -> b
truncate (TimeSpan -> Double
totalMillis TimeSpan
clusterGossipTimeout forall a. Num a => a -> a -> a
* Double
1000)
        req :: Request
req     = Request
init_req { responseTimeout :: ResponseTimeout
responseTimeout = Int -> ResponseTimeout
responseTimeoutMicro Int
timeout }
    Either SomeException (Response ByteString)
eithResp <- forall (m :: * -> *) a.
MonadCatch m =>
m a -> m (Either SomeException a)
tryAny forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ Request -> Manager -> IO (Response ByteString)
httpLbs Request
req Manager
mgr
    case Either SomeException (Response ByteString)
eithResp of
        Right Response ByteString
resp -> forall (m :: * -> *) a. Monad m => a -> m a
return forall a b. (a -> b) -> a -> b
$ forall a. FromJSON a => ByteString -> Maybe a
decode forall a b. (a -> b) -> a -> b
$ forall body. Response body -> body
responseBody Response ByteString
resp
        Left SomeException
err   -> do
            $Int
String
LogLevel
String -> Text
String -> String -> String -> CharPos -> CharPos -> Loc
Text -> EventStore ()
forall a. a -> a
forall b c a. (b -> c) -> (a -> b) -> a -> c
forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
pack :: String -> Text
monadLoggerLog :: forall (m :: * -> *) msg.
(MonadLogger m, ToLogStr msg) =>
Loc -> Text -> LogLevel -> msg -> m ()
id :: forall a. a -> a
. :: forall b c a. (b -> c) -> (a -> b) -> a -> c
logInfo [i|Failed to get cluster info from [#{seed}], error: #{err}.|]
            forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Maybe a
Nothing
tryDetermineBestNode :: [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode :: [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode [MemberInfo]
members = Maybe EndPoint
node_m
  where
    nodes :: [MemberInfo]
nodes = [MemberInfo
m | MemberInfo
m <- [MemberInfo]
members
               , MemberInfo -> Bool
_isAlive MemberInfo
m
               , VNodeState -> Bool
allowedState forall a b. (a -> b) -> a -> b
$ MemberInfo -> VNodeState
_state MemberInfo
m
               ]
    node_m :: Maybe EndPoint
node_m =
        case forall o seq.
(Ord o, SemiSequence seq) =>
(Element seq -> o) -> seq -> seq
sortOn (forall a. a -> Down a
Down forall b c a. (b -> c) -> (a -> b) -> a -> c
. MemberInfo -> VNodeState
_state) [MemberInfo]
nodes of
            []  -> forall a. Maybe a
Nothing
            MemberInfo
n:[MemberInfo]
_ -> forall a. a -> Maybe a
Just forall a b. (a -> b) -> a -> b
$ String -> Int -> EndPoint
EndPoint (MemberInfo -> String
_externalTcpIp MemberInfo
n) (MemberInfo -> Int
_externalTcpPort MemberInfo
n)
    allowedState :: VNodeState -> Bool
allowedState VNodeState
Manager      = Bool
False
    allowedState VNodeState
ShuttingDown = Bool
False
    allowedState VNodeState
Shutdown     = Bool
False
    allowedState VNodeState
_            = Bool
True
gossipCandidatesFromOldGossip :: Maybe EndPoint
                              -> [MemberInfo]
                              -> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip :: Maybe EndPoint -> [MemberInfo] -> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip Maybe EndPoint
fend_m [MemberInfo]
oldGossip =
    [MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates [MemberInfo]
candidates
  where
    candidates :: [MemberInfo]
candidates =
        case Maybe EndPoint
fend_m of
            Maybe EndPoint
Nothing   -> [MemberInfo]
oldGossip
            Just EndPoint
fend -> [ MemberInfo
c | MemberInfo
c <- [MemberInfo]
oldGossip
                             , String -> Int -> EndPoint
EndPoint (MemberInfo -> String
_externalTcpIp MemberInfo
c) (MemberInfo -> Int
_externalTcpPort MemberInfo
c) forall a. Eq a => a -> a -> Bool
/= EndPoint
fend
                             ]
data AState = AState !Int !Int
arrangeGossipCandidates :: [MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates :: [MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates [MemberInfo]
members = do
    IOArray Int GossipSeed
arr <- forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
(i, i) -> e -> m (a i e)
newArray (Int
0, Int
len) GossipSeed
emptyGossipSeed
    AState Int
idx Int
j <- forall (t :: * -> *) (m :: * -> *) b a.
(Foldable t, Monad m) =>
(b -> a -> m b) -> b -> t a -> m b
foldM (IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
go IOArray Int GossipSeed
arr) (Int -> Int -> AState
AState (-Int
1) Int
len) [MemberInfo]
members
    forall a. IOArray Int a -> Int -> Int -> IO ()
shuffle IOArray Int GossipSeed
arr Int
0 Int
idx         
    forall a. IOArray Int a -> Int -> Int -> IO ()
shuffle IOArray Int GossipSeed
arr Int
j (Int
len forall a. Num a => a -> a -> a
- Int
1) 
    forall (m :: * -> *) a. Monad m => a -> m a
return IOArray Int GossipSeed
arr
  where
    len :: Int
len = forall mono. MonoFoldable mono => mono -> Int
length [MemberInfo]
members
    go :: IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
    go :: IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
go IOArray Int GossipSeed
arr (AState Int
idx Int
j) MemberInfo
m =
        case MemberInfo -> VNodeState
_state MemberInfo
m of
            VNodeState
Manager -> do
                let new_j :: Int
new_j = Int
j forall a. Num a => a -> a -> a
- Int
1
                forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> e -> m ()
writeArray IOArray Int GossipSeed
arr Int
new_j GossipSeed
seed
                forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> Int -> AState
AState Int
idx Int
new_j)
            VNodeState
_ -> do
                let new_i :: Int
new_i = Int
idx forall a. Num a => a -> a -> a
+ Int
1
                forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> e -> m ()
writeArray IOArray Int GossipSeed
arr Int
new_i GossipSeed
seed
                forall (m :: * -> *) a. Monad m => a -> m a
return (Int -> Int -> AState
AState Int
new_i Int
j)
      where
        end :: EndPoint
end  = String -> Int -> EndPoint
EndPoint (MemberInfo -> String
_externalHttpIp MemberInfo
m) (MemberInfo -> Int
_externalHttpPort MemberInfo
m)
        seed :: GossipSeed
seed = EndPoint -> String -> GossipSeed
GossipSeed EndPoint
end String
""
gossipCandidatesFromDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
gossipCandidatesFromDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
gossipCandidatesFromDns settings :: ClusterSettings
settings@ClusterSettings{Int
Maybe (NonEmpty GossipSeed)
Maybe DnsServer
ByteString
TimeSpan
clusterDnsServer :: Maybe DnsServer
clusterGossipTimeout :: TimeSpan
clusterGossipSeeds :: Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: Int
clusterMaxDiscoverAttempts :: Int
clusterDns :: ByteString
clusterDnsServer :: ClusterSettings -> Maybe DnsServer
clusterGossipTimeout :: ClusterSettings -> TimeSpan
clusterGossipSeeds :: ClusterSettings -> Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: ClusterSettings -> Int
clusterMaxDiscoverAttempts :: ClusterSettings -> Int
clusterDns :: ClusterSettings -> ByteString
..} = do
    IOArray Int GossipSeed
arr <- EventStore (IOArray Int GossipSeed)
endpoints
    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. IOArray Int a -> IO ()
shuffleAll IOArray Int GossipSeed
arr
    forall (m :: * -> *) a. Monad m => a -> m a
return IOArray Int GossipSeed
arr
  where
    endpoints :: EventStore (IOArray Int GossipSeed)
endpoints =
        case Maybe (NonEmpty GossipSeed)
clusterGossipSeeds of
            Maybe (NonEmpty GossipSeed)
Nothing -> ClusterSettings -> EventStore (IOArray Int GossipSeed)
resolveDns ClusterSettings
settings
            Just NonEmpty GossipSeed
ss -> let ls :: [GossipSeed]
ls  = forall (t :: * -> *) a. Foldable t => t a -> [a]
toList NonEmpty GossipSeed
ss
                           len :: Int
len = forall mono. MonoFoldable mono => mono -> Int
length [GossipSeed]
ls
                  in forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
(i, i) -> [e] -> m (a i e)
newListArray (Int
0, Int
len forall a. Num a => a -> a -> a
- Int
1) [GossipSeed]
ls
resolveDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
resolveDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
resolveDns ClusterSettings{Int
Maybe (NonEmpty GossipSeed)
Maybe DnsServer
ByteString
TimeSpan
clusterDnsServer :: Maybe DnsServer
clusterGossipTimeout :: TimeSpan
clusterGossipSeeds :: Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: Int
clusterMaxDiscoverAttempts :: Int
clusterDns :: ByteString
clusterDnsServer :: ClusterSettings -> Maybe DnsServer
clusterGossipTimeout :: ClusterSettings -> TimeSpan
clusterGossipSeeds :: ClusterSettings -> Maybe (NonEmpty GossipSeed)
clusterExternalGossipPort :: ClusterSettings -> Int
clusterMaxDiscoverAttempts :: ClusterSettings -> Int
clusterDns :: ClusterSettings -> ByteString
..} = do
    let timeoutMicros :: Double
timeoutMicros = TimeSpan -> Double
totalMillis TimeSpan
clusterGossipTimeout forall a. Num a => a -> a -> a
* Double
1000
        conf :: ResolvConf
conf =
            case Maybe DnsServer
clusterDnsServer of
                Maybe DnsServer
Nothing  -> ResolvConf
defaultResolvConf
                Just DnsServer
tpe ->
                    let rc :: FileOrNumericHost
rc =
                            case DnsServer
tpe of
                                DnsFilePath String
p   -> String -> FileOrNumericHost
RCFilePath String
p
                                DnsHostName String
h   -> String -> FileOrNumericHost
RCHostName String
h
                                DnsHostPort String
h Int
p -> String -> PortNumber -> FileOrNumericHost
RCHostPort String
h (forall a b. (Integral a, Num b) => a -> b
fromIntegral Int
p)
                    in ResolvConf
defaultResolvConf { resolvInfo :: FileOrNumericHost
resolvInfo = FileOrNumericHost
rc  }
    ResolvSeed
dnsSeed <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ ResolvConf -> IO ResolvSeed
makeResolvSeed ResolvConf
conf
               { resolvTimeout :: Int
resolvTimeout = forall a b. (RealFrac a, Integral b) => a -> b
truncate Double
timeoutMicros
               , resolvRetry :: Int
resolvRetry   = Int
clusterMaxDiscoverAttempts
               }
    forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall a. ResolvSeed -> (Resolver -> IO a) -> IO a
withResolver ResolvSeed
dnsSeed forall a b. (a -> b) -> a -> b
$ \Resolver
resv -> do
        Either DNSError [IPv4]
result <- Resolver -> ByteString -> IO (Either DNSError [IPv4])
lookupA Resolver
resv ByteString
clusterDns
        case Either DNSError [IPv4]
result of
            Left DNSError
e    -> forall (m :: * -> *) e a. (MonadThrow m, Exception e) => e -> m a
throwIO forall a b. (a -> b) -> a -> b
$ DNSError -> DnsDiscoveryException
DNSDiscoveryError DNSError
e
            Right [IPv4]
ips -> do
                let len :: Int
len = forall mono. MonoFoldable mono => mono -> Int
length [IPv4]
ips forall a. Num a => a -> a -> a
- Int
1
                IOArray Int GossipSeed
arr <- forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
(i, i) -> m (a i e)
newArray_ (Int
0, Int
len)
                forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
t a -> (a -> m b) -> m ()
forM_ (forall a b. [a] -> [b] -> [(a, b)]
zip [Int
0..] [IPv4]
ips) forall a b. (a -> b) -> a -> b
$ \(Int
idx, IPv4
ip) -> do
                    let end :: EndPoint
end  = String -> Int -> EndPoint
EndPoint (forall a. Show a => a -> String
show IPv4
ip) Int
clusterExternalGossipPort
                        seed :: GossipSeed
seed = EndPoint -> String -> GossipSeed
GossipSeed EndPoint
end String
""
                    forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> e -> m ()
writeArray IOArray Int GossipSeed
arr Int
idx GossipSeed
seed
                forall (m :: * -> *) a. Monad m => a -> m a
return IOArray Int GossipSeed
arr
shuffleAll :: IOArray Int a -> IO ()
shuffleAll :: forall a. IOArray Int a -> IO ()
shuffleAll IOArray Int a
arr = do
    (Int
low, Int
hig) <- forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> m (i, i)
getBounds IOArray Int a
arr
    forall a. IOArray Int a -> Int -> Int -> IO ()
shuffle IOArray Int a
arr Int
low Int
hig
shuffle :: IOArray Int a -> Int -> Int -> IO ()
shuffle :: forall a. IOArray Int a -> Int -> Int -> IO ()
shuffle IOArray Int a
arr Int
from Int
to = Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ Int
from Int
to forall a b. (a -> b) -> a -> b
$ \Int
cur -> do
    Int
idx   <- forall a (m :: * -> *). (Random a, MonadIO m) => (a, a) -> m a
randomRIO (Int
cur, Int
to)
    a
tmp   <- forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> m e
readArray IOArray Int a
arr Int
idx
    a
value <- forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> m e
readArray IOArray Int a
arr Int
cur
    forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> e -> m ()
writeArray IOArray Int a
arr Int
idx a
value
    forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> i -> e -> m ()
writeArray IOArray Int a
arr Int
cur a
tmp
forRange_ :: Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ :: Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ Int
from Int
to Int -> IO ()
k = do
    forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Int
from forall a. Ord a => a -> a -> Bool
<= Int
to) forall a b. (a -> b) -> a -> b
$ Int -> Int -> IO ()
loop (Int
to forall a. Num a => a -> a -> a
+ Int
1) Int
from
  where
    loop :: Int -> Int -> IO ()
loop Int
len Int
cur
        | Int
len forall a. Eq a => a -> a -> Bool
== Int
cur = forall (m :: * -> *) a. Monad m => a -> m a
return ()
        | Bool
otherwise  = do
              Int -> IO ()
k Int
cur
              Int -> Int -> IO ()
loop Int
len (Int
cur forall a. Num a => a -> a -> a
+ Int
1)
forArrayFirst :: IOArray Int a
              -> (Int -> EventStore (Maybe b))
              -> EventStore (Maybe b)
forArrayFirst :: forall a b.
IOArray Int a
-> (Int -> EventStore (Maybe b)) -> EventStore (Maybe b)
forArrayFirst IOArray Int a
arr Int -> EventStore (Maybe b)
k = do
    (Int
low, Int
hig) <- forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall a b. (a -> b) -> a -> b
$ forall (a :: * -> * -> *) e (m :: * -> *) i.
(MArray a e m, Ix i) =>
a i e -> m (i, i)
getBounds IOArray Int a
arr
    forall b.
Int -> Int -> (Int -> EventStore (Maybe b)) -> EventStore (Maybe b)
forRangeFirst Int
low Int
hig Int -> EventStore (Maybe b)
k
forRangeFirst :: Int
              -> Int
              -> (Int -> EventStore (Maybe b))
              -> EventStore (Maybe b)
forRangeFirst :: forall b.
Int -> Int -> (Int -> EventStore (Maybe b)) -> EventStore (Maybe b)
forRangeFirst Int
from Int
to Int -> EventStore (Maybe b)
k = do
    if Int
from forall a. Ord a => a -> a -> Bool
<= Int
to then Int -> Int -> EventStore (Maybe b)
loop (Int
to forall a. Num a => a -> a -> a
+ Int
1) Int
from else forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
  where
    loop :: Int -> Int -> EventStore (Maybe b)
loop Int
len Int
cur
        | Int
len forall a. Eq a => a -> a -> Bool
== Int
cur = forall (m :: * -> *) a. Monad m => a -> m a
return forall a. Maybe a
Nothing
        | Bool
otherwise  = do
              Maybe b
res <- Int -> EventStore (Maybe b)
k Int
cur
              if forall a. Maybe a -> Bool
isJust Maybe b
res then forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
res else Int -> Int -> EventStore (Maybe b)
loop Int
len (Int
cur forall a. Num a => a -> a -> a
+ Int
1)