{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE RecordWildCards            #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Discovery
-- Copyright : (C) 2016 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Discovery
    ( Discovery(..)
    , GossipSeed
    , DnsDiscoveryException(..)
    , ClusterSettings(..)
    , DnsServer(..)
    , EndPoint(..)
    , staticEndPointDiscovery
    , clusterDnsEndPointDiscovery
    , gossipSeedClusterSettings
    , simpleDnsEndPointDiscovery
    , dnsClusterSettings
    , gossipSeed
    , gossipSeedWithHeader
    , gossipSeedHeader
    , gossipSeedHost
    , gossipSeedPort
    ) where

--------------------------------------------------------------------------------
import Prelude (String, fail)
import Data.Maybe

--------------------------------------------------------------------------------
import Control.Exception.Safe (tryAny)
import Data.Aeson
import Data.Aeson.Types
import Data.Array.IO
import Data.DotNet.TimeSpan
import Data.List.NonEmpty (NonEmpty)
import Data.UUID
import Network.HTTP.Client
import Network.DNS hiding (decode)
import System.Random

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude

--------------------------------------------------------------------------------
-- | Errors raised while discovering an endpoint.
data DnsDiscoveryException
    = MaxDiscoveryAttemptReached ByteString
      -- ^ The maximum number of discovery attempts was reached. The
      --   'ByteString' payload is presumably the cluster DNS name (confirm
      --   at the throw site, which is outside this part of the file).
    | DNSDiscoveryError DNSError
      -- ^ A DNS lookup failed; wraps the underlying 'DNSError'.
    deriving (Show, Typeable)

--------------------------------------------------------------------------------
instance Exception DnsDiscoveryException

--------------------------------------------------------------------------------
-- | Builds an HTTP 'Request' for the given path on an 'EndPoint'.
--
--   The URL has the shape @http:\/\/\<ip\>:\<port\>\<path\>@ and is parsed with
--   'parseUrlThrow', so a malformed URL is reported as an exception in 'IO'.
httpRequest :: EndPoint -> String -> IO Request
httpRequest (EndPoint ip p) path = parseUrlThrow url
  where
    url :: String
    url = "http://" <> ip <> ":" <> show p <> path

--------------------------------------------------------------------------------
-- | Represents a source of cluster gossip.
data GossipSeed =
    GossipSeed
    { gossipEndpoint :: !EndPoint
      -- ^ The endpoint for the external HTTP endpoint of the gossip seed. The
      --   HTTP endpoint is used rather than the TCP endpoint because it is
      --   required for the client to exchange gossip with the server. The
      --   standard port which should be used here is 2113.
    , gossipSeedHeader :: !String
      -- ^ The host header to be sent when requesting gossip.
    } deriving Show

--------------------------------------------------------------------------------
-- | Creates a 'GossipSeed' from a host and a port, with an empty host header.
gossipSeed :: String -> Int -> GossipSeed
gossipSeed h p = GossipSeed (EndPoint h p) ""

--------------------------------------------------------------------------------
-- | Creates a 'GossipSeed' from a host, a port and a specific HTTP host
--   header.
gossipSeedWithHeader :: String -> Int -> String -> GossipSeed
gossipSeedWithHeader h p hd = GossipSeed (EndPoint h p) hd

--------------------------------------------------------------------------------
-- | Returns the host IP address of a 'GossipSeed'.
gossipSeedHost :: GossipSeed -> String
gossipSeedHost = endPointIp . gossipEndpoint

--------------------------------------------------------------------------------
-- | Returns the port of a 'GossipSeed'.
gossipSeedPort :: GossipSeed -> Int
gossipSeedPort = endPointPort . gossipEndpoint

--------------------------------------------------------------------------------
-- | A 'GossipSeed' built from 'emptyEndPoint' and an empty host header.
emptyGossipSeed :: GossipSeed
emptyGossipSeed = GossipSeed emptyEndPoint ""

--------------------------------------------------------------------------------
-- | Procedure used to discover a network 'EndPoint'.
--
--   The procedure receives an optional 'EndPoint' (presumably the endpoint
--   that previously failed, if any; confirm against callers) and yields the
--   discovered 'EndPoint', or 'Nothing' when none could be found.
newtype Discovery =
    Discovery { runDiscovery :: Maybe EndPoint -> EventStore (Maybe EndPoint) }

--------------------------------------------------------------------------------
-- | A 'Discovery' that ignores its argument and always yields the 'EndPoint'
--   made of the given host and port.
staticEndPointDiscovery :: String -> Int -> Discovery
staticEndPointDiscovery host port =
    Discovery $ \_ -> return $ Just $ EndPoint host port

--------------------------------------------------------------------------------
-- | A 'Discovery' that resolves a domain name through a DNS A-record lookup.
--
--   * The optional 'DnsServer' selects how the resolver is configured; when
--     it is 'Nothing', 'defaultResolvConf' is used unchanged.
--   * The first resolved address is paired with the given port. 'Nothing' is
--     returned when the lookup succeeds but yields no address.
--   * A failed lookup is thrown as 'DNSDiscoveryError'.
--
--   The argument handed to the discovery procedure is ignored.
simpleDnsEndPointDiscovery :: ByteString -> Maybe DnsServer -> Int -> Discovery
simpleDnsEndPointDiscovery domain srv port = Discovery $ \_ -> do
    dnsSeed <- liftIO $ makeResolvSeed conf
    res     <- liftIO $ withResolver dnsSeed $ \resv -> lookupA resv domain
    case res of
        Left e    -> throwIO $ DNSDiscoveryError e
        Right ips ->
            -- Only the first resolved address is used.
            return $ listToMaybe [ EndPoint (show ip) port | ip <- ips ]
  where
    -- Resolver configuration derived from the optional 'DnsServer'.
    conf =
        case srv of
            Nothing  -> defaultResolvConf
            Just tpe -> defaultResolvConf { resolvInfo = resolvInfoOf tpe }

    resolvInfoOf tpe =
        case tpe of
            DnsFilePath p   -> RCFilePath p
            DnsHostName h   -> RCHostName h
            DnsHostPort h p -> RCHostPort h (fromIntegral p)

--------------------------------------------------------------------------------
-- | Tells how the DNS server should be contacted.
data DnsServer
    = DnsFilePath String
      -- ^ Path handed to the resolver as @RCFilePath@ by
      --   'simpleDnsEndPointDiscovery'.
    | DnsHostName String
      -- ^ Host handed to the resolver as @RCHostName@ by
      --   'simpleDnsEndPointDiscovery'.
    | DnsHostPort String Int
      -- ^ Host and port handed to the resolver as @RCHostPort@ by
      --   'simpleDnsEndPointDiscovery' (the port is converted with
      --   'fromIntegral').

--------------------------------------------------------------------------------
-- | Contains settings related to a connection to a cluster.
data ClusterSettings =
    ClusterSettings
    { clusterDns :: !ByteString
      -- ^ The DNS name to use for discovering endpoints.
    , clusterMaxDiscoverAttempts :: !Int
      -- ^ The maximum number of attempts for discovering endpoints.
    , clusterExternalGossipPort :: !Int
      -- ^ The well-known endpoint on which cluster managers are running.
    , clusterGossipSeeds :: Maybe (NonEmpty GossipSeed)
      -- ^ Endpoints for seeding gossip if not using DNS.
    , clusterGossipTimeout :: !TimeSpan
      -- ^ Timeout for cluster gossip.
    , clusterDnsServer :: !(Maybe DnsServer)
      -- ^ Indicates a specific DNS server should be contacted.
    }

--------------------------------------------------------------------------------
-- | Configures a 'ClusterSettings' for connecting to a cluster using gossip
--   seeds. 'clusterGossipSeeds' is set to the given seeds; the other fields
--   take the following values:
--
-- @
--   clusterDns                 = ""
--   clusterMaxDiscoverAttempts = 10
--   clusterExternalGossipPort  = 0
--   clusterGossipTimeout       = 1s
--   clusterDnsServer           = Nothing
-- @
gossipSeedClusterSettings :: NonEmpty GossipSeed -> ClusterSettings
gossipSeedClusterSettings xs =
    ClusterSettings
    { clusterDns                 = ""
    , clusterMaxDiscoverAttempts = 10
    , clusterExternalGossipPort  = 0
    , clusterGossipSeeds         = Just xs
    , clusterGossipTimeout       = fromSeconds 1
    , clusterDnsServer           = Nothing
    }

--------------------------------------------------------------------------------
-- | Configures a 'ClusterSettings' for connecting to a cluster using DNS
--   discovery. 'clusterDns' is set to the given name; the other fields take
--   the following values:
--
-- @
--   clusterMaxDiscoverAttempts = 10
--   clusterExternalGossipPort  = 0
--   clusterGossipSeeds         = Nothing
--   clusterGossipTimeout       = 1s
--   clusterDnsServer           = Nothing
-- @
dnsClusterSettings :: ByteString -> ClusterSettings
dnsClusterSettings dns =
    -- Explicit record construction, mirroring 'gossipSeedClusterSettings'.
    ClusterSettings
    { clusterDns                 = dns
    , clusterMaxDiscoverAttempts = 10
    , clusterExternalGossipPort  = 0
    , clusterGossipSeeds         = Nothing
    , clusterGossipTimeout       = fromSeconds 1
    , clusterDnsServer           = Nothing
    }

--------------------------------------------------------------------------------
-- | Creates a 'Discovery' that delegates to @discoverEndPoint@ with the given
--   'ClusterSettings'.
--
--   An 'IORef' initialised to 'Nothing' and an HTTP 'Manager' built from
--   'defaultManagerSettings' are allocated once here and shared by every run
--   of the returned discovery procedure.
clusterDnsEndPointDiscovery :: ClusterSettings -> IO Discovery
clusterDnsEndPointDiscovery settings = do
    ref     <- newIORef Nothing
    manager <- newManager defaultManagerSettings
    return $ Discovery $ \fend -> discoverEndPoint manager ref fend settings

--------------------------------------------------------------------------------
-- | State of a cluster node, as carried by the @state@ field of 'MemberInfo'.
--
--   The derived 'Ord' instance follows the declaration order below, so do not
--   reorder constructors without checking the comparisons that rely on it.
data VNodeState
    = Initializing
    | Unknown
    | PreReplica
    | CatchingUp
    | Clone
    | Slave
    | PreMaster
    | Master
    | Manager
    | ShuttingDown
    | Shutdown
    deriving (Eq, Ord, Generic, Show)

--------------------------------------------------------------------------------
-- Decoding relies on the 'Generic'-based default of 'FromJSON'.
instance FromJSON VNodeState

--------------------------------------------------------------------------------
-- | Wrapper around 'UUID' that carries this module's own 'FromJSON' instance.
newtype GUUID = GUUID UUID deriving Show

--------------------------------------------------------------------------------
-- | Parses a 'GUUID' from a JSON string using 'fromText'. A string that is
--   not a valid UUID fails with a @Wrong UUID format@ message; any other JSON
--   value is reported through 'typeMismatch'.
instance FromJSON GUUID where
    parseJSON (String txt) =
        case fromText txt of
            Just uuid -> return $ GUUID uuid
            _         -> fail $ "Wrong UUID format " <> show txt
    parseJSON invalid = typeMismatch "UUID" invalid

--------------------------------------------------------------------------------
-- | Description of a single cluster member, decoded from JSON (see the
--   'FromJSON' instance). Each field is read from the JSON key of the same
--   name without the leading underscore.
data MemberInfo =
    MemberInfo
    { _instanceId :: !GUUID
    , _state :: !VNodeState
    , _isAlive :: !Bool
    , _internalTcpIp :: !String
    , _internalTcpPort :: !Int
    , _externalTcpIp :: !String
    , _externalTcpPort :: !Int
    , _internalHttpIp :: !String
    , _internalHttpPort :: !Int
    , _externalHttpIp :: !String
    , _externalHttpPort :: !Int
    , _lastCommitPosition :: !Int64
    , _writerCheckpoint :: !Int64
    , _chaserCheckpoint :: !Int64
    , _epochPosition :: !Int64
    , _epochNumber :: !Int
    , _epochId :: !GUUID
    , _nodePriority :: !Int
    } deriving Show

--------------------------------------------------------------------------------
-- | Parses a 'MemberInfo' from a JSON object. Every key below is mandatory
--   (read with '.:'), and the keys are consumed in the same order as the
--   fields of the 'MemberInfo' constructor. Any non-object value is reported
--   through 'typeMismatch'.
instance FromJSON MemberInfo where
    parseJSON (Object m) =
        MemberInfo
        <$> m .: "instanceId"
        <*> m .: "state"
        <*> m .: "isAlive"
        <*> m .: "internalTcpIp"
        <*> m .: "internalTcpPort"
        <*> m .: "externalTcpIp"
        <*> m .: "externalTcpPort"
        <*> m .: "internalHttpIp"
        <*> m .: "internalHttpPort"
        <*> m .: "externalHttpIp"
        <*> m .: "externalHttpPort"
        <*> m .: "lastCommitPosition"
        <*> m .: "writerCheckpoint"
        <*> m .: "chaserCheckpoint"
        <*> m .: "epochPosition"
        <*> m .: "epochNumber"
        <*> m .: "epochId"
        <*> m .: "nodePriority"
    parseJSON invalid = typeMismatch "MemberInfo" invalid

--------------------------------------------------------------------------------
-- | Body of a gossip response: the list of cluster members reported by the
--   node that was queried.
data ClusterInfo = ClusterInfo
    { members :: [MemberInfo]
    } deriving (Show, Generic)

--------------------------------------------------------------------------------
-- | No methods are overridden: the class defaults are used, relying on the
--   'Generic' instance derived for 'ClusterInfo'.
instance FromJSON ClusterInfo

--------------------------------------------------------------------------------
-- | Looks for a node to connect to.
--
--   The member list cached in the 'IORef' is read and immediately cleared.
--   When a cached list exists, candidates are derived from it (minus the
--   given failed endpoint, if any); otherwise they come from the cluster
--   settings. Candidates are then probed in array order and the first one
--   whose gossip yields a usable node wins: its member list is cached back
--   into the 'IORef' and the chosen endpoint is returned. 'Nothing' means no
--   candidate produced a usable node.
discoverEndPoint :: Manager
                 -> IORef (Maybe [MemberInfo])
                 -> Maybe EndPoint
                 -> ClusterSettings
                 -> EventStore (Maybe EndPoint)
discoverEndPoint mgr ref fend settings = do
    previous <- readIORef ref
    writeIORef ref Nothing
    candidates <- maybe (gossipCandidatesFromDns settings)
                        (liftIO . gossipCandidatesFromOldGossip fend)
                        previous
    forArrayFirst candidates $ \idx -> do
        seed  <- liftIO (readArray candidates idx)
        reply <- tryGetGossipFrom settings mgr seed
        case reply >>= pickBest of
            Nothing -> return Nothing
            Just (known, best) -> do
                writeIORef ref (Just known)
                return (Just best)
  where
    -- Pairs the reported member list with the best node found in it, if any.
    pickBest info =
        let known = members info
        in fmap (\best -> (known, best)) (tryDetermineBestNode known)

--------------------------------------------------------------------------------
-- | Requests @/gossip?format=json@ from the given seed and decodes the
--   response body as 'ClusterInfo'.
--
--   The response timeout is taken from 'clusterGossipTimeout'. Any
--   synchronous exception raised by the HTTP call is logged at info level
--   and turned into 'Nothing'; a body that does not decode also yields
--   'Nothing'.
tryGetGossipFrom :: ClusterSettings
                 -> Manager
                 -> GossipSeed
                 -> EventStore (Maybe ClusterInfo)
tryGetGossipFrom ClusterSettings{..} mgr seed = do
    baseReq <- liftIO (httpRequest (gossipEndpoint seed) "/gossip?format=json")
    -- Milliseconds scaled by 1000, as expected by 'responseTimeoutMicro'.
    let micros  = truncate (totalMillis clusterGossipTimeout * 1000)
        request = baseReq { responseTimeout = responseTimeoutMicro micros }
    outcome <- tryAny (liftIO (httpLbs request mgr))
    case outcome of
        Left err -> do
            $logInfo [i|Failed to get cluster info from [#{seed}], error: #{err}.|]
            pure Nothing
        Right resp ->
            pure (decode (responseBody resp))

--------------------------------------------------------------------------------
-- | Selects the node to connect to among the given members.
--
--   Only members that are alive and whose state is not 'Manager',
--   'ShuttingDown' or 'Shutdown' are considered. Among those, the one with
--   the greatest state (per the 'Ord' instance of the state type) is chosen
--   and its external TCP address is returned. 'Nothing' when no member
--   qualifies.
tryDetermineBestNode :: [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode members =
    fmap tcpEndPoint (listToMaybe ranked)
  where
    ranked = sortOn (Down . _state) [ node | node <- members, eligible node ]

    eligible node = _isAlive node && usable (_state node)

    usable st =
        case st of
            Manager      -> False
            ShuttingDown -> False
            Shutdown     -> False
            _            -> True

    tcpEndPoint node = EndPoint (_externalTcpIp node) (_externalTcpPort node)

--------------------------------------------------------------------------------
-- | Builds the candidate array from a previously received member list.
--
--   When an endpoint is supplied, members whose external TCP address equals
--   it are left out; otherwise every member is kept. The remaining members
--   are handed to 'arrangeGossipCandidates'.
gossipCandidatesFromOldGossip :: Maybe EndPoint
                              -> [MemberInfo]
                              -> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip fend_m oldGossip =
    arrangeGossipCandidates (maybe oldGossip withoutEndPoint fend_m)
  where
    withoutEndPoint excluded =
        [ member | member <- oldGossip, tcpEndPoint member /= excluded ]

    tcpEndPoint member =
        EndPoint (_externalTcpIp member) (_externalTcpPort member)

--------------------------------------------------------------------------------
-- | Accumulator used while filling the candidate array in
--   'arrangeGossipCandidates'. First field: index of the last slot written
--   from the front (non-manager members). Second field: index of the last
--   slot written from the back (manager members).
data AState = AState !Int !Int

--------------------------------------------------------------------------------
-- | Turns a member list into an array of gossip seeds built from each
--   member's external HTTP address.
--
--   Non-manager members are written from index 0 upwards and manager members
--   from the last index downwards, so managers end up at the back of the
--   array. Each of the two groups is then shuffled independently.
--
--   The array holds exactly one slot per member. For an empty member list
--   the result is an empty array (bounds @(0, -1)@).
arrangeGossipCandidates :: [MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates members = do
    -- Array bounds are inclusive: @len@ members occupy indices 0 .. len - 1.
    -- Using @(0, len)@ here would leave a trailing slot holding
    -- 'emptyGossipSeed' that callers would then probe as a real candidate.
    arr <- newArray (0, len - 1) emptyGossipSeed
    AState idx j <- foldM (go arr) (AState (-1) len) members

    shuffle arr 0 idx       -- shuffle nodes
    shuffle arr j (len - 1) -- shuffle managers

    return arr
  where
    len = length members

    -- Writes one member's seed into the front or back region and returns the
    -- updated cursors.
    go :: IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
    go arr (AState idx j) m =
        case _state m of
            Manager -> do
                let new_j = j - 1
                writeArray arr new_j seed
                return (AState idx new_j)
            _ -> do
                let new_i = idx + 1
                writeArray arr new_i seed
                return (AState new_i j)
      where
        end  = EndPoint (_externalHttpIp m) (_externalHttpPort m)
        seed = GossipSeed end ""

--------------------------------------------------------------------------------
-- | Produces the initial, shuffled candidate array.
--
--   When 'clusterGossipSeeds' is set, those seeds are used as-is; otherwise
--   the candidates are obtained through 'resolveDns'. In both cases the whole
--   array is shuffled before being returned.
gossipCandidatesFromDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
gossipCandidatesFromDns settings@ClusterSettings{..} = do
    seeds <- maybe (resolveDns settings) fromConfigured clusterGossipSeeds
    liftIO (shuffleAll seeds)
    return seeds
  where
    fromConfigured :: NonEmpty GossipSeed -> EventStore (IOArray Int GossipSeed)
    fromConfigured configured =
        let entries = toList configured
        in liftIO (newListArray (0, length entries - 1) entries)

--------------------------------------------------------------------------------
-- | Resolves 'clusterDns' to its A records and returns one gossip seed per
--   address, each paired with 'clusterExternalGossipPort'.
--
--   The resolver uses 'clusterDnsServer' when provided (file path, host name
--   or host plus port) and the library default otherwise. Its timeout is
--   derived from 'clusterGossipTimeout' and its retry count from
--   'clusterMaxDiscoverAttempts'. A failed lookup is thrown as
--   'DNSDiscoveryError'.
resolveDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
resolveDns ClusterSettings{..} = do
    dnsSeed <- liftIO (makeResolvSeed resolvConf)
    liftIO $ withResolver dnsSeed $ \resolver -> do
        answer <- lookupA resolver clusterDns
        case answer of
            Left dnsErr -> throwIO (DNSDiscoveryError dnsErr)
            Right ips   ->
                newListArray (0, length ips - 1) [ seedFor ip | ip <- ips ]
  where
    -- Milliseconds scaled by 1000 (the original local was named
    -- @timeoutMicros@).
    resolvConf =
        baseConf
            { resolvTimeout = truncate (totalMillis clusterGossipTimeout * 1000)
            , resolvRetry   = clusterMaxDiscoverAttempts
            }

    baseConf =
        case clusterDnsServer of
            Nothing     -> defaultResolvConf
            Just server -> defaultResolvConf { resolvInfo = resolvTarget server }

    resolvTarget server =
        case server of
            DnsFilePath path      -> RCFilePath path
            DnsHostName host      -> RCHostName host
            DnsHostPort host port -> RCHostPort host (fromIntegral port)

    seedFor ip = GossipSeed (EndPoint (show ip) clusterExternalGossipPort) ""

--------------------------------------------------------------------------------
-- | Shuffles the array over its full index range, in place.
shuffleAll :: IOArray Int a -> IO ()
shuffleAll arr =
    getBounds arr >>= \(lowest, highest) -> shuffle arr lowest highest

--------------------------------------------------------------------------------
-- | Shuffles, in place, the slots between the two given indices (both
--   inclusive). Each position, taken in increasing order, is swapped with a
--   position drawn at random between itself and the upper index. Does
--   nothing when the lower index is greater than the upper one.
shuffle :: IOArray Int a -> Int -> Int -> IO ()
shuffle arr from to = forRange_ from to swapWithRandom
  where
    swapWithRandom :: Int -> IO ()
    swapWithRandom cur = do
        pick    <- randomRIO (cur, to)
        picked  <- readArray arr pick
        current <- readArray arr cur
        writeArray arr pick current
        writeArray arr cur picked

--------------------------------------------------------------------------------
-- | Runs the action once for every index from the first bound up to and
--   including the second, in increasing order. Nothing is run when the first
--   bound is greater than the second.
forRange_ :: Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ from to k = forM_ [from .. to] k

--------------------------------------------------------------------------------
-- | Runs the action on each index of the array, in increasing order, and
--   stops at the first 'Just' result, which is returned. Yields 'Nothing'
--   when every index produced 'Nothing' or the array has no index.
forArrayFirst :: IOArray Int a
              -> (Int -> EventStore (Maybe b))
              -> EventStore (Maybe b)
forArrayFirst arr k =
    liftIO (getBounds arr) >>= \(lowest, highest) ->
        forRangeFirst lowest highest k

--------------------------------------------------------------------------------
-- | Runs the action on every index from the first bound up to and including
--   the second, in increasing order, stopping at the first 'Just' result.
--   Returns that result, or 'Nothing' when no index produced one or when the
--   first bound is greater than the second.
forRangeFirst :: Int
              -> Int
              -> (Int -> EventStore (Maybe b))
              -> EventStore (Maybe b)
forRangeFirst from to k
    | from > to = return Nothing
    | otherwise = go from
  where
    go cur = do
        found <- k cur
        case found of
            -- Keep going only while there is an index left to try.
            Nothing | cur /= to -> go (cur + 1)
            _                   -> return found