{-|
Module      : Network.Nakadi.EventTypes.CursorsLag
Description : Implementation of Nakadi CursorsLag API
Copyright   : (c) Moritz Schulte 2017, 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX

This module implements the
@\/event-types\/EVENT-TYPE\/cursors-lag@ API.
-}

{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE MultiParamTypeClasses #-}

module Network.Nakadi.EventTypes.CursorsLag
  ( cursorsLag'
  , cursorsLag
  ) where

import           Network.Nakadi.Internal.Prelude

import           Control.Arrow
import           Control.Lens
import qualified Data.Map.Strict                 as Map
import           Network.Nakadi.Internal.Http
import qualified Network.Nakadi.Internal.Lenses  as L
import           Network.Nakadi.Internal.Util

path :: EventTypeName -> ByteString
path eventTypeName =
  "/event-types/"
  <> encodeUtf8 (unEventTypeName eventTypeName)
  <> "/cursors-lag"

-- | @POST@ to @\/event-types\/EVENT-TYPE\/cursors-lag@. Low level
-- interface.
cursorsLag' ::
  MonadNakadi b m
  => EventTypeName -- ^ Event Type
  -> [Cursor]      -- ^ Cursors for which to compute the lag for
  -> m [Partition] -- ^ Resulting partition information containing
                   -- information about unconsumed events.
cursorsLag' eventTypeName cursors = do
  config <- nakadiAsk
  httpJsonBody ok200 [] $
    (setRequestMethod "POST"
     . includeFlowId config
     . setRequestPath (path eventTypeName)
     . setRequestBodyJSON cursors)

-- | @POST@ to @\/event-types\/EVENT-TYPE\/cursors-lag@. High level
-- interface.
cursorsLag ::
  MonadNakadi b m
  => EventTypeName                  -- ^ Event Type
  -> Map PartitionName CursorOffset -- ^ Cursor offsets associated to
                                    -- partitions.
  -> m (Map PartitionName Int64)    -- ^ Cursor lags associated to partitions.
cursorsLag eventTypeName cursorsMap = do
  partitionStats <- cursorsLag' eventTypeName cursors
  return $ partitionStats & map ((view L.partition &&& view L.unconsumedEvents) >>> sequenceSnd)
                          & catMaybes
                          & Map.fromList
  where cursors = map (uncurry Cursor) (Map.toList cursorsMap)