{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE ExistentialQuantification #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE TypeFamilies #-} -------------------------------------------------------------------------------- -- | -- Module : Database.EventStore.Internal.Manager.Operation.Registry -- Copyright : (C) 2017 Yorick Laupa -- License : (see the file LICENSE) -- -- Maintainer : Yorick Laupa -- Stability : provisional -- Portability : non-portable -- -- Main operation bookkeeping structure. -------------------------------------------------------------------------------- module Database.EventStore.Internal.Manager.Operation.Registry ( Registry , OperationMaxAttemptReached(..) , Decision(..) , newRegistry , register , handlePackage , abortPendingRequests , checkAndRetry , startAwaitings ) where -------------------------------------------------------------------------------- import Data.ProtocolBuffers import Data.Serialize import Data.Time import Data.UUID import Data.UUID.V4 import Data.Void (absurd) -------------------------------------------------------------------------------- import Database.EventStore.Internal.Callback import Database.EventStore.Internal.Command import Database.EventStore.Internal.Control import Database.EventStore.Internal.Connection import Database.EventStore.Internal.EndPoint import Database.EventStore.Internal.Logger import Database.EventStore.Internal.Operation hiding (retry) import Database.EventStore.Internal.Prelude import Database.EventStore.Internal.Stopwatch import Database.EventStore.Internal.Types -------------------------------------------------------------------------------- data Request = Request { _requestCmd :: !Command , _requestPayload :: !ByteString , _requestCred :: !(Maybe Credentials) } -------------------------------------------------------------------------------- data Suspend a = Required !(Package -> Operation a) | Optional !(Maybe Package -> Operation a) | Resolved !(Operation a) -------------------------------------------------------------------------------- type SessionId = Integer type SessionMap = HashMap SessionId Session -------------------------------------------------------------------------------- data Session = forall result. Session { sessionId :: !SessionId , sessionOp :: !(Operation result) , sessionStack :: !(IORef (Suspend result)) , sessionCallback :: !(Callback result) } -------------------------------------------------------------------------------- rejectSession :: Exception e => Session -> e -> EventStore () rejectSession Session{..} = reject sessionCallback -------------------------------------------------------------------------------- resumeSession :: Registry -> Session -> Package -> EventStore () resumeSession reg session@Session{..} pkg = do atomicModifyIORef' sessionStack $ \case Required k -> (Resolved $ k pkg, ()) Optional k -> (Resolved $ k (Just pkg), ()) same -> (same, ()) execute reg session -------------------------------------------------------------------------------- resumeNoPkgSession :: Registry -> Session -> EventStore () resumeNoPkgSession reg session@Session{..} = do atomicModifyIORef' sessionStack $ \case Optional k -> (Resolved $ k Nothing, ()) same -> (same, ()) execute reg session -------------------------------------------------------------------------------- reinitSession :: Session -> EventStore () reinitSession Session{..} = atomicWriteIORef sessionStack (Resolved sessionOp) -------------------------------------------------------------------------------- restartSession :: Registry -> Session -> EventStore () restartSession reg session = do reinitSession session execute reg session -------------------------------------------------------------------------------- data Sessions = Sessions { sessionsNextId :: IORef SessionId , sessionsMap :: IORef SessionMap } -------------------------------------------------------------------------------- createSession :: Sessions -> Operation a -> Callback a -> EventStore Session createSession Sessions{..} op cb = do sid <- atomicModifyIORef' sessionsNextId $ \n -> (succ n, n) ref <- newIORef (Resolved op) atomicModifyIORef' sessionsMap $ \m -> let session = Session { sessionId = sid , sessionOp = op , sessionStack = ref , sessionCallback = cb } in (insertMap sid session m, session) -------------------------------------------------------------------------------- destroySession :: Sessions -> Session -> EventStore () destroySession Sessions{..} s = atomicModifyIORef' sessionsMap $ \m -> (deleteMap (sessionId s) m, ()) -------------------------------------------------------------------------------- sessionDisposed :: Sessions -> Session -> EventStore Bool sessionDisposed Sessions{..} s = notMember (sessionId s) <$> readIORef sessionsMap -------------------------------------------------------------------------------- newSessions :: IO Sessions newSessions = Sessions <$> newIORef 0  <*> newIORef mempty -------------------------------------------------------------------------------- packageOf :: Request -> UUID -> Package packageOf Request{..} uuid = Package { packageCmd = _requestCmd , packageCorrelation = uuid , packageData = _requestPayload , packageCred = _requestCred } -------------------------------------------------------------------------------- data Pending = Pending { _pendingRequest :: !(Maybe Request) , _pendingSession :: !Session , _pendingRetries :: !Int , _pendingLastTry :: !NominalDiffTime , _pendingConnId :: !UUID } -------------------------------------------------------------------------------- destroyPendingSession :: Sessions -> Pending -> EventStore () destroyPendingSession sessions Pending{..} = destroySession sessions _pendingSession -------------------------------------------------------------------------------- type PendingRequests = HashMap UUID Pending -------------------------------------------------------------------------------- data Awaiting = Awaiting !Session | AwaitingRequest !Session !Request -------------------------------------------------------------------------------- type Awaitings = [Awaiting] -------------------------------------------------------------------------------- rejectPending :: Exception e => Pending -> e -> EventStore () rejectPending Pending{..} = rejectSession _pendingSession -------------------------------------------------------------------------------- rejectAwaiting :: Exception e => Awaiting -> e -> EventStore () rejectAwaiting (Awaiting s) = rejectSession s rejectAwaiting (AwaitingRequest s _) = rejectSession s -------------------------------------------------------------------------------- applyResponse :: Registry -> Pending -> Package -> EventStore () applyResponse reg Pending{..} = resumeSession reg _pendingSession -------------------------------------------------------------------------------- restartPending :: Registry -> Pending -> EventStore () restartPending reg Pending{..} = restartSession reg _pendingSession -------------------------------------------------------------------------------- data Registry = Registry { _regConnRef :: ConnectionRef , _regPendings :: IORef PendingRequests , _regAwaitings :: IORef Awaitings , _stopwatch :: Stopwatch , _sessions :: Sessions } -------------------------------------------------------------------------------- newRegistry :: ConnectionRef -> IO Registry newRegistry ref = Registry ref <$> newIORef mempty <*> newIORef [] <*> newStopwatch <*> newSessions -------------------------------------------------------------------------------- scheduleAwait :: Registry -> Awaiting -> EventStore () scheduleAwait Registry{..} aw = atomicModifyIORef' _regAwaitings $ \stack -> (aw : stack, ()) -------------------------------------------------------------------------------- execute :: Registry -> Session -> EventStore () execute self@Registry{..} session@Session{..} = readIORef sessionStack >>= \case Resolved action -> loop action _ -> return () where loop stream = case stream of Return x -> absurd x Effect m -> case m of Failed e -> do destroySession _sessions session rejectSession session e Retry -> do reinitSession session execute self session Proceed inner -> loop inner Step step -> case step of Stop -> destroySession _sessions session Yield a next -> do fulfill sessionCallback a loop next Await k tpe _ -> case tpe of NeedUUID -> loop . k =<< freshUUID NeedRemote p -> do let req = Request { _requestCmd = payloadCmd p , _requestPayload = payloadData p , _requestCred = payloadCreds p } atomicWriteIORef sessionStack (Required k) maybeConnection _regConnRef >>= \case Nothing -> scheduleAwait self (AwaitingRequest session req) Just conn -> issueRequest self session conn req WaitRemote uuid -> do atomicWriteIORef sessionStack (Optional k) let mkNewPending conn = do let connId = connectionId conn pending <- createPending session _stopwatch connId Nothing insertPending self uuid pending traverse_ mkNewPending =<< maybeConnection _regConnRef -------------------------------------------------------------------------------- issueRequest :: Registry -> Session -> Connection -> Request -> EventStore () issueRequest reg@Registry{..} session conn req = do uuid <- liftBase nextRandom pending <- createPending session _stopwatch (connectionId conn) (Just req) insertPending reg uuid pending enqueuePackage conn (packageOf req uuid) -------------------------------------------------------------------------------- createPending :: MonadBaseControl IO m => Session -> Stopwatch -> UUID -> Maybe Request -> m Pending createPending session stopwatch connId mReq = do elapsed <- stopwatchElapsed stopwatch let pending = Pending { _pendingRequest = mReq , _pendingSession = session , _pendingRetries = 1 , _pendingLastTry = elapsed , _pendingConnId = connId } return pending -------------------------------------------------------------------------------- insertPending :: Registry -> UUID -> Pending -> EventStore () insertPending Registry{..} key pending = atomicModifyIORef' _regPendings $ \pendings -> (insertMap key pending pendings, ()) -------------------------------------------------------------------------------- register :: Registry -> Operation a -> Callback a -> EventStore () register reg op cb = do session <- createSession (_sessions reg) op cb execute reg session -------------------------------------------------------------------------------- abortPendingRequests :: Registry -> EventStore () abortPendingRequests Registry{..} = do m <- atomicModifyIORef' _regPendings $ \pendings -> (mempty, pendings) ws <- atomicModifyIORef' _regAwaitings $ \aw -> (mempty, aw) for_ m $ \p -> rejectPending p Aborted for_ ws $ \a -> rejectAwaiting a Aborted -------------------------------------------------------------------------------- data Decision = Handled | Reconnect NodeEndPoints -------------------------------------------------------------------------------- handlePackage :: Registry -> Package -> EventStore (Maybe Decision) handlePackage reg@Registry{..} pkg@Package{..} = do outcome <- atomicModifyIORef' _regPendings $ \pendings -> let uuid = packageCorrelation in (deleteMap uuid pendings, lookup uuid pendings) case outcome of Nothing -> return Nothing Just pending -> Just <$> executePending reg pkg pending -------------------------------------------------------------------------------- executePending :: Registry -> Package -> Pending -> EventStore Decision executePending reg@Registry{..} pkg@Package{..} p@Pending{..} = case packageCmd of cmd | cmd == badRequestCmd -> do let reason = packageDataAsText pkg rejectPending p (ServerError reason) return Handled | cmd == notAuthenticatedCmd -> do rejectPending p NotAuthenticatedOp return Handled | cmd == notHandledCmd -> do $(logDebug) [i|Not handled response received: #{pkg}.|] let Just msg = maybeDecodeMessage packageData reason = getField $ notHandledReason msg case reason of N_NotMaster -> do let Just details = getField $ notHandledAdditionalInfo msg info = masterInfo details node = masterInfoNodeEndPoints info restartPending reg p return $ Reconnect node -- In this case with just retry the operation. _ -> Handled <$ restartPending reg p | otherwise -> do applyResponse reg p pkg return Handled -------------------------------------------------------------------------------- -- | Occurs when an operation has been retried more than 's_operationRetry'. data OperationMaxAttemptReached = OperationMaxAttemptReached UUID Command deriving (Show, Typeable) -------------------------------------------------------------------------------- instance Exception OperationMaxAttemptReached -------------------------------------------------------------------------------- checkAndRetry :: Registry -> EventStore () checkAndRetry self@Registry{..} = do pendings <- readIORef _regPendings elapsed <- stopwatchElapsed _stopwatch newPendings <- foldM (checking elapsed) pendings (mapToList pendings) atomicWriteIORef _regPendings newPendings where checking elapsed reg (key, p) = do conn <- getConnection _regConnRef setts <- getSettings case _pendingRequest p of Nothing | connectionId conn == _pendingConnId p -> return reg | otherwise -> do resumeNoPkgSession self (_pendingSession p) disposed <- sessionDisposed _sessions (_pendingSession p) let newReg = if disposed then deleteMap key reg else reg return newReg Just req -> do let lastTry = _pendingLastTry p hasTimeout = elapsed - lastTry >= s_operationTimeout setts if hasTimeout || _pendingConnId p /= connectionId conn then do let retry = do uuid <- liftBase nextRandom let pkg = packageOf req uuid pending = p { _pendingLastTry = elapsed , _pendingRetries = _pendingRetries p + 1 , _pendingConnId = connectionId conn } nextReg = deleteMap key $ insertMap uuid pending reg enqueuePackage conn pkg return nextReg case s_operationRetry setts of AtMost maxAttempts | _pendingRetries p <= maxAttempts -> retry | otherwise -> do let cmd = _requestCmd req destroyPendingSession _sessions p rejectPending p (OperationMaxAttemptReached key cmd) return $ deleteMap key reg KeepRetrying -> retry else return reg -------------------------------------------------------------------------------- startAwaitings :: Registry -> EventStore () startAwaitings reg@Registry{..} = do awaitings <- atomicModifyIORef' _regAwaitings $ \stack -> ([], reverse stack) traverse_ starting awaitings where starting (Awaiting session) = execute reg session starting (AwaitingRequest session req) = do conn <- getConnection _regConnRef issueRequest reg session conn req -------------------------------------------------------------------------------- maybeDecodeMessage :: Decode a => ByteString -> Maybe a maybeDecodeMessage bytes = case runGet decodeMessage bytes of Right a -> Just a _ -> Nothing