{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TypeFamilies #-}
module Network.Nakadi.Internal.Worker
( WorkerRegistry
, spawnWorkers
, workerDispatchSink
, workersWait
)
where
import Network.Nakadi.Internal.Prelude
import Conduit
import Control.Lens
import qualified Control.Monad.Trans.Resource as Resource
import Data.Aeson
import qualified Data.HashMap.Strict as HashMap
import Data.List.NonEmpty ( NonEmpty(..) )
import qualified Data.List.NonEmpty as NonEmpty
import qualified Data.Vector as Vector
import Network.Nakadi.EventTypes.Partitions
import Network.Nakadi.Subscriptions.Subscription
import Network.Nakadi.Internal.Committer
import qualified Network.Nakadi.Internal.Lenses
as L
import Network.Nakadi.Internal.Types
import UnliftIO.Async
import UnliftIO.STM
-- | Spawn a single worker thread together with its bounded input queue.
--
-- A fresh 'TBQueue' with capacity 1024 is created and a thread running
-- 'subscriptionWorker' (with the given processor and event stream) is
-- started on it. The thread is registered through 'Resource.allocate'
-- with 'cancel' as its release action, so it is cancelled when the
-- enclosing resource scope is released.
--
-- Returns a 'Worker' holding the queue and the 'Async' handle.
spawnWorker
  :: ( MonadNakadi b m
     , MonadResource m
     , MonadUnliftIO m
     , MonadMask m
     , MonadIO m
     , FromJSON a
     , batch ~ (SubscriptionEventStreamBatch a)
     )
  => SubscriptionEventStream
  -> ConduitM batch batch m ()
  -> m (Worker a)
spawnWorker eventStream processor = do
  u           <- askUnliftIO
  workerQueue <- atomically $ newTBQueue 1024
  let workerIO = unliftIO u (subscriptionWorker processor eventStream workerQueue)
  -- 'Resource.allocate' runs its acquire action with asynchronous
  -- exceptions masked, and a forked thread inherits the masking state of
  -- its parent. A plain 'async' here would therefore leave the worker
  -- masked for its whole lifetime, so 'cancel' could only be delivered at
  -- interruptible points. 'asyncWithUnmask' lets the worker body run
  -- unmasked while the fork and the registration of the release action
  -- stay atomic.
  (_, workerAsync) <- Resource.allocate
    (asyncWithUnmask (\unmask -> unmask workerIO))
    cancel
  pure Worker {_queue = workerQueue, _async = workerAsync}
-- | Spawn a pool of workers and build the registry used for dispatching.
--
-- One worker is started (via 'spawnWorker') per index in @[1 .. nWorkers]@;
-- if that range is empty (i.e. @nWorkers < 1@) exactly one worker is
-- started instead. After the workers are running, the partition index
-- map for the subscription is retrieved and stored next to them in the
-- resulting 'WorkerRegistry'.
spawnWorkers
  :: ( MonadNakadi b m
     , MonadUnliftIO m
     , MonadMask m
     , MonadResource m
     , FromJSON a
     , batch ~ SubscriptionEventStreamBatch a
     )
  => SubscriptionId
  -> SubscriptionEventStream
  -> Int
  -> ConduitM batch batch m ()
  -> m (WorkerRegistry a)
spawnWorkers subscriptionId eventStream nWorkers processor = do
  pool     <- forM slots $ \_ -> launch
  indexMap <- retrievePartitionIndexMap subscriptionId
  pure WorkerRegistry {_workers = pool, _partitionIndexMap = indexMap}
 where
  -- Every worker is started with the same stream and processor.
  launch = spawnWorker eventStream processor
  -- One slot per requested worker, never fewer than one.
  slots = case NonEmpty.nonEmpty [1 .. nWorkers] of
    Just indices -> indices
    Nothing      -> 1 :| []
-- | Body of a worker thread.
--
-- Repeatedly reads batches from the given queue, pipes them through the
-- user supplied processor and then hands the result over for cursor
-- committing, depending on the commit strategy found in the Nakadi
-- configuration:
--
-- * 'CommitSync': processed batches are fed into 'subscriptionSink'.
--
-- * 'CommitAsync': a 'subscriptionCommitter' is run concurrently (and
--   linked to this thread); for every processed batch a pair of
--   @(number of events, cursor)@ is written to a bounded commit queue
--   of capacity 1024 that the committer was given.
subscriptionWorker
  :: ( MonadNakadi b m
     , MonadUnliftIO m
     , MonadResource m
     , MonadMask m
     , FromJSON a
     , batch ~ (SubscriptionEventStreamBatch a)
     )
  => ConduitM batch batch m ()
  -> SubscriptionEventStream
  -> TBQueue batch
  -> m ()
subscriptionWorker processor eventStream queue = do
  strategy <- view L.commitStrategy <$> nakadiAsk
  case strategy of
    CommitSync -> runConduit (processed .| subscriptionSink eventStream)
    CommitAsync bufferingStrategy -> do
      commitQueue <- liftIO (atomically (newTBQueue commitQueueCapacity))
      let committer = subscriptionCommitter bufferingStrategy eventStream commitQueue
      withAsync committer $ \committerHandle -> do
        -- Linking makes a failure of the committer surface in this thread.
        link committerHandle
        runConduit (processed .| mapM_C (enqueueCursor commitQueue))
 where
  -- Endless stream of queued batches after they went through the processor.
  processed = repeatMC (atomically (readTBQueue queue)) .| processor

  commitQueueCapacity = 1024

  -- Announce a processed batch to the committer: its event count
  -- (zero when the batch carries no events) together with its cursor.
  enqueueCursor commitQueue batch =
    let eventCount = length (fromMaybe Vector.empty (batch ^. L.events))
        entry      = (eventCount, batch ^. L.cursor)
    in  liftIO (atomically (writeTBQueue commitQueue entry))
-- | Build the map from @(partition, event type)@ pairs to consecutive
-- indices starting at zero.
--
-- The event types are taken from the subscription with the given ID;
-- for each of them the partitions are looked up with
-- 'eventTypePartitions'. Pairs are numbered in the order in which they
-- are produced (event type by event type, partition by partition).
retrievePartitionIndexMap :: MonadNakadi b m => SubscriptionId -> m PartitionIndexMap
retrievePartitionIndexMap subscriptionId = do
  subscription     <- subscriptionGet subscriptionId
  keysPerEventType <- forM (subscription ^. L.eventTypes) partitionKeys
  pure (HashMap.fromList (zip (concat keysPerEventType) [0 ..]))
 where
  -- All @(partition, event type)@ keys belonging to a single event type.
  partitionKeys eventType = do
    partitionInfos <- eventTypePartitions eventType
    pure (map (\info -> (info ^. L.partition, eventType)) partitionInfos)
-- | Sink that hands every incoming batch to one of the registered workers.
--
-- The worker is chosen by 'pickWorker' from the event type and partition
-- found in the batch's cursor; the batch is then written to that worker's
-- queue (blocking while the bounded queue is full).
workerDispatchSink
  :: (MonadIO m) => WorkerRegistry a -> ConduitM (SubscriptionEventStreamBatch a) Void m ()
workerDispatchSink registry = awaitForever dispatch
 where
  dispatch batch =
    let cursor = batch ^. L.cursor
        target = pickWorker registry (cursor ^. L.eventType) (cursor ^. L.partition)
    in  atomically (writeTBQueue (_queue target) batch)
-- | Select the worker responsible for an event type / partition pair.
--
-- The pair is looked up in the registry's partition index map. A known
-- pair is mapped onto the worker at position @index `mod` number of
-- workers@; an unknown pair falls back to the first worker.
pickWorker :: WorkerRegistry a -> EventTypeName -> PartitionName -> Worker a
pickWorker registry eventType partition =
  case HashMap.lookup (partition, eventType) (registry ^. L.partitionIndexMap) of
    -- The index is reduced modulo the pool size, so it is always in range.
    Just idx -> pool NonEmpty.!! (idx `mod` NonEmpty.length pool)
    Nothing  -> NonEmpty.head pool
 where
  pool = registry ^. L.workers
-- | Block until any one of the registry's workers has finished.
--
-- Implemented with 'waitAnyCancel' on the workers' 'Async' handles; its
-- result is discarded.
workersWait :: MonadIO m => WorkerRegistry a -> m ()
workersWait registry = void (waitAnyCancel handles)
 where
  handles = [ _async worker | worker <- NonEmpty.toList (registry ^. L.workers) ]