{-# LANGUAGE DeriveDataTypeable         #-}
{-# LANGUAGE DeriveGeneric              #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE OverloadedStrings          #-}
{-# LANGUAGE RecordWildCards            #-}
module Database.EventStore.Internal.Discovery
    ( Discovery(..)
    , GossipSeed
    , DnsDiscoveryException(..)
    , ClusterSettings(..)
    , DnsServer(..)
    , EndPoint(..)
    , staticEndPointDiscovery
    , clusterDnsEndPointDiscovery
    , gossipSeedClusterSettings
    , simpleDnsEndPointDiscovery
    , dnsClusterSettings
    , gossipSeed
    , gossipSeedWithHeader
    , gossipSeedHeader
    , gossipSeedHost
    , gossipSeedPort
    ) where
import Prelude (String)
import Data.Maybe
import Control.Exception.Safe (tryAny)
import Data.Aeson
import Data.Aeson.Types
import Data.Array.IO
import Data.DotNet.TimeSpan
import Data.List.NonEmpty (NonEmpty)
import Data.UUID
import Network.HTTP.Client
import Network.DNS hiding (decode)
import System.Random
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Prelude
data DnsDiscoveryException
    = MaxDiscoveryAttemptReached ByteString
    | DNSDiscoveryError DNSError
    deriving (Show, Typeable)
instance Exception DnsDiscoveryException
httpRequest :: EndPoint -> String -> IO Request
httpRequest (EndPoint ip p) path = parseUrlThrow url
  where
    url = "http://" <> ip <> ":" <> show p <> path
data GossipSeed =
    GossipSeed
    { gossipEndpoint :: !EndPoint
      
      
      
      
    , gossipSeedHeader :: !String
      
    } deriving Show
gossipSeed :: String -> Int -> GossipSeed
gossipSeed h p = GossipSeed (EndPoint h p) ""
gossipSeedWithHeader :: String -> Int -> String -> GossipSeed
gossipSeedWithHeader h p hd = GossipSeed (EndPoint h p) hd
gossipSeedHost :: GossipSeed -> String
gossipSeedHost = endPointIp . gossipEndpoint
gossipSeedPort :: GossipSeed -> Int
gossipSeedPort = endPointPort . gossipEndpoint
emptyGossipSeed :: GossipSeed
emptyGossipSeed = GossipSeed emptyEndPoint ""
newtype Discovery =
    Discovery { runDiscovery :: Maybe EndPoint -> EventStore (Maybe EndPoint) }
staticEndPointDiscovery :: String -> Int -> Discovery
staticEndPointDiscovery host port =
    Discovery $ \_ -> return $ Just $ EndPoint host port
simpleDnsEndPointDiscovery :: ByteString -> Maybe DnsServer -> Int -> Discovery
simpleDnsEndPointDiscovery domain srv port = Discovery $ \_ -> do
    let conf =
            case srv of
                Nothing  -> defaultResolvConf
                Just tpe ->
                    let rc =
                            case tpe of
                                DnsFilePath p   -> RCFilePath p
                                DnsHostName h   -> RCHostName h
                                DnsHostPort h p -> RCHostPort h (fromIntegral p)
                    in defaultResolvConf { resolvInfo = rc }
    dnsSeed <- liftIO $ makeResolvSeed conf
    res     <- liftIO $ withResolver dnsSeed $ \resv -> lookupA resv domain
    case res of
        Left e    -> throwIO $ DNSDiscoveryError e
        Right ips -> do
            let pts = [ EndPoint (show ip) port | ip <- ips ]
            case pts of
                []   -> return Nothing
                pt:_ -> return $ Just pt
data DnsServer
    = DnsFilePath String
    | DnsHostName String
    | DnsHostPort String Int
data ClusterSettings =
    ClusterSettings
    { clusterDns :: !ByteString
      
    , clusterMaxDiscoverAttempts :: !Int
      
    , clusterExternalGossipPort :: !Int
      
    , clusterGossipSeeds :: (Maybe (NonEmpty GossipSeed))
      
    , clusterGossipTimeout :: !TimeSpan
      
    , clusterDnsServer :: !(Maybe DnsServer)
      
    }
gossipSeedClusterSettings :: NonEmpty GossipSeed -> ClusterSettings
gossipSeedClusterSettings xs =
    ClusterSettings
    { clusterDns                 = ""
    , clusterMaxDiscoverAttempts = 10
    , clusterExternalGossipPort  = 0
    , clusterGossipSeeds         = Just xs
    , clusterGossipTimeout       = fromSeconds 1
    , clusterDnsServer           = Nothing
    }
dnsClusterSettings :: ByteString -> ClusterSettings
dnsClusterSettings clusterDns = ClusterSettings{..}
  where
    clusterMaxDiscoverAttempts = 10
    clusterExternalGossipPort  = 0
    clusterGossipSeeds         = Nothing
    clusterGossipTimeout       = fromSeconds 1
    clusterDnsServer           = Nothing
clusterDnsEndPointDiscovery :: ClusterSettings -> IO Discovery
clusterDnsEndPointDiscovery settings = do
    ref     <- newIORef Nothing
    manager <- newManager defaultManagerSettings
    return $ Discovery $ \fend -> discoverEndPoint manager ref fend settings
data VNodeState
    = Initializing
    | Unknown
    | PreReplica
    | CatchingUp
    | Clone
    | Slave
    | PreMaster
    | Master
    | Manager
    | ShuttingDown
    | Shutdown
    deriving (Eq, Ord, Generic, Show)
instance FromJSON VNodeState
newtype GUUID = GUUID UUID deriving Show
instance FromJSON GUUID where
    parseJSON (String txt) =
        case fromText txt of
            Just uuid -> return $ GUUID uuid
            _         -> fail $ "Wrong UUID format " <> show txt
    parseJSON invalid = typeMismatch "UUID" invalid
data MemberInfo =
    MemberInfo
    { _instanceId :: !GUUID
    , _state :: !VNodeState
    , _isAlive :: !Bool
    , _internalTcpIp :: !String
    , _internalTcpPort :: !Int
    , _externalTcpIp :: !String
    , _externalTcpPort :: !Int
    , _internalHttpIp :: !String
    , _internalHttpPort :: !Int
    , _externalHttpIp :: !String
    , _externalHttpPort :: !Int
    , _lastCommitPosition :: !Int64
    , _writerCheckpoint :: !Int64
    , _chaserCheckpoint :: !Int64
    , _epochPosition :: !Int64
    , _epochNumber :: !Int
    , _epochId :: !GUUID
    , _nodePriority :: !Int
    } deriving Show
instance FromJSON MemberInfo where
    parseJSON (Object m) =
        MemberInfo
        <$> m .: "instanceId"
        <*> m .: "state"
        <*> m .: "isAlive"
        <*> m .: "internalTcpIp"
        <*> m .: "internalTcpPort"
        <*> m .: "externalTcpIp"
        <*> m .: "externalTcpPort"
        <*> m .: "internalHttpIp"
        <*> m .: "internalHttpPort"
        <*> m .: "externalHttpIp"
        <*> m .: "externalHttpPort"
        <*> m .: "lastCommitPosition"
        <*> m .: "writerCheckpoint"
        <*> m .: "chaserCheckpoint"
        <*> m .: "epochPosition"
        <*> m .: "epochNumber"
        <*> m .: "epochId"
        <*> m .: "nodePriority"
    parseJSON invalid = typeMismatch "MemberInfo" invalid
data ClusterInfo =
    ClusterInfo { members :: [MemberInfo] }
    deriving (Show, Generic)
instance FromJSON ClusterInfo
discoverEndPoint :: Manager
                 -> IORef (Maybe [MemberInfo])
                 -> Maybe EndPoint
                 -> ClusterSettings
                 -> EventStore (Maybe EndPoint)
discoverEndPoint mgr ref fend settings = do
    old_m <- readIORef ref
    writeIORef ref Nothing
    candidates <- case old_m of
        Nothing  -> gossipCandidatesFromDns settings
        Just old -> liftIO $ gossipCandidatesFromOldGossip fend old
    forArrayFirst candidates $ \idx -> do
        c   <- liftIO $ readArray candidates idx
        res <- tryGetGossipFrom settings mgr c
        let fin_end = do
                info <- res
                best <- tryDetermineBestNode $ members info
                return (info, best)
        case fin_end of
            Nothing   -> return Nothing
            Just (info, best) -> do
                writeIORef  ref (Just $ members info)
                return $ Just best
tryGetGossipFrom :: ClusterSettings
                 -> Manager
                 -> GossipSeed
                 -> EventStore (Maybe ClusterInfo)
tryGetGossipFrom ClusterSettings{..} mgr seed = do
    init_req <- liftIO $ httpRequest (gossipEndpoint seed) "/gossip?format=json"
    let timeout = truncate (totalMillis clusterGossipTimeout * 1000)
        req     = init_req { responseTimeout = responseTimeoutMicro timeout }
    eithResp <- tryAny $ liftIO $ httpLbs req mgr
    case eithResp of
        Right resp -> return $ decode $ responseBody resp
        Left err   -> do
            $logInfo [i|Failed to get cluster info from [#{seed}], error: #{err}.|]
            pure Nothing
tryDetermineBestNode :: [MemberInfo] -> Maybe EndPoint
tryDetermineBestNode members = node_m
  where
    nodes = [m | m <- members
               , _isAlive m
               , allowedState $ _state m
               ]
    node_m =
        case sortOn (Down . _state) nodes of
            []  -> Nothing
            n:_ -> Just $ EndPoint (_externalTcpIp n) (_externalTcpPort n)
    allowedState Manager      = False
    allowedState ShuttingDown = False
    allowedState Shutdown     = False
    allowedState _            = True
gossipCandidatesFromOldGossip :: Maybe EndPoint
                              -> [MemberInfo]
                              -> IO (IOArray Int GossipSeed)
gossipCandidatesFromOldGossip fend_m oldGossip =
    arrangeGossipCandidates candidates
  where
    candidates =
        case fend_m of
            Nothing   -> oldGossip
            Just fend -> [ c | c <- oldGossip
                             , EndPoint (_externalTcpIp c) (_externalTcpPort c) /= fend
                             ]
data AState = AState !Int !Int
arrangeGossipCandidates :: [MemberInfo] -> IO (IOArray Int GossipSeed)
arrangeGossipCandidates members = do
    arr <- newArray (0, len) emptyGossipSeed
    AState idx j <- foldM (go arr) (AState (-1) len) members
    shuffle arr 0 idx         
    shuffle arr j (len - 1) 
    return arr
  where
    len = length members
    go :: IOArray Int GossipSeed -> AState -> MemberInfo -> IO AState
    go arr (AState idx j) m =
        case _state m of
            Manager -> do
                let new_j = j - 1
                writeArray arr new_j seed
                return (AState idx new_j)
            _ -> do
                let new_i = idx + 1
                writeArray arr new_i seed
                return (AState new_i j)
      where
        end  = EndPoint (_externalHttpIp m) (_externalHttpPort m)
        seed = GossipSeed end ""
gossipCandidatesFromDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
gossipCandidatesFromDns settings@ClusterSettings{..} = do
    arr <- endpoints
    liftIO $ shuffleAll arr
    return arr
  where
    endpoints =
        case clusterGossipSeeds of
            Nothing -> resolveDns settings
            Just ss -> let ls  = toList ss
                           len = length ls
                  in liftIO $ newListArray (0, len - 1) ls
resolveDns :: ClusterSettings -> EventStore (IOArray Int GossipSeed)
resolveDns ClusterSettings{..} = do
    let timeoutMicros = totalMillis clusterGossipTimeout * 1000
        conf =
            case clusterDnsServer of
                Nothing  -> defaultResolvConf
                Just tpe ->
                    let rc =
                            case tpe of
                                DnsFilePath p   -> RCFilePath p
                                DnsHostName h   -> RCHostName h
                                DnsHostPort h p -> RCHostPort h (fromIntegral p)
                    in defaultResolvConf { resolvInfo = rc  }
    dnsSeed <- liftIO $ makeResolvSeed conf
               { resolvTimeout = truncate timeoutMicros
               , resolvRetry   = clusterMaxDiscoverAttempts
               }
    liftIO $ withResolver dnsSeed $ \resv -> do
        result <- lookupA resv clusterDns
        case result of
            Left e    -> throwIO $ DNSDiscoveryError e
            Right ips -> do
                let len = length ips - 1
                arr <- newArray_ (0, len)
                forM_ (zip [0..] ips) $ \(idx, ip) -> do
                    let end  = EndPoint (show ip) clusterExternalGossipPort
                        seed = GossipSeed end ""
                    writeArray arr idx seed
                return arr
shuffleAll :: IOArray Int a -> IO ()
shuffleAll arr = do
    (low, hig) <- getBounds arr
    shuffle arr low hig
shuffle :: IOArray Int a -> Int -> Int -> IO ()
shuffle arr from to = forRange_ from to $ \cur -> do
    idx   <- randomRIO (cur, to)
    tmp   <- readArray arr idx
    value <- readArray arr cur
    writeArray arr idx value
    writeArray arr cur tmp
forRange_ :: Int -> Int -> (Int -> IO ()) -> IO ()
forRange_ from to k = do
    when (from <= to) $ loop (to + 1) from
  where
    loop len cur
        | len == cur = return ()
        | otherwise  = do
              k cur
              loop len (cur + 1)
forArrayFirst :: IOArray Int a
              -> (Int -> EventStore (Maybe b))
              -> EventStore (Maybe b)
forArrayFirst arr k = do
    (low, hig) <- liftIO $ getBounds arr
    forRangeFirst low hig k
forRangeFirst :: Int
              -> Int
              -> (Int -> EventStore (Maybe b))
              -> EventStore (Maybe b)
forRangeFirst from to k = do
    if from <= to then loop (to + 1) from else return Nothing
  where
    loop len cur
        | len == cur = return Nothing
        | otherwise  = do
              res <- k cur
              if isJust res then return res else loop len (cur + 1)