{-|
Module      : Network.Nakadi.Subscriptions.Events
Description : Implementation of Nakadi Subscription Events API
Copyright   : (c) Moritz Clasmeier 2017, 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX

This module implements a high level interface for the
@\/subscriptions\/SUBSCRIPTIONS\/events@ API.
-}

{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RecordWildCards       #-}
{-# LANGUAGE ScopedTypeVariables   #-}

module Network.Nakadi.Subscriptions.Events
  ( subscriptionProcessConduit
  , subscriptionProcess
  , subscriptionSource
  , subscriptionSourceEvents
  )
where

import           Network.Nakadi.Internal.Prelude

import           Control.Concurrent.STM.TBMQueue
                                                ( TBMQueue
                                                , newTBMQueue
                                                , writeTBMQueue
                                                , closeTBMQueue
                                                , readTBMQueue
                                                )
import           UnliftIO.STM                   ( atomically )
import           Conduit                 hiding ( throwM )
import           Control.Lens
import           Data.Aeson
import           Control.Monad.Trans.Resource   ( allocate
                                                , release
                                                )
import           Network.HTTP.Client            ( responseBody )
import           Network.HTTP.Simple
import           Network.HTTP.Types
import           Network.Nakadi.Internal.Config
import           Network.Nakadi.Internal.Conversions
import           Network.Nakadi.Internal.Http
import qualified Network.Nakadi.Internal.Lenses
                                               as L
import           Network.Nakadi.Internal.Worker
import           UnliftIO.Async

-- For dispatching batches to workers, we maintain an integer-indexed
-- (non-empty) list of workers. Thus, we need to have a way of mapping a
-- subscription batch, more precisely the cursors belonging to a
-- subscription batch, to some worker index. Mapping a cursor to an
-- Integer is sufficient, as we can simply take the remainder modulo
-- the number of workers in our workers list.
--
-- Nakadi subscription cursors contain a partition reference and an
-- event type name — both given as strings. How do we derive an
-- integer from this data? The idea is to build a 'PartitionIndexMap'
-- upfront, which allows us to establish this mapping. It is expected
-- that this map contains an entry for every valid @(PartitionName,
-- EventTypeName)@ combination for the subscription to be consumed.
-- If, for some reason, we receive cursors not contained in this map,
-- we map them to the zero index.
--
-- NOTE(review): no 'PartitionIndexMap' is referenced in this module;
-- the dispatching presumably happens behind 'workerDispatchSink' —
-- confirm this comment still describes the current implementation.

-- | Consumes the specified subscription using the commit strategy
-- contained in the configuration. Each consumed batch of subscription
-- events is passed to the provided batch processor action. If this
-- action throws an exception, subscription consumption will terminate.
--
-- This is a thin wrapper around 'subscriptionProcessConduit': the
-- processor is lifted into a conduit with 'iterMC', which runs the
-- action on every batch and passes the batch on unchanged.
subscriptionProcess
  :: (MonadNakadi b m, MonadUnliftIO m, MonadMask m, MonadResource m, FromJSON a)
  => SubscriptionId                           -- ^ Subscription to consume
  -> (SubscriptionEventStreamBatch a -> m ()) -- ^ Batch processor action
  -> m ()
subscriptionProcess subscriptionId processor =
  subscriptionProcessConduit subscriptionId conduit
  where conduit = iterMC processor

-- | Conduit based interface for subscription consumption. Consumes
-- the specified subscription using the commit strategy contained in
-- the configuration. Each consumed event batch is then processed by
-- the provided conduit. If an exception is thrown inside the conduit,
-- subscription consumption will terminate.
--
-- Opens a streaming request against
-- @\/subscriptions\/SUBSCRIPTION_ID\/events@ (query parameters built by
-- 'buildConsumeQueryParameters', flow id attached by 'includeFlowId')
-- and hands the streaming response to 'subscriptionProcessHandler'.
-- A 404 status is passed to 'httpJsonBodyStream' paired with
-- 'errorSubscriptionNotFound' (presumably the error raised for an
-- unknown subscription — confirm in "Network.Nakadi.Internal.Http").
subscriptionProcessConduit
  :: ( MonadNakadi b m
     , MonadUnliftIO m
     , MonadResource m
     , MonadMask m
     , FromJSON a
     , batch ~ SubscriptionEventStreamBatch a
     )
  => SubscriptionId            -- ^ Subscription to consume
  -> ConduitM batch batch m () -- ^ Conduit processor.
  -> m ()
subscriptionProcessConduit subscriptionId processor = do
  config <- nakadiAsk
  let queryParams = buildConsumeQueryParameters config
  httpJsonBodyStream
      ok200
      [(status404, errorSubscriptionNotFound)]
      ( includeFlowId config
      . setRequestPath path
      . setRequestQueryParameters queryParams
      )
    $ subscriptionProcessHandler subscriptionId processor
  where
    path =
      "/subscriptions/"
        <> subscriptionIdToByteString subscriptionId
        <> "/events"

-- | Derive a 'SubscriptionEventStream' from the provided
-- 'SubscriptionId' and Nakadi streaming response.
--
-- The stream id is taken from the first @X-Nakadi-StreamId@ response
-- header; if that header is absent, 'StreamIdMissing' is thrown.
--
-- NOTE(review): whether 'decodeUtf8' here is lenient or throws on
-- invalid UTF-8 depends on the project Prelude — confirm.
buildSubscriptionEventStream
  :: MonadThrow m => SubscriptionId -> Response a -> m SubscriptionEventStream
buildSubscriptionEventStream subscriptionId response =
  case listToMaybe (getResponseHeader "X-Nakadi-StreamId" response) of
    Just streamId -> pure SubscriptionEventStream
      { _streamId       = StreamId (decodeUtf8 streamId)
      , _subscriptionId = subscriptionId
      }
    Nothing -> throwM StreamIdMissing

-- | This function processes a subscription, taking care of
-- dispatching to worker threads and applying the configured
-- committing strategy.
--
-- The response body is split into lines, each line is JSON-decoded
-- into a batch, and the batches are fed to 'workerDispatchSink'. The
-- number of workers is read from the configuration
-- (@worker . nThreads@). The pipeline is raced against
-- 'workersWait' with 'race_', so whichever side finishes (or throws)
-- first causes the other side to be cancelled.
subscriptionProcessHandler
  :: forall a b m batch
   . ( MonadNakadi b m
     , MonadUnliftIO m
     , MonadResource m
     , MonadMask m
     , FromJSON a
     , batch ~ (SubscriptionEventStreamBatch a)
     )
  => SubscriptionId                         -- ^ Subscription ID required for committing.
  -> ConduitM batch batch m ()              -- ^ User provided Conduit for stream.
  -> Response (ConduitM () ByteString m ()) -- ^ Streaming response from Nakadi
  -> m ()
subscriptionProcessHandler subscriptionId processor response = do
  config <- nakadiAsk
  let nWorkers = config ^. L.worker . L.nThreads
  eventStream    <- buildSubscriptionEventStream subscriptionId response
  workerRegistry <- spawnWorkers subscriptionId eventStream nWorkers processor
  race_ (workersWait workerRegistry)
    $  runConduit
    $  responseBody response
    .| linesUnboundedAsciiC
    .| conduitDecode
    -- Type-directed no-op: pins the decoder's output type to 'batch'.
    .| mapC (identity :: batch -> batch)
    .| workerDispatchSink workerRegistry

-- | Experimental API.
--
-- Creates a Conduit source from a subscription ID. The source will produce
-- subscription event stream batches. Note the batches will be asynchronously
-- committed regardless of any event processing logic. Use this function only
-- if the guarantees provided by Nakadi for event processing and cursor
-- committing are not required or not desired.
--
-- Implementation: a background thread (registered with 'allocate' and
-- 'link'ed to the caller) runs 'subscriptionProcess', pushing every batch
-- into a bounded queue of size 2048; the source drains that queue until
-- it is closed. The background thread is released as soon as draining
-- finishes. If downstream stops pulling early, the thread is only
-- cancelled when the enclosing 'ResourceT' scope is closed.
subscriptionSource
  :: forall a b m
   . (MonadNakadi b m, MonadUnliftIO m, MonadMask m, MonadResource m, FromJSON a)
  => SubscriptionId                                    -- ^ Subscription to consume.
  -> ConduitM () (SubscriptionEventStreamBatch a) m () -- ^ Conduit source.
subscriptionSource subscriptionId = do
  UnliftIO {..} <- lift askUnliftIO
  streamLimit   <- lift nakadiAsk <&> (fmap fromIntegral . view L.streamLimit)
  queue         <- atomically $ newTBMQueue queueSize
  (releaseKey, asyncHandle) <- allocate
    (unliftIO (async (subscriptionConsumer streamLimit queue)))
    cancel
  link asyncHandle
  drain queue
  -- The queue has been closed, i.e. the consumer ran its finalizer.
  -- Release it now rather than keeping it registered until the
  -- enclosing ResourceT scope ends.
  release releaseKey
  where
    queueSize = 2048

    -- Yield queued batches until the queue is closed and empty.
    drain queue = atomically (readTBMQueue queue) >>= \case
      Just a  -> yield a >> drain queue
      Nothing -> pure ()

    -- Consume the subscription, writing every batch into the queue; the
    -- queue is closed when consumption ends, normally or exceptionally.
    subscriptionConsumer
      :: Maybe Int -> TBMQueue (SubscriptionEventStreamBatch a) -> m ()
    subscriptionConsumer maybeStreamLimit queue =
      go `finally` atomically (closeTBMQueue queue)
      where
        go = do
          subscriptionProcess subscriptionId (atomically . writeTBMQueue queue)
          -- We only reconnect automatically when no stream limit is set.
          -- This effectively means that we don't try to reach @streamLimit@
          -- events exactly. We simply regard @streamLimit@ as an upper bound
          -- and in case Nakadi disconnects us earlier or the connection
          -- breaks, we produce less than @streamLimit@ events.
          when (isNothing maybeStreamLimit) go

-- | Experimental API.
--
-- Similar to `subscriptionSource`, but the created Conduit source provides
-- events instead of event batches. Batches without events contribute
-- nothing to the output.
subscriptionSourceEvents
  :: (MonadNakadi b m, MonadUnliftIO m, MonadMask m, MonadResource m, FromJSON a)
  => SubscriptionId     -- ^ Subscription to consume
  -> ConduitM () a m () -- ^ Conduit source.
subscriptionSourceEvents subscriptionId =
  subscriptionSource subscriptionId
    .| concatMapC (\batch -> fromMaybe mempty (batch & _events))