{-|
Module      : Network.Nakadi.Subscriptions.Events
Description : Implementation of Nakadi Subscription Events API
Copyright   : (c) Moritz Clasmeier 2017, 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX

This module implements a high level interface for the
@\/subscriptions\/SUBSCRIPTIONS\/events@ API.
-}

{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RecordWildCards       #-}
{-# LANGUAGE ScopedTypeVariables   #-}

module Network.Nakadi.Subscriptions.Events
  ( subscriptionProcessConduit
  , subscriptionProcess
  ) where

import           Network.Nakadi.Internal.Prelude

import           Conduit                              hiding (throwM)
import qualified Control.Concurrent.Async.Timer       as Timer
import           Control.Concurrent.STM               (TBQueue, TVar,
                                                       atomically, modifyTVar,
                                                       modifyTVar', newTBQueue,
                                                       newTVar, readTBQueue,
                                                       readTVar, retry,
                                                       swapTVar, writeTBQueue)
import           Control.Lens
import           Control.Monad.Logger
import           Data.Aeson
import qualified Data.Conduit.List                    as Conduit (mapM_)
import           Data.HashMap.Strict                  (HashMap)
import qualified Data.HashMap.Strict                  as HashMap
import qualified Data.Vector                          as Vector
import           Network.HTTP.Client                  (responseBody)
import           Network.HTTP.Simple
import           Network.HTTP.Types
import           Network.Nakadi.Internal.Config
import           Network.Nakadi.Internal.Conversions
import           Network.Nakadi.Internal.Http
import qualified Network.Nakadi.Internal.Lenses       as L
import           Network.Nakadi.Subscriptions.Cursors
import           UnliftIO.Async

-- | Consumes the specified subscription using the commit strategy
-- contained in the configuration. Each consumed batch of subscription
-- events is handed to the supplied batch processor action. If this
-- action throws an exception, subscription consumption will terminate.
subscriptionProcess
  :: ( MonadNakadi b m
     , MonadUnliftIO m
     , MonadMask m
     , FromJSON a )
  => Maybe ConsumeParameters                  -- ^ 'ConsumeParameters'
                                              -- to use
  -> SubscriptionId                           -- ^ Subscription to consume
  -> (SubscriptionEventStreamBatch a -> m ()) -- ^ Batch processor action
  -> m ()
subscriptionProcess maybeConsumeParameters subscriptionId processor =
  -- 'iterMC' turns the per-batch action into a pass-through Conduit.
  subscriptionProcessConduit maybeConsumeParameters subscriptionId (iterMC processor)

-- | Conduit based interface for subscription consumption. Consumes
-- the specified subscription using the commit strategy contained in
-- the configuration. Each consumed event batch is then processed by
-- the provided conduit. If an exception is thrown inside the conduit,
-- subscription consumption will terminate.
subscriptionProcessConduit
  :: ( MonadNakadi b m
     , MonadUnliftIO m
     , MonadMask m
     , FromJSON a
     , batch ~ SubscriptionEventStreamBatch a )
  => Maybe ConsumeParameters   -- ^ 'ConsumeParameters' to use
  -> SubscriptionId            -- ^ Subscription to consume
  -> ConduitM batch batch m () -- ^ Conduit processor.
  -> m ()
subscriptionProcessConduit maybeConsumeParameters subscriptionId processor = do
  config <- nakadiAsk
  let consumeParams = fromMaybe defaultConsumeParameters maybeConsumeParameters
      -- Request customizer: flow ID, endpoint path and consumption
      -- query parameters derived from the consume parameters.
      customizeRequest =
        includeFlowId config
        . setRequestPath requestPath
        . setRequestQueryParameters
            (buildSubscriptionConsumeQueryParameters consumeParams)
  httpJsonBodyStream ok200 [(status404, errorSubscriptionNotFound)] customizeRequest $
    subscriptionProcessHandler consumeParams subscriptionId processor

  where requestPath = "/subscriptions/"
                      <> subscriptionIdToByteString subscriptionId
                      <> "/events"

-- | Derive a 'SubscriptionEventStream' from the provided
-- 'SubscriptionId' and Nakadi streaming response. Throws
-- 'StreamIdMissing' when the @X-Nakadi-StreamId@ header is absent.
buildSubscriptionEventStream
  :: MonadThrow m
  => SubscriptionId
  -> Response a
  -> m SubscriptionEventStream
buildSubscriptionEventStream subscriptionId response =
  -- Only the first occurrence of the header is relevant.
  case getResponseHeader "X-Nakadi-StreamId" response of
    streamId : _ ->
      pure SubscriptionEventStream
      { _streamId       = StreamId (decodeUtf8 streamId)
      , _subscriptionId = subscriptionId }
    [] ->
      throwM StreamIdMissing

-- | This function processes a subscription, taking care of applying
-- the configured committing strategy.
subscriptionProcessHandler
  :: ( MonadNakadi b m
     , MonadUnliftIO m
     , MonadMask m
     , FromJSON a
     , batch ~ (SubscriptionEventStreamBatch a) )
  => ConsumeParameters                      -- ^ Consumption parameters, forwarded to the committer thread.
  -> SubscriptionId                         -- ^ Subscription ID required for committing.
  -> ConduitM batch batch m ()              -- ^ User provided Conduit for stream.
  -> Response (ConduitM () ByteString m ()) -- ^ Streaming response from Nakadi
  -> m ()
subscriptionProcessHandler consumeParams subscriptionId processor response = do
  config      <- nakadiAsk
  eventStream <- buildSubscriptionEventStream subscriptionId response
  -- The producer splits the streaming response body into lines, each
  -- line is JSON-decoded into a batch and passed through the
  -- user-provided processor Conduit.
  let producer = responseBody response
                 .| linesUnboundedAsciiC
                 .| conduitDecode config
                 .| processor
  case config^.L.commitStrategy of
    CommitSync ->
      -- Synchronous case: Simply use a Conduit sink that commits
      -- every cursor.
      runConduit $ producer .| subscriptionSink eventStream
    CommitAsync bufferingStrategy -> do
      -- Asynchronous case: Create a new queue and spawn a cursor
      -- committer thread depending on the configured commit buffering
      -- method. Then execute the provided Conduit processor with a
      -- sink that sends cursor information to the queue. The cursor
      -- committer thread reads from this queue and processes the
      -- cursors.
      queue <- liftIO . atomically $ newTBQueue 1024
      -- 'link' re-throws exceptions from the committer thread in this
      -- thread, terminating consumption; 'withAsync' guarantees the
      -- committer is cancelled when consumption ends.
      withAsync (subscriptionCommitter bufferingStrategy consumeParams eventStream queue) $
        \ asyncHandle -> do
          link asyncHandle
          runConduit $ producer .| Conduit.mapM_ (sendToQueue queue)

  where -- Enqueue a batch's cursor together with the number of events
        -- in the batch (a batch without events counts as zero); the
        -- count feeds the 'CommitSmartBuffer' threshold logic.
        sendToQueue queue batch = liftIO . atomically $ do
          let cursor  = batch^.L.cursor
              events  = fromMaybe Vector.empty (batch^.L.events)
              nEvents = length events
          writeTBQueue queue (nEvents, cursor)

-- | Sink which can be used as sink for Conduits processing
-- subscriptions events. This sink takes care of committing events. It
-- can consume any values which contain Subscription Cursors.
subscriptionSink
  :: (MonadIO m, MonadNakadi b m)
  => SubscriptionEventStream
  -> ConduitM (SubscriptionEventStreamBatch a) void m ()
subscriptionSink eventStream = do
  config <- lift nakadiAsk
  awaitForever $ \ batch ->
    lift $ commitOneCursor eventStream (batch^.L.cursor) `catchAny` logWarning config
  where
    -- Commit failures are logged (when a log function is configured)
    -- but never propagated; Nakadi itself is in control of
    -- disconnecting us.
    logWarning config exn = nakadiLiftBase $
      case config^.L.logFunc of
        Just logFunc -> logFunc "nakadi-client" LevelWarn $ toLogStr $
          "Failed to synchronously commit cursor: " <> tshow exn
        Nothing ->
          pure ()

-- | Map from (event type, partition) to the number of events
-- processed since the last commit paired with the most recently seen
-- cursor for that partition.
type CursorsMap = HashMap (EventTypeName, PartitionName) (Int, SubscriptionCursor)

-- | A 'CursorsMap' containing no cursors.
emptyCursorsMap :: CursorsMap
emptyCursorsMap = HashMap.empty

-- | Key under which a cursor is stored in a 'CursorsMap'.
cursorKey :: SubscriptionCursor -> (EventTypeName, PartitionName)
cursorKey cursor = (cursor^.L.eventType, cursor^.L.partition)

-- | Implementation for the 'CommitNoBuffer' strategy: We simply read
-- every cursor from the queue and commit it in order.
--
-- Committing — including catching and logging commit failures — is
-- delegated to 'commitOneCursor', keeping the behavior consistent
-- with the synchronous commit path instead of duplicating the
-- catch-and-log logic here.
unbufferedCommitLoop
  :: (MonadNakadi b m, MonadIO m)
  => SubscriptionEventStream
  -> TBQueue (Int, SubscriptionCursor)
  -> m ()
unbufferedCommitLoop eventStream queue =
  forever $ do
    -- The per-batch event count is irrelevant here, since every
    -- cursor is committed unconditionally.
    (_nEvents, cursor) <- liftIO . atomically . readTBQueue $ queue
    commitOneCursor eventStream cursor

-- | Number of events to buffer before forcing a commit, derived from
-- the configured maximum of uncommitted events scaled down by a
-- safety factor. Defaults to 1 when no maximum is configured.
cursorBufferSize :: ConsumeParameters -> Int
cursorBufferSize consumeParams =
  maybe 1 scale (consumeParams^.L.maxUncommittedEvents)
  where
    scale n      = round (fromIntegral n * safetyFactor)
    safetyFactor = 0.5

-- | Main function for the cursor committer thread. Logic depends on
-- the provided buffering strategy.
subscriptionCommitter
  :: ( MonadNakadi b m
     , MonadUnliftIO m
     , MonadMask m )
  => CommitBufferingStrategy           -- ^ Buffering strategy from the configuration.
  -> ConsumeParameters                 -- ^ Consumption parameters.
  -> SubscriptionEventStream           -- ^ Stream whose cursors are committed.
  -> TBQueue (Int, SubscriptionCursor) -- ^ Queue of (event count, cursor) pairs to commit.
  -> m ()

-- | Implementation for the 'CommitNoBuffer' strategy: We simply read
-- every cursor and commit it in order.
subscriptionCommitter CommitNoBuffer _consumeParams eventStream queue =
  unbufferedCommitLoop eventStream queue

-- | Implementation of the 'CommitTimeBuffer' strategy: We use an
-- async timer for committing cursors at specified intervals.
subscriptionCommitter (CommitTimeBuffer millis) _consumeParams eventStream queue = do
  let timerConf = Timer.defaultConf
                  & Timer.setInitDelay (fromIntegral millis)
                  & Timer.setInterval  (fromIntegral millis)
  cursorsMap <- liftIO . atomically $ newTVar emptyCursorsMap
  withAsync (cursorConsumer cursorsMap) $ \ asyncCursorConsumer -> do
    link asyncCursorConsumer
    Timer.withAsyncTimer timerConf $ \ timer -> forever $ do
      Timer.wait timer
      commitAllCursors eventStream cursorsMap

  where -- | The cursorsConsumer drains the cursors queue and adds each
        -- cursor to the provided cursorsMap. The event count is not
        -- needed by the time-based strategy, hence it is stored as 0.
        cursorConsumer cursorsMap = forever . liftIO . atomically $ do
          (_, cursor) <- readTBQueue queue
          -- Strict modify: the lazy 'modifyTVar' would accumulate
          -- unevaluated HashMap.insert thunks in the TVar between
          -- timer ticks.
          modifyTVar' cursorsMap (HashMap.insert (cursorKey cursor) (0, cursor))

-- | Implementation of the 'CommitSmartBuffer' strategy: We use an
-- async timer for committing cursors at specified intervals, but if
-- the number of uncommitted events reaches some threshold before the
-- next scheduled commit, a commit is being done right away and the
-- timer is reset.
subscriptionCommitter CommitSmartBuffer consumeParams eventStream queue = do
  let millisDefault = 1000
      nMaxEvents    = cursorBufferSize consumeParams
      timerConf     = Timer.defaultConf
                      & Timer.setInitDelay (fromIntegral millisDefault)
                      & Timer.setInterval  (fromIntegral millisDefault)
  -- With a degenerate threshold (<= 1) buffering cannot help; fall
  -- back to committing every cursor directly.
  if nMaxEvents > 1
    then do cursorsMap <- liftIO . atomically $ newTVar emptyCursorsMap
            withAsync (cursorConsumer cursorsMap) $ \ asyncCursorConsumer -> do
              link asyncCursorConsumer
              Timer.withAsyncTimer timerConf $ cursorCommitter cursorsMap nMaxEvents
    else unbufferedCommitLoop eventStream queue

  where -- | The cursorsConsumer drains the cursors queue and adds
        -- each cursor to the provided cursorsMap, accumulating the
        -- number of events processed per partition.
        cursorConsumer cursorsMap = forever . liftIO . atomically $ do
          (nEvents, cursor) <- readTBQueue queue
          -- Strict modify: the lazy 'modifyTVar' would accumulate
          -- unevaluated insertWith thunks in the TVar between commits.
          modifyTVar' cursorsMap $
            HashMap.insertWith updateCursor (cursorKey cursor) (nEvents, cursor)

        -- | Adds the old number of events to the new entry in the
        -- cursors map. (No spaces around @: required by GHC >= 9.0's
        -- whitespace-sensitive operators.)
        updateCursor cursorNew _cursorOld@(nEventsOld, _) =
          cursorNew & _1 %~ (+ nEventsOld)

        -- | Committer loop.
        cursorCommitter cursorsMap nMaxEvents timer = forever $ do
          race (Timer.wait timer) (maxEventsReached cursorsMap nMaxEvents) >>= \ case
            Left _ ->
              -- Timer has elapsed, simply commit all currently
              -- buffered cursors.
              commitAllCursors eventStream cursorsMap
            Right _ -> do
              -- Events processed since last cursor commit have
              -- crossed the configured threshold for at least one
              -- partition. Commit cursors on all such partitions and
              -- restart the timer interval.
              Timer.reset timer
              commitAllCursors eventStream cursorsMap

        -- | Returns list of cursors that should be committed
        -- considering the number of events processed on the
        -- respective partition since the last commit. Blocks until at
        -- least one such cursor is found.
        maxEventsReached cursorsMap nMaxEvents = liftIO . atomically $ do
          cursorsList <- HashMap.toList <$> readTVar cursorsMap
          let cursorsCommit = filter (shouldBeCommitted nMaxEvents) cursorsList
          if null cursorsCommit
            then retry
            else pure ()

        -- | Returns True if the provided cursor should be committed.
        shouldBeCommitted nMaxEvents cursor = cursor ^. _2 . _1 >= nMaxEvents

-- | This function commits all cursors in the provided cursorsMap.
-- The map is atomically swapped for an empty one, so cursors arriving
-- during the commit round are kept for the next round.
commitAllCursors
  :: (MonadNakadi b m, MonadIO m)
  => SubscriptionEventStream
  -> TVar CursorsMap
  -> m ()
commitAllCursors eventStream cursorsMap = do
  buffered <- liftIO . atomically $ swapTVar cursorsMap emptyCursorsMap
  -- Only the cursor component matters here; the event count is dropped.
  mapM_ (commitOneCursor eventStream . snd) buffered

-- | This function takes care of committing a single cursor. Exceptions will be
-- catched and logged, but the failure will NOT be propagated. This means that
-- Nakadi itself is in control of disconnecting us.
commitOneCursor
  :: (MonadIO m, MonadNakadi b m)
  => SubscriptionEventStream
  -> SubscriptionCursor
  -> m ()
commitOneCursor eventStream cursor = do
  config <- nakadiAsk
  subscriptionCursorCommit eventStream [cursor] `catchAny` \ exn ->
    nakadiLiftBase $ case config^.L.logFunc of
      Nothing ->
        pure ()
      Just logFunc -> logFunc "nakadi-client" LevelWarn $ toLogStr $
        "Failed to commit cursor " <> tshow cursor <> ": " <> tshow exn