{-|
Module      : Network.Nakadi.Internal.Committer.SmartBuffer
Description : Implementation of SmartBuffer based Cursor Committing Strategy
Copyright   : (c) Moritz Clasmeier 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX
-}

{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables   #-}

module Network.Nakadi.Internal.Committer.SmartBuffer where

import           Network.Nakadi.Internal.Prelude

import           Control.Concurrent.Async.Timer ( Timer )
import qualified Control.Concurrent.Async.Timer
                                               as Timer
import qualified Data.HashMap.Strict           as HashMap
import           Control.Concurrent.STM         ( retry )
import           Control.Lens
import           Data.Function                  ( (&) )
import           Network.Nakadi.Internal.Committer.Shared
import qualified Network.Nakadi.Internal.Lenses
                                               as L
import           Network.Nakadi.Internal.Types
import           UnliftIO.Async
import           UnliftIO.STM

-- | Implementation of the 'CommitSmartBuffer' strategy: We use an
-- async timer for committing cursors at a fixed interval (currently
-- 1000 ms), but if the number of uncommitted events on some partition
-- reaches the threshold computed by 'cursorBufferSize'
-- (@maxUncommittedEvents@ divided by 2 as a safety factor) before the
-- next scheduled commit, a commit is done right away and the timer is
-- reset.
--
-- If the computed threshold is not greater than 1, no buffering is
-- set up and the work is delegated to 'unbufferedCommitLoop'.
--
-- The 'StagedCursor's in the 'CommitSmartBuffer' case carry an 'Int',
-- which is used for counting the number of events processed on the
-- respective partition since the last commit.
committerSmartBuffer
  :: forall b m
   . (MonadNakadi b m, MonadUnliftIO m, MonadMask m)
  => SubscriptionEventStream
  -> TBQueue (Int, SubscriptionCursor)
  -> m ()
committerSmartBuffer eventStream queue = do
  nMaxEvents <- cursorBufferSize
  let millisDefault = 1000
      timerConf =
        Timer.defaultConf
          & Timer.setInitDelay (fromIntegral millisDefault)
          & Timer.setInterval (fromIntegral millisDefault)
  if nMaxEvents > 1
    then do
      cursorsMap <- newTVarIO HashMap.empty
      withAsync (cursorConsumer queue cursorsMap) $ \asyncCursorConsumer -> do
        -- Link the consumer so that a failure in it is not silently
        -- lost while the committer loop keeps running.
        link asyncCursorConsumer
        Timer.withAsyncTimer timerConf
          $ cursorCommitter eventStream cursorsMap nMaxEvents
    else unbufferedCommitLoop eventStream queue

-- | The 'cursorConsumer' drains the cursors queue forever and merges
-- each cursor into the provided cursors map, keyed by 'cursorKey'.
-- Event counts of cursors sharing a key are accumulated via
-- 'updateCursor'.
cursorConsumer
  :: (MonadIO m)
  => TBQueue (Int, SubscriptionCursor)
  -> TVar (StagedCursorsMap SubscriptionCursorWithCounter)
  -> m ()
cursorConsumer queue cursorsMap = forever . atomically $ do
  (nEvents, cursor) <- readTBQueue queue
  let key          = cursorKey cursor
      stagedCursor = SubscriptionCursorWithCounter cursor nEvents
  -- Strict modification: do not let a chain of unevaluated
  -- 'HashMap.insertWith' applications accumulate inside the TVar.
  modifyTVar' cursorsMap $ HashMap.insertWith updateCursor key stagedCursor

-- | Combining function for 'HashMap.insertWith': keeps the new entry
-- and adds the event count of the old entry to it.
updateCursor
  :: SubscriptionCursorWithCounter  -- ^ Newly staged cursor
  -> SubscriptionCursorWithCounter  -- ^ Entry already present in the map
  -> SubscriptionCursorWithCounter
updateCursor cursorNew cursorOld =
  cursorNew & L.nEvents %~ (+ (cursorOld ^. L.nEvents))

-- | Committer loop. Waits for whichever happens first: the timer
-- elapsing or 'maxEventsReached' returning. In both cases the cursors
-- map is handed to 'commitAllCursors'.
cursorCommitter
  :: (MonadNakadi b m, MonadUnliftIO m)
  => SubscriptionEventStream
  -> TVar (StagedCursorsMap SubscriptionCursorWithCounter)
  -> Int
  -> Timer
  -> m ()
cursorCommitter eventStream cursorsMap nMaxEvents timer =
  forever
    $   race (Timer.wait timer) (maxEventsReached cursorsMap nMaxEvents)
    >>= \case
          Left _ ->
            -- Timer has elapsed, simply commit all currently
            -- buffered cursors.
            commitAllCursors (view L.cursor) eventStream cursorsMap
          Right _ -> do
            -- Events processed since the last cursor commit have
            -- crossed the configured threshold for at least one
            -- partition. Reset the timer, since we are committing
            -- ahead of schedule, and commit the buffered cursors.
            Timer.reset timer
            commitAllCursors (view L.cursor) eventStream cursorsMap

-- | Blocks until at least one staged cursor has reached the threshold
-- of events processed on its partition since the last commit, then
-- returns. No cursors are returned; the caller decides what to
-- commit.
maxEventsReached
  :: MonadIO m
  => TVar (StagedCursorsMap SubscriptionCursorWithCounter)
  -> Int
  -> m ()
maxEventsReached stagedCursorsTv nMaxEvents = liftIO . atomically $ do
  stagedCursors <- readTVar stagedCursorsTv
  -- Only the existence of a cursor over the threshold matters, so
  -- stop at the first match. Otherwise 'retry' blocks the transaction
  -- until the cursors map changes.
  if any (shouldBeCommitted nMaxEvents) (HashMap.elems stagedCursors)
    then pure ()
    else retry

-- | Returns True if the provided staged cursor should be committed,
-- i.e. if its event counter has reached (is greater than or equal to)
-- @nMaxEvents@.
shouldBeCommitted :: Int -> SubscriptionCursorWithCounter -> Bool
shouldBeCommitted nMaxEvents stagedCursor =
  stagedCursor ^. L.nEvents >= nMaxEvents

-- | Computes the per-partition event threshold used for triggering an
-- early commit: the configured @maxUncommittedEvents@ multiplied by a
-- safety factor of 0.5 and rounded to an 'Int'. Note that 'round'
-- rounds halves to the nearest even integer (e.g. 2.5 becomes 2).
--
-- If @maxUncommittedEvents@ is not configured, 1 is returned.
cursorBufferSize :: MonadNakadi b m => m Int
cursorBufferSize = do
  conf <- nakadiAsk
  pure $ case conf ^. L.maxUncommittedEvents of
    Nothing -> 1
    Just n  -> n & fromIntegral & (* safetyFactor) & round
 where
  safetyFactor :: Double
  safetyFactor = 0.5