{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NoImplicitPrelude #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.Nakadi.Subscriptions.Cursors
( subscriptionCursorCommit'
, subscriptionCursorCommit
, subscriptionCursors
, subscriptionCursorsReset
)
where
import Network.Nakadi.Internal.Prelude
import Data.Aeson
import qualified Control.Exception.Safe as Safe
import Control.Lens
import qualified Data.HashMap.Lazy as HashMap
import Network.Nakadi.Internal.Conversions
import Network.Nakadi.Internal.Http
import Network.Nakadi.Internal.Lenses ( HasNakadiSubscriptionCursor )
import qualified Network.Nakadi.Internal.Lenses
as L
-- | Request path of the cursors resource belonging to a subscription:
-- @\/subscriptions\/SUBSCRIPTION-ID\/cursors@.
path :: SubscriptionId -> ByteString
path sid = "/subscriptions/" <> encodedId <> "/cursors"
  where
    -- Byte representation of the subscription ID as it is placed in the path.
    encodedId = subscriptionIdToByteString sid
-- | @POST@ to @\/subscriptions\/SUBSCRIPTION-ID\/cursors@. Commits
-- the given cursors on behalf of the given stream.
--
-- The request is issued through 'httpJsonNoBody' with 'status204' as
-- the success status; a 'ok200' response is handed to
-- 'errorCursorAlreadyCommitted'.
subscriptionCursorCommit'
  :: MonadNakadi b m
  => SubscriptionId           -- ^ Subscription whose cursors are committed
  -> StreamId                 -- ^ Stream ID, sent as @X-Nakadi-StreamId@ header
  -> SubscriptionCursorCommit -- ^ Cursors to commit, sent as JSON request body
  -> m ()
subscriptionCursorCommit' subscriptionId streamId cursors =
  httpJsonNoBody status204 [(ok200, errorCursorAlreadyCommitted)] prepareRequest
  where
    -- Header value identifying the stream the commit belongs to.
    streamIdHeader = encodeUtf8 (unStreamId streamId)

    -- Request modifier; composition applies the path first, the method last.
    prepareRequest =
      setRequestMethod "POST"
        . addRequestHeader "X-Nakadi-StreamId" streamIdHeader
        . setRequestBodyJSON cursors
        . setRequestPath (path subscriptionId)
-- | Commits the subscription cursors extracted from the given values
-- for the subscription and stream of the given event stream.
--
-- This is a wrapper around 'subscriptionCursorCommit'' which treats a
-- 'CursorAlreadyCommitted' exception as success: it is caught and
-- discarded. Any other exception propagates unchanged.
subscriptionCursorCommit
  :: (MonadNakadi b m, MonadCatch m, HasNakadiSubscriptionCursor a)
  => SubscriptionEventStream -- ^ Provides the subscription ID and stream ID
  -> [a]                     -- ^ Values carrying the cursors to commit
  -> m ()
subscriptionCursorCommit SubscriptionEventStream {..} as =
  Safe.catchJust alreadyCommitted commitCursors ignoreAlreadyCommitted
  where
    -- Selects exactly the exception that is to be swallowed.
    alreadyCommitted (CursorAlreadyCommitted _) = Just ()
    alreadyCommitted _                          = Nothing

    -- The actual commit request; may throw.
    commitCursors = subscriptionCursorCommit' _subscriptionId _streamId payload

    -- Handler for the selected exception: nothing to do.
    ignoreAlreadyCommitted _ = return ()

    -- Commit payload built from the cursor of every input value.
    payload = SubscriptionCursorCommit [a ^. L.subscriptionCursor | a <- as]
-- | @GET@ to @\/subscriptions\/SUBSCRIPTION-ID\/cursors@. Retrieves
-- the cursors of the given subscription.
--
-- The request is issued through 'httpJsonBody' with 'ok200' as the
-- success status and no additional status handlers.
subscriptionCursors
  :: MonadNakadi b m
  => SubscriptionId         -- ^ Subscription to query
  -> m [SubscriptionCursor] -- ^ Cursors decoded from the response body
-- NOTE(review): the Nakadi API appears to wrap the cursor list of this
-- endpoint in an object with an @items@ field (as the reset request in this
-- module does for its body) — confirm that the decoding performed by
-- 'httpJsonBody' matches the actual response shape.
subscriptionCursors subscriptionId = httpJsonBody ok200 [] prepareRequest
  where
    -- Request modifier; composition applies the path first, then the method.
    prepareRequest = setRequestMethod "GET" . setRequestPath (path subscriptionId)
-- | @PATCH@ to @\/subscriptions\/SUBSCRIPTION-ID\/cursors@. Resets
-- the cursors of the given subscription to the provided ones.
--
-- The request is issued through 'httpJsonNoBody' with 'status204' as
-- the success status. A 'status404' response is handed to
-- 'errorSubscriptionNotFound', a 'status409' response to
-- 'errorCursorResetInProgress'.
subscriptionCursorsReset
  :: MonadNakadi b m
  => SubscriptionId                  -- ^ Subscription whose cursors are reset
  -> [SubscriptionCursorWithoutToken] -- ^ Cursors to reset to
  -> m ()
subscriptionCursorsReset subscriptionId cursors = httpJsonNoBody
  status204
  [(status404, errorSubscriptionNotFound), (status409, errorCursorResetInProgress)]
  prepareRequest
  where
    -- JSON request body: an object holding the cursors under the key "items".
    resetBody = Object (HashMap.singleton "items" (toJSON cursors))

    -- Request modifier; composition applies the body first, the method last.
    prepareRequest =
      setRequestMethod "PATCH"
        . setRequestPath (path subscriptionId)
        . setRequestBodyJSON resetBody