{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Nakadi.Subscriptions.Events
( subscriptionProcessConduit
, subscriptionProcess
) where
import Network.Nakadi.Internal.Prelude
import Conduit hiding (throwM)
import qualified Control.Concurrent.Async.Timer as Timer
import Control.Concurrent.STM (TBQueue, TVar,
atomically, modifyTVar,
newTBQueue, newTVar,
readTBQueue, readTVar,
retry, swapTVar,
writeTBQueue)
import Control.Lens
import Control.Monad.Logger
import Data.Aeson
import qualified Data.Conduit.List as Conduit (mapM_)
import Data.HashMap.Strict (HashMap)
import qualified Data.HashMap.Strict as HashMap
import qualified Data.Vector as Vector
import Network.HTTP.Client (responseBody)
import Network.HTTP.Simple
import Network.HTTP.Types
import Network.Nakadi.Internal.Config
import Network.Nakadi.Internal.Conversions
import Network.Nakadi.Internal.Http
import qualified Network.Nakadi.Internal.Lenses as L
import Network.Nakadi.Subscriptions.Cursors
import UnliftIO.Async
subscriptionProcess
:: ( MonadNakadi b m
, MonadUnliftIO m
, MonadMask m
, FromJSON a )
=> Maybe ConsumeParameters
-> SubscriptionId
-> (SubscriptionEventStreamBatch a -> m ())
-> m ()
subscriptionProcess maybeConsumeParameters subscriptionId processor =
subscriptionProcessConduit maybeConsumeParameters subscriptionId conduit
where conduit = iterMC processor
subscriptionProcessConduit
:: ( MonadNakadi b m
, MonadUnliftIO m
, MonadMask m
, FromJSON a
, batch ~ SubscriptionEventStreamBatch a )
=> Maybe ConsumeParameters
-> SubscriptionId
-> ConduitM batch batch m ()
-> m ()
subscriptionProcessConduit maybeConsumeParameters subscriptionId processor = do
config <- nakadiAsk
let consumeParams = fromMaybe defaultConsumeParameters maybeConsumeParameters
queryParams = buildSubscriptionConsumeQueryParameters consumeParams
httpJsonBodyStream ok200 [(status404, errorSubscriptionNotFound)]
(includeFlowId config
. setRequestPath path
. setRequestQueryParameters queryParams) $
subscriptionProcessHandler consumeParams subscriptionId processor
where path = "/subscriptions/"
<> subscriptionIdToByteString subscriptionId
<> "/events"
buildSubscriptionEventStream
:: MonadThrow m
=> SubscriptionId
-> Response a
-> m SubscriptionEventStream
buildSubscriptionEventStream subscriptionId response =
case listToMaybe (getResponseHeader "X-Nakadi-StreamId" response) of
Just streamId ->
pure SubscriptionEventStream
{ _streamId = StreamId (decodeUtf8 streamId)
, _subscriptionId = subscriptionId }
Nothing ->
throwM StreamIdMissing
subscriptionProcessHandler
:: ( MonadNakadi b m
, MonadUnliftIO m
, MonadMask m
, FromJSON a
, batch ~ (SubscriptionEventStreamBatch a) )
=> ConsumeParameters
-> SubscriptionId
-> ConduitM batch batch m ()
-> Response (ConduitM () ByteString m ())
-> m ()
subscriptionProcessHandler consumeParams subscriptionId processor response = do
config <- nakadiAsk
eventStream <- buildSubscriptionEventStream subscriptionId response
let producer = responseBody response
.| linesUnboundedAsciiC
.| conduitDecode config
.| processor
case config^.L.commitStrategy of
CommitSync ->
runConduit $ producer .| subscriptionSink eventStream
CommitAsync bufferingStrategy -> do
queue <- liftIO . atomically $ newTBQueue 1024
withAsync (subscriptionCommitter bufferingStrategy consumeParams eventStream queue) $
\ asyncHandle -> do
link asyncHandle
runConduit $ producer .| Conduit.mapM_ (sendToQueue queue)
where sendToQueue queue batch = liftIO . atomically $ do
let cursor = batch^.L.cursor
events = fromMaybe Vector.empty (batch^.L.events)
nEvents = length events
writeTBQueue queue (nEvents, cursor)
subscriptionSink
:: (MonadIO m, MonadNakadi b m)
=> SubscriptionEventStream
-> ConduitM (SubscriptionEventStreamBatch a) void m ()
subscriptionSink eventStream = do
config <- lift nakadiAsk
awaitForever $ \ batch -> lift $ do
let cursor = batch^.L.cursor
catchAny (commitOneCursor eventStream cursor) $ \ exn -> nakadiLiftBase $
case config^.L.logFunc of
Just logFunc -> logFunc "nakadi-client" LevelWarn $ toLogStr $
"Failed to synchronously commit cursor: " <> tshow exn
Nothing ->
pure ()
type CursorsMap = HashMap (EventTypeName, PartitionName) (Int, SubscriptionCursor)
emptyCursorsMap :: CursorsMap
emptyCursorsMap = HashMap.empty
cursorKey :: SubscriptionCursor -> (EventTypeName, PartitionName)
cursorKey cursor = (cursor^.L.eventType, cursor^.L.partition)
unbufferedCommitLoop
:: (MonadNakadi b m, MonadIO m)
=> SubscriptionEventStream
-> TBQueue (Int, SubscriptionCursor)
-> m ()
unbufferedCommitLoop eventStream queue = do
config <- nakadiAsk
forever $ do
(_nEvents, cursor) <- liftIO . atomically . readTBQueue $ queue
catchAny (subscriptionCursorCommit eventStream [cursor]) $ \ exn -> do
nakadiLiftBase $
case config^.L.logFunc of
Just logFunc -> logFunc "nakadi-client" LevelWarn $ toLogStr $
"Failed to commit cursor " <> tshow cursor <> ": " <> tshow exn
Nothing ->
pure ()
cursorBufferSize :: ConsumeParameters -> Int
cursorBufferSize consumeParams =
case consumeParams^.L.maxUncommittedEvents of
Nothing -> 1
Just n -> n
& fromIntegral
& (* safetyFactor)
& round
where safetyFactor = 0.5
subscriptionCommitter
:: ( MonadNakadi b m
, MonadUnliftIO m
, MonadMask m )
=> CommitBufferingStrategy
-> ConsumeParameters
-> SubscriptionEventStream
-> TBQueue (Int, SubscriptionCursor)
-> m ()
subscriptionCommitter CommitNoBuffer _consumeParams eventStream queue =
unbufferedCommitLoop eventStream queue
subscriptionCommitter (CommitTimeBuffer millis) _consumeParams eventStream queue = do
let timerConf = Timer.defaultConf
& Timer.setInitDelay (fromIntegral millis)
& Timer.setInterval (fromIntegral millis)
cursorsMap <- liftIO . atomically $ newTVar emptyCursorsMap
withAsync (cursorConsumer cursorsMap) $ \ asyncCursorConsumer -> do
link asyncCursorConsumer
Timer.withAsyncTimer timerConf $ \ timer -> forever $ do
Timer.wait timer
commitAllCursors eventStream cursorsMap
where
cursorConsumer cursorsMap = forever . liftIO . atomically $ do
(_, cursor) <- readTBQueue queue
modifyTVar cursorsMap (HashMap.insert (cursorKey cursor) (0, cursor))
subscriptionCommitter CommitSmartBuffer consumeParams eventStream queue = do
let millisDefault = 1000
nMaxEvents = cursorBufferSize consumeParams
timerConf = Timer.defaultConf
& Timer.setInitDelay (fromIntegral millisDefault)
& Timer.setInterval (fromIntegral millisDefault)
if nMaxEvents > 1
then do cursorsMap <- liftIO . atomically $ newTVar emptyCursorsMap
withAsync (cursorConsumer cursorsMap) $ \ asyncCursorConsumer -> do
link asyncCursorConsumer
Timer.withAsyncTimer timerConf $ cursorCommitter cursorsMap nMaxEvents
else unbufferedCommitLoop eventStream queue
where
cursorConsumer cursorsMap = forever . liftIO . atomically $ do
(nEvents, cursor) <- readTBQueue queue
modifyTVar cursorsMap $
HashMap.insertWith updateCursor (cursorKey cursor) (nEvents, cursor)
updateCursor cursorNew _cursorOld @ (nEventsOld, _) =
cursorNew & _1 %~ (+ nEventsOld)
cursorCommitter cursorsMap nMaxEvents timer = forever $ do
race (Timer.wait timer) (maxEventsReached cursorsMap nMaxEvents) >>= \ case
Left _ ->
commitAllCursors eventStream cursorsMap
Right _ -> do
Timer.reset timer
commitAllCursors eventStream cursorsMap
maxEventsReached cursorsMap nMaxEvents = liftIO . atomically $ do
cursorsList <- HashMap.toList <$> readTVar cursorsMap
let cursorsCommit = filter (shouldBeCommitted nMaxEvents) cursorsList
if null cursorsCommit
then retry
else pure ()
shouldBeCommitted nMaxEvents cursor = cursor^._2._1 >= nMaxEvents
commitAllCursors
:: (MonadNakadi b m, MonadIO m)
=> SubscriptionEventStream
-> TVar CursorsMap
-> m ()
commitAllCursors eventStream cursorsMap = do
cursors <- liftIO . atomically $ swapTVar cursorsMap emptyCursorsMap
forM_ cursors $ \ (_nEvents, cursor) -> commitOneCursor eventStream cursor
commitOneCursor
:: (MonadIO m, MonadNakadi b m)
=> SubscriptionEventStream
-> SubscriptionCursor
-> m ()
commitOneCursor eventStream cursor = do
config <- nakadiAsk
catchAny (subscriptionCursorCommit eventStream [cursor]) $ \ exn -> nakadiLiftBase $
case config^.L.logFunc of
Just logFunc -> logFunc "nakadi-client" LevelWarn $ toLogStr $
"Failed to commit cursor " <> tshow cursor <> ": " <> tshow exn
Nothing ->
pure ()