{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE RecordWildCards #-}
module Network.Nakadi.Subscriptions.Events
( subscriptionProcessConduit
, subscriptionProcess
, subscriptionSource
, subscriptionSourceEvents
)
where
import Network.Nakadi.Internal.Prelude
import Control.Concurrent.STM.TBMQueue
( TBMQueue
, newTBMQueue
, writeTBMQueue
, closeTBMQueue
, readTBMQueue
)
import UnliftIO.STM ( atomically )
import Conduit hiding ( throwM )
import Control.Lens
import Data.Aeson
import Control.Monad.Trans.Resource ( allocate )
import Network.HTTP.Client ( responseBody )
import Network.HTTP.Simple
import Network.HTTP.Types
import Network.Nakadi.Internal.Config
import Network.Nakadi.Internal.Conversions
import Network.Nakadi.Internal.Http
import qualified Network.Nakadi.Internal.Lenses
as L
import Network.Nakadi.Internal.Worker
import UnliftIO.Async
subscriptionProcess
:: (MonadNakadi b m, MonadUnliftIO m, MonadMask m, MonadResource m, FromJSON a)
=> SubscriptionId
-> (SubscriptionEventStreamBatch a -> m ())
-> m ()
subscriptionProcess subscriptionId processor = subscriptionProcessConduit subscriptionId conduit
where conduit = iterMC processor
subscriptionProcessConduit
:: ( MonadNakadi b m
, MonadUnliftIO m
, MonadResource m
, MonadMask m
, FromJSON a
, batch ~ SubscriptionEventStreamBatch a
)
=> SubscriptionId
-> ConduitM batch batch m ()
-> m ()
subscriptionProcessConduit subscriptionId processor = do
config <- nakadiAsk
let queryParams = buildConsumeQueryParameters config
httpJsonBodyStream
ok200
[(status404, errorSubscriptionNotFound)]
(includeFlowId config . setRequestPath path . setRequestQueryParameters queryParams)
$ subscriptionProcessHandler subscriptionId processor
where path = "/subscriptions/" <> subscriptionIdToByteString subscriptionId <> "/events"
buildSubscriptionEventStream
:: MonadThrow m => SubscriptionId -> Response a -> m SubscriptionEventStream
buildSubscriptionEventStream subscriptionId response =
case listToMaybe (getResponseHeader "X-Nakadi-StreamId" response) of
Just streamId -> pure SubscriptionEventStream
{ _streamId = StreamId (decodeUtf8 streamId)
, _subscriptionId = subscriptionId
}
Nothing -> throwM StreamIdMissing
subscriptionProcessHandler
:: forall a b m batch
. ( MonadNakadi b m
, MonadUnliftIO m
, MonadResource m
, MonadMask m
, FromJSON a
, batch ~ (SubscriptionEventStreamBatch a)
)
=> SubscriptionId
-> ConduitM batch batch m ()
-> Response (ConduitM () ByteString m ())
-> m ()
subscriptionProcessHandler subscriptionId processor response = do
config <- nakadiAsk
let nWorkers = config ^. L.worker . L.nThreads
eventStream <- buildSubscriptionEventStream subscriptionId response
workerRegistry <- spawnWorkers subscriptionId eventStream nWorkers processor
race_ (workersWait workerRegistry)
$ runConduit
$ responseBody response
.| linesUnboundedAsciiC
.| conduitDecode
.| mapC (identity :: batch -> batch)
.| workerDispatchSink workerRegistry
subscriptionSource
:: forall a b m
. (MonadNakadi b m, MonadUnliftIO m, MonadMask m, MonadResource m, FromJSON a)
=> SubscriptionId
-> ConduitM () (SubscriptionEventStreamBatch a) m ()
subscriptionSource subscriptionId = do
UnliftIO {..} <- lift askUnliftIO
streamLimit <- lift nakadiAsk <&> (fmap fromIntegral . view L.streamLimit)
queue <- atomically $ newTBMQueue queueSize
(_releaseKey, asyncHandle) <- allocate
(unliftIO (async (subscriptionConsumer streamLimit queue)))
cancel
link asyncHandle
drain queue
where
queueSize = 2048
drain queue = atomically (readTBMQueue queue) >>= \case
Just a -> yield a >> drain queue
Nothing -> pure ()
subscriptionConsumer :: Maybe Int -> TBMQueue (SubscriptionEventStreamBatch a) -> m ()
subscriptionConsumer maybeStreamLimit queue = go `finally` atomically (closeTBMQueue queue)
where
go = do
subscriptionProcess subscriptionId (void . atomically . writeTBMQueue queue)
when (isNothing maybeStreamLimit) go
subscriptionSourceEvents
:: (MonadNakadi b m, MonadUnliftIO m, MonadMask m, MonadResource m, FromJSON a)
=> SubscriptionId
-> ConduitM () a m ()
subscriptionSourceEvents subscriptionId =
subscriptionSource subscriptionId .| concatMapC (\batch -> fromMaybe mempty (batch & _events))