{-|
Module      : Network.Nakadi.Subscriptions.Cursors
Description : Implementation of Nakadi Subscription Cursors API
Copyright   : (c) Moritz Clasmeier 2017, 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX

This module implements the @\/subscriptions\/SUBSCRIPTIONS\/cursors@
API.
-}

{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NoImplicitPrelude     #-}
{-# LANGUAGE OverloadedStrings     #-}
{-# LANGUAGE RecordWildCards       #-}

module Network.Nakadi.Subscriptions.Cursors
  ( subscriptionCursorCommit'
  , subscriptionCursorCommit
  , subscriptionCursors
  , subscriptionCursorsReset
  )
where

import           Network.Nakadi.Internal.Prelude

import           Data.Aeson

import qualified Control.Exception.Safe        as Safe
import           Control.Lens
import qualified Data.HashMap.Lazy             as HashMap
import           Network.Nakadi.Internal.Conversions
import           Network.Nakadi.Internal.Http
import           Network.Nakadi.Internal.Lenses ( HasNakadiSubscriptionCursor )
import qualified Network.Nakadi.Internal.Lenses
                                               as L

path :: SubscriptionId -> ByteString
path subscriptionId = "/subscriptions/" <> subscriptionIdToByteString subscriptionId <> "/cursors"

-- | @POST@ to @\/subscriptions\/SUBSCRIPTION-ID\/cursors@. Commits
-- cursors using low level interface.
subscriptionCursorCommit'
  :: MonadNakadi b m
  => SubscriptionId           -- ^ Subsciption ID
  -> StreamId                 -- ^ Stream ID
  -> SubscriptionCursorCommit -- ^ Subscription Cursor to commit
  -> m ()
subscriptionCursorCommit' subscriptionId streamId cursors = httpJsonNoBody
  status204
  [(ok200, errorCursorAlreadyCommitted)]
  ( setRequestMethod "POST"
  . addRequestHeader "X-Nakadi-StreamId" (encodeUtf8 (unStreamId streamId))
  . setRequestBodyJSON cursors
  . setRequestPath (path subscriptionId)
  )

-- | @POST@ to @\/subscriptions\/SUBSCRIPTION\/cursors@. Commits
-- cursors using high level interface.
subscriptionCursorCommit
  :: (MonadNakadi b m, MonadCatch m, HasNakadiSubscriptionCursor a)
  => SubscriptionEventStream
  -> [a] -- ^ Values containing Subscription Cursors to commit
  -> m ()
subscriptionCursorCommit SubscriptionEventStream {..} as = Safe.catchJust
  exceptionPredicate
  (subscriptionCursorCommit' _subscriptionId _streamId cursorsCommit)
  (const (return ()))
 where
  exceptionPredicate = \case
    CursorAlreadyCommitted _ -> Just ()
    _                        -> Nothing

  cursors       = map (^. L.subscriptionCursor) as
  cursorsCommit = SubscriptionCursorCommit cursors

-- | @GET@ to @\/subscriptions\/SUBSCRIPTION\/cursors@. Retrieves
-- subscriptions cursors.
subscriptionCursors
  :: MonadNakadi b m
  => SubscriptionId         -- ^ Subscription ID
  -> m [SubscriptionCursor] -- ^ Subscription Cursors for the specified Subscription
subscriptionCursors subscriptionId =
  httpJsonBody ok200 [] (setRequestMethod "GET" . setRequestPath (path subscriptionId))

-- | @PATCH@ to @\/subscriptions\/SUBSCRIPTION\/cursors@. Resets
-- subscriptions cursors.
subscriptionCursorsReset
  :: MonadNakadi b m
  => SubscriptionId                   -- ^ Subscription ID
  -> [SubscriptionCursorWithoutToken] -- ^ Subscription Cursors to reset
  -> m ()
subscriptionCursorsReset subscriptionId cursors = httpJsonNoBody
  status204
  [(status404, errorSubscriptionNotFound), (status409, errorCursorResetInProgress)]
  (setRequestMethod "PATCH" . setRequestPath (path subscriptionId) . setRequestBodyJSON
    (Object (HashMap.fromList [("items", toJSON cursors)]))
  )