{-# LANGUAGE DeriveDataTypeable        #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE RecordWildCards           #-}
{-# LANGUAGE TypeFamilies              #-}
--------------------------------------------------------------------------------
-- |
-- Module : Database.EventStore.Internal.Manager.Operation.Registry
-- Copyright : (C) 2017 Yorick Laupa
-- License : (see the file LICENSE)
--
-- Maintainer : Yorick Laupa <yo.eight@gmail.com>
-- Stability : provisional
-- Portability : non-portable
--
-- Main operation bookkeeping structure.
--------------------------------------------------------------------------------
module Database.EventStore.Internal.Manager.Operation.Registry
    ( Registry
    , OperationMaxAttemptReached(..)
    , Decision(..)
    , newRegistry
    , register
    , handlePackage
    , abortPendingRequests
    , checkAndRetry
    , startAwaitings
    ) where

--------------------------------------------------------------------------------
import Data.ProtocolBuffers
import Data.Serialize
import Data.Time
import Data.UUID
import Data.UUID.V4

--------------------------------------------------------------------------------
import Database.EventStore.Internal.Callback
import Database.EventStore.Internal.Command
import Database.EventStore.Internal.Control
import Database.EventStore.Internal.Connection
import Database.EventStore.Internal.EndPoint
import Database.EventStore.Internal.Logger
import Database.EventStore.Internal.Operation hiding (retry)
import Database.EventStore.Internal.Prelude
import Database.EventStore.Internal.Stopwatch
import Database.EventStore.Internal.Types

--------------------------------------------------------------------------------
data Request =
  Request { _requestCmd     :: !Command
          , _requestPayload :: !ByteString
          , _requestCred    :: !(Maybe Credentials)
          }

--------------------------------------------------------------------------------
data Suspend a
  = Required !(Package -> Operation a)
  | Optional !(Maybe Package -> Operation a)
  | Resolved !(Operation a)

--------------------------------------------------------------------------------
type SessionId  = Integer
type SessionMap = HashMap SessionId Session

--------------------------------------------------------------------------------
data Session =
  forall result.
  Session { sessionId       :: !SessionId
          , sessionOp       :: !(Operation result)
          , sessionStack    :: !(IORef (Suspend result))
          , sessionCallback :: !(Callback result)
          }

--------------------------------------------------------------------------------
rejectSession :: Exception e => Session -> e -> EventStore ()
rejectSession Session{..} = reject sessionCallback

--------------------------------------------------------------------------------
resumeSession :: Registry -> Session -> Package -> EventStore ()
resumeSession reg session@Session{..} pkg = do
  atomicModifyIORef' sessionStack $ \case
      Required k -> (Resolved $ k pkg, ())
      Optional k -> (Resolved $ k (Just pkg), ())
      same       -> (same, ())

  execute reg session

--------------------------------------------------------------------------------
resumeNoPkgSession :: Registry -> Session -> EventStore ()
resumeNoPkgSession reg session@Session{..} = do
  atomicModifyIORef' sessionStack $ \case
    Optional k -> (Resolved $ k Nothing, ())
    same       -> (same, ())

  execute reg session

--------------------------------------------------------------------------------
reinitSession :: Session -> EventStore ()
reinitSession Session{..} = atomicWriteIORef sessionStack (Resolved sessionOp)

--------------------------------------------------------------------------------
restartSession :: Registry -> Session -> EventStore ()
restartSession reg session = do
  reinitSession session
  execute reg session

--------------------------------------------------------------------------------
data Sessions =
  Sessions { sessionsNextId :: IORef SessionId
           , sessionsMap    :: IORef SessionMap
           }

--------------------------------------------------------------------------------
createSession :: Sessions -> Operation a -> Callback a -> EventStore Session
createSession Sessions{..} op cb = do
  sid <- atomicModifyIORef' sessionsNextId $ \n -> (succ n, n)
  ref <- newIORef (Resolved op)
  atomicModifyIORef' sessionsMap $ \m ->
    let session =
          Session { sessionId       = sid
                  , sessionOp       = op
                  , sessionStack    = ref
                  , sessionCallback = cb
                  } in
    (insertMap sid session m, session)

--------------------------------------------------------------------------------
destroySession :: Sessions -> Session -> EventStore ()
destroySession Sessions{..} s =
  atomicModifyIORef' sessionsMap $ \m -> (deleteMap (sessionId s) m, ())

--------------------------------------------------------------------------------
sessionDisposed :: Sessions -> Session -> EventStore Bool
sessionDisposed Sessions{..} s =
  notMember (sessionId s) <$> readIORef sessionsMap

--------------------------------------------------------------------------------
newSessions :: IO Sessions
newSessions =
  Sessions <$> newIORef 0
           <*> newIORef mempty

--------------------------------------------------------------------------------
packageOf :: Request -> UUID -> Package
packageOf Request{..} uuid =
  Package { packageCmd         = _requestCmd
          , packageCorrelation = uuid
          , packageData        = _requestPayload
          , packageCred        = _requestCred
          }

--------------------------------------------------------------------------------
data Pending =
    Pending { _pendingRequest :: !(Maybe Request)
            , _pendingSession :: !Session
            , _pendingRetries :: !Int
            , _pendingLastTry :: !NominalDiffTime
            , _pendingConnId  :: !UUID
            }

--------------------------------------------------------------------------------
destroyPendingSession :: Sessions -> Pending -> EventStore ()
destroyPendingSession sessions Pending{..} =
  destroySession sessions _pendingSession

--------------------------------------------------------------------------------
type PendingRequests = HashMap UUID Pending

--------------------------------------------------------------------------------
data Awaiting
  = Awaiting !Session
  | AwaitingRequest !Session !Request

--------------------------------------------------------------------------------
type Awaitings = [Awaiting]

--------------------------------------------------------------------------------
rejectPending :: Exception e => Pending -> e -> EventStore ()
rejectPending Pending{..} = rejectSession _pendingSession

--------------------------------------------------------------------------------
rejectAwaiting :: Exception e => Awaiting -> e -> EventStore ()
rejectAwaiting (Awaiting s) = rejectSession s
rejectAwaiting (AwaitingRequest s _) = rejectSession s

--------------------------------------------------------------------------------
applyResponse :: Registry -> Pending -> Package -> EventStore ()
applyResponse reg Pending{..} = resumeSession reg _pendingSession

--------------------------------------------------------------------------------
restartPending :: Registry -> Pending -> EventStore ()
restartPending reg Pending{..} = restartSession reg _pendingSession

--------------------------------------------------------------------------------
data Registry =
    Registry  { _regConnRef   :: ConnectionRef
              , _regPendings  :: IORef PendingRequests
              , _regAwaitings :: IORef Awaitings
              , _stopwatch    :: Stopwatch
              , _sessions     :: Sessions
              }

--------------------------------------------------------------------------------
newRegistry :: ConnectionRef -> IO Registry
newRegistry ref =
   Registry ref <$> newIORef mempty
                <*> newIORef []
                <*> newStopwatch
                <*> newSessions

--------------------------------------------------------------------------------
scheduleAwait :: Registry -> Awaiting -> EventStore ()
scheduleAwait Registry{..} aw =
  atomicModifyIORef' _regAwaitings $ \stack ->
    (aw : stack, ())

--------------------------------------------------------------------------------
execute :: Registry -> Session -> EventStore ()
execute self@Registry{..} session@Session{..} =
  readIORef sessionStack >>= \case
    Resolved action -> loop action
    _               -> return ()
  where
    loop (MachineT m) =
      case m of
        Failed e -> do
          destroySession _sessions session
          rejectSession session e
        Retry -> do
          reinitSession session
          execute self session
        Proceed s ->
          case s of
            Stop  -> destroySession _sessions session
            Yield a next -> do
              fulfill sessionCallback a
              loop next
            Await k tpe _ ->
              case tpe of
                NeedUUID  -> loop . k =<< freshUUID
                NeedRemote cmd payload cred -> do
                  let req = Request { _requestCmd     = cmd
                                    , _requestPayload = payload
                                    , _requestCred    = cred
                                    }
                  atomicWriteIORef sessionStack (Required k)
                  maybeConnection _regConnRef >>= \case
                    Nothing   -> scheduleAwait self (AwaitingRequest session req)
                    Just conn -> issueRequest self session conn req

                WaitRemote uuid -> do
                  atomicWriteIORef sessionStack (Optional k)
                  let mkNewPending conn = do
                        let connId = connectionId conn
                        pending <- createPending session _stopwatch connId Nothing
                        insertPending self uuid pending

                  traverse_ mkNewPending =<< maybeConnection _regConnRef

--------------------------------------------------------------------------------
issueRequest :: Registry
             -> Session
             -> Connection
             -> Request
             -> EventStore ()
issueRequest reg@Registry{..} session conn req = do
  uuid    <- liftBase nextRandom
  pending <- createPending session _stopwatch (connectionId conn) (Just req)

  insertPending reg uuid pending
  enqueuePackage conn (packageOf req uuid)

--------------------------------------------------------------------------------
createPending :: MonadBaseControl IO m
              => Session
              -> Stopwatch
              -> UUID
              -> Maybe Request
              -> m Pending
createPending session stopwatch connId mReq = do
  elapsed <- stopwatchElapsed stopwatch
  let pending =
        Pending { _pendingRequest = mReq
                , _pendingSession = session
                , _pendingRetries = 1
                , _pendingLastTry = elapsed
                , _pendingConnId  = connId
                }

  return pending

--------------------------------------------------------------------------------
insertPending :: Registry -> UUID -> Pending -> EventStore ()
insertPending Registry{..} key pending =
  atomicModifyIORef' _regPendings $ \pendings ->
    (insertMap key pending pendings, ())

--------------------------------------------------------------------------------
register :: Registry -> Operation a -> Callback a -> EventStore ()
register reg op cb = do
  session <- createSession (_sessions reg) op cb
  execute reg session

--------------------------------------------------------------------------------
abortPendingRequests :: Registry -> EventStore ()
abortPendingRequests Registry{..} = do
  m <- atomicModifyIORef' _regPendings $ \pendings -> (mempty, pendings)
  ws <- atomicModifyIORef' _regAwaitings $ \aw -> (mempty, aw)

  for_ m $ \p -> rejectPending p Aborted
  for_ ws $ \a -> rejectAwaiting a Aborted

--------------------------------------------------------------------------------
data Decision
  = Handled
  | Reconnect NodeEndPoints

--------------------------------------------------------------------------------
handlePackage :: Registry -> Package -> EventStore (Maybe Decision)
handlePackage reg@Registry{..} pkg@Package{..} = do
  outcome <- atomicModifyIORef' _regPendings $ \pendings ->
    let uuid = packageCorrelation in
    (deleteMap uuid pendings, lookup uuid pendings)

  case outcome of
    Nothing      -> return Nothing
    Just pending -> Just <$> executePending reg pkg pending

--------------------------------------------------------------------------------
executePending :: Registry -> Package -> Pending -> EventStore Decision
executePending reg@Registry{..} pkg@Package{..} p@Pending{..} =
  case packageCmd of
    cmd | cmd == badRequestCmd -> do
            let reason = packageDataAsText pkg

            rejectPending p (ServerError reason)
            return Handled
        | cmd == notAuthenticatedCmd -> do
            rejectPending p NotAuthenticatedOp
            return Handled
        | cmd == notHandledCmd -> do
            $(logDebug) [i|Not handled response received: #{pkg}.|]
            let Just msg = maybeDecodeMessage packageData
                reason   = getField $ notHandledReason msg
            case reason of
              N_NotMaster -> do
                let Just details = getField $ notHandledAdditionalInfo msg
                    info         = masterInfo details
                    node         = masterInfoNodeEndPoints info

                restartPending reg p
                return $ Reconnect node
              -- In this case with just retry the operation.
              _ -> Handled <$ restartPending reg p
        | otherwise -> do
            applyResponse reg p pkg
            return Handled

--------------------------------------------------------------------------------
-- | Occurs when an operation has been retried more than 's_operationRetry'.
data OperationMaxAttemptReached =
  OperationMaxAttemptReached UUID Command
  deriving (Show, Typeable)

--------------------------------------------------------------------------------
instance Exception OperationMaxAttemptReached

--------------------------------------------------------------------------------
checkAndRetry :: Registry -> EventStore ()
checkAndRetry self@Registry{..} = do
  pendings    <- readIORef _regPendings
  elapsed     <- stopwatchElapsed _stopwatch
  newPendings <- foldM (checking elapsed) pendings (mapToList pendings)
  atomicWriteIORef _regPendings newPendings
  where
    checking elapsed reg (key, p) = do
      conn  <- getConnection _regConnRef
      setts <- getSettings
      case _pendingRequest p of
        Nothing
          | connectionId conn == _pendingConnId p -> return reg
          | otherwise -> do
            resumeNoPkgSession self (_pendingSession p)
            disposed <- sessionDisposed _sessions (_pendingSession p)
            let newReg = if disposed then deleteMap key reg else reg
            return newReg
        Just req -> do
          let lastTry    = _pendingLastTry p
              hasTimeout = elapsed - lastTry >= s_operationTimeout setts
          if hasTimeout || _pendingConnId p /= connectionId conn
            then do
              let retry = do
                    uuid <- liftBase nextRandom
                    let pkg     = packageOf req uuid
                        pending =
                          p { _pendingLastTry = elapsed
                            , _pendingRetries = _pendingRetries p + 1
                            , _pendingConnId  = connectionId conn
                            }
                        nextReg = deleteMap key $ insertMap uuid pending reg

                    enqueuePackage conn pkg
                    return nextReg

              case s_operationRetry setts of
                AtMost maxAttempts
                  | _pendingRetries p <= maxAttempts
                    -> retry
                  | otherwise -> do
                    let cmd = _requestCmd req
                    destroyPendingSession _sessions p
                    rejectPending p (OperationMaxAttemptReached key cmd)
                    return $ deleteMap key reg
                KeepRetrying -> retry
            else return reg

--------------------------------------------------------------------------------
startAwaitings :: Registry -> EventStore ()
startAwaitings reg@Registry{..} = do
  awaitings <- atomicModifyIORef' _regAwaitings $ \stack ->
    ([], reverse stack)

  traverse_ starting awaitings
  where
    starting (Awaiting session)            = execute reg session
    starting (AwaitingRequest session req) = do
      conn <- getConnection _regConnRef
      issueRequest reg session conn req

--------------------------------------------------------------------------------
maybeDecodeMessage :: Decode a => ByteString -> Maybe a
maybeDecodeMessage bytes =
    case runGet decodeMessage bytes of
        Right a -> Just a
        _       -> Nothing