{-|
Module      : Network.Nakadi.Internal.Worker
Description : Implementation of Subscription Consumption Workers
Copyright   : (c) Moritz Clasmeier 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX

This internal module implements dispatching of batches to workers.
-}

{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies          #-}

module Network.Nakadi.Internal.Worker
  ( WorkerRegistry
  , spawnWorkers
  , workerDispatchSink
  , workersWait
  ) where

import           Network.Nakadi.Internal.Prelude

import           Conduit
import           Control.Lens
import qualified Control.Monad.Trans.Resource as Resource
import           Data.Aeson
import qualified Data.HashMap.Strict          as HashMap
import           Data.List.NonEmpty           ( NonEmpty(..) )
import qualified Data.List.NonEmpty           as NonEmpty
import qualified Data.Vector                  as Vector
import           Network.Nakadi.EventTypes.Partitions
import           Network.Nakadi.Subscriptions.Subscription

import           Network.Nakadi.Internal.Committer
import qualified Network.Nakadi.Internal.Lenses as L
import           Network.Nakadi.Internal.Types

import           UnliftIO.Async
import           UnliftIO.STM

-- | Spawn a single worker.
spawnWorker
  :: ( MonadNakadi b m
     , MonadResource m
     , MonadUnliftIO m
     , MonadMask m
     , MonadIO m
     , FromJSON a
     , batch ~ SubscriptionEventStreamBatch a
     )
  => SubscriptionEventStream
  -> ConduitM batch batch m ()
  -> m (Worker a)
spawnWorker eventStream processor = do
  u           <- askUnliftIO
  workerQueue <- atomically $ newTBQueue 1024
  let workerIO = unliftIO u (subscriptionWorker processor eventStream workerQueue)
  -- Register the worker thread with the resource manager so that it is cancelled
  -- when the enclosing resource scope is released.
  (_, workerAsync) <- Resource.allocate (async workerIO) cancel
  pure Worker { _queue = workerQueue, _async = workerAsync }

-- | Spawn multiple workers and return a 'WorkerRegistry'. It is guaranteed that the worker
-- registry contains at least one worker.
spawnWorkers
  :: ( MonadNakadi b m
     , MonadUnliftIO m
     , MonadMask m
     , MonadResource m
     , FromJSON a
     , batch ~ SubscriptionEventStreamBatch a
     )
  => SubscriptionId
  -> SubscriptionEventStream
  -> Int
  -> ConduitM batch batch m ()
  -> m (WorkerRegistry a)
spawnWorkers subscriptionId eventStream nWorkers processor = do
  let workerIndices = fromMaybe (1 :| []) (NonEmpty.nonEmpty [1 .. nWorkers])
  workers           <- forM workerIndices (\_idx -> spawnWorker eventStream processor)
  partitionIndexMap <- retrievePartitionIndexMap subscriptionId
  pure WorkerRegistry { _workers = workers, _partitionIndexMap = partitionIndexMap }

-- | This function processes a subscription, taking care of applying
-- the configured committing strategy.
subscriptionWorker
  :: ( MonadNakadi b m
     , MonadUnliftIO m
     , MonadResource m
     , MonadMask m
     , FromJSON a
     , batch ~ SubscriptionEventStreamBatch a
     )
  => ConduitM batch batch m () -- ^ User-provided Conduit for the stream.
  -> SubscriptionEventStream
  -> TBQueue batch             -- ^ Queue of batches from the Nakadi streaming response.
  -> m ()
subscriptionWorker processor eventStream queue = do
  config <- nakadiAsk
  -- This @producer@ is a Conduit producing subscription batches that have been successfully
  -- processed by the user-provided callback @processor@.
  --
  -- What remains to be done with the batches produced by the producer is committing the
  -- corresponding cursors.
  let producer = repeatMC (atomically (readTBQueue queue)) .| processor
  -- For committing we distinguish between two strategies: synchronous and
  -- asynchronous committing.
  case config ^. L.commitStrategy of
    CommitSync ->
      -- Synchronous case: Simply use a Conduit sink that commits
      -- every cursor.
      --
      -- Run the Conduit, which reads batches from the queue, processes them
      -- and commits their cursors.
      runConduit $ producer .| subscriptionSink eventStream
    CommitAsync bufferingStrategy -> do
      -- Asynchronous case: Create a new queue and spawn a cursor
      -- committer thread depending on the configured commit buffering
      -- method. Then execute the provided Conduit processor with a
      -- sink that sends cursor information to the queue. The cursor
      -- committer thread reads from this queue and processes the
      -- cursors.
      --
      -- Run the Conduit, which reads batches from the queue, processes them
      -- and sends their cursors to the cursor committer thread implementing
      -- the actual cursor committing logic.
      commitQueue <- liftIO . atomically $ newTBQueue asyncCommitQueueBufferSize
      withAsync (subscriptionCommitter bufferingStrategy eventStream commitQueue)
        $ \asyncHandle -> do
            -- This makes sure that if the cursor committing thread dies because
            -- of an exception, this exception will be re-raised in the current thread.
            link asyncHandle
            runConduit $ producer .| mapM_C (sendToQueue commitQueue)
 where
  sendToQueue commitQueue batch = liftIO . atomically $ do
    let cursor  = batch ^. L.cursor
        events  = fromMaybe Vector.empty (batch ^. L.events)
        nEvents = length events
    writeTBQueue commitQueue (nEvents, cursor)

  asyncCommitQueueBufferSize = 1024

-- | Retrieve the 'PartitionIndexMap' for the given subscription. This map is used for mapping
-- per-event-type partitions to worker indices: given a pair consisting of an event type and
-- a partition ID, it yields the index of the worker in the worker registry responsible
-- for processing batches originating from that partition.
retrievePartitionIndexMap :: MonadNakadi b m => SubscriptionId -> m PartitionIndexMap
retrievePartitionIndexMap subscriptionId = do
  eventTypes              <- (view L.eventTypes) <$> subscriptionGet subscriptionId
  eventTypesWithPartition <- concat <$> forM eventTypes extractPartitionsForEventType
  pure . HashMap.fromList $ zip eventTypesWithPartition [0 ..]
 where
  extractPartitionsForEventType eventType = do
    partitions <- map (view L.partition) <$> eventTypePartitions eventType
    pure (zip partitions (repeat eventType))

-- | Conduit sink which dispatches a batch to a worker contained in
-- the registry.
workerDispatchSink
  :: (MonadIO m) => WorkerRegistry a -> ConduitM (SubscriptionEventStreamBatch a) Void m ()
workerDispatchSink registry = awaitForever $ \batch -> do
  let partition = batch ^. L.cursor . L.partition
      eventType = batch ^. L.cursor . L.eventType
      worker    = pickWorker registry eventType partition
  atomically $ writeTBQueue (_queue worker) batch

-- | Given the event type and partition of a 'SubscriptionEventStreamBatch', produce the
-- worker that should handle the batch. The worker is found using the 'PartitionIndexMap'.
pickWorker :: WorkerRegistry a -> EventTypeName -> PartitionName -> Worker a
pickWorker registry eventType partition =
  let workers  = registry ^. L.workers
      nWorkers = NonEmpty.length workers
  in  case HashMap.lookup (partition, eventType) (registry ^. L.partitionIndexMap) of
        Nothing ->
          -- We failed to find an entry in the PartitionIndexMap for that partition.
          -- This should rarely happen. It could, for example, happen if the number of
          -- partitions of an event type belonging to the subscription was increased
          -- after consumption of the subscription had started.
          --
          -- In the future, we could improve this fallback mechanism, if there is a
          -- need to do so. At the moment we simply use the first worker in case of
          -- lookup failures.
          NonEmpty.head workers
        Just idx ->
          -- Lookup successful. Truncate the resulting index using modulo in order to
          -- obtain a valid worker index and return the corresponding worker reference.
          workers NonEmpty.!! (idx `mod` nWorkers)

-- | Block as long as no worker has finished. Workers are supposed to run forever, unless
-- cancelled. Therefore, using 'waitAnyCancel' essentially means that if some worker
-- fails due to an uncaught exception, then all other workers are cancelled as well.
workersWait :: MonadIO m => WorkerRegistry a -> m ()
workersWait registry = do
  let workerHandles = map _async $ NonEmpty.toList (registry ^. L.workers)
  void . waitAnyCancel $ workerHandles
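
-- The sketch below is not part of the module itself; it only illustrates how the exported
-- pieces are typically combined: 'spawnWorkers' creates the registry, 'workerDispatchSink'
-- routes incoming batches to their per-partition workers, and 'workersWait' blocks until
-- some worker terminates. In this sketch, @batchSource@ (a Conduit source of decoded
-- 'SubscriptionEventStreamBatch' values), @processor@ (the user-provided Conduit) and the
-- worker count @4@ are hypothetical placeholders, not names defined by this library.
--
-- > consumeWithWorkers subscriptionId eventStream batchSource processor = do
-- >   -- Spawn the workers; the resulting registry is guaranteed to be non-empty.
-- >   registry <- spawnWorkers subscriptionId eventStream 4 processor
-- >   -- Concurrently dispatch batches to the workers and wait for any worker to
-- >   -- finish; since workers are supposed to run forever, 'workersWait' returning
-- >   -- means some worker failed, in which case all workers are cancelled.
-- >   concurrently_
-- >     (runConduit (batchSource .| workerDispatchSink registry))
-- >     (workersWait registry)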