{-|
Module      : Network.Nakadi.Subscriptions.Cursors
Description : Implementation of Nakadi Subscription Cursors API
Copyright   : (c) Moritz Schulte 2017, 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX

This module implements the
@\/subscriptions\/SUBSCRIPTIONS\/cursors@ API.
-}

{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NoImplicitPrelude     #-}
{-# LANGUAGE OverloadedStrings     #-}
{-# LANGUAGE RecordWildCards       #-}

module Network.Nakadi.Subscriptions.Cursors
  ( subscriptionCursorCommit'
  , subscriptionCursorCommit
  , subscriptionCursors
  , subscriptionCursorsReset
  ) where

import           Network.Nakadi.Internal.Prelude

import           Data.Aeson

import qualified Control.Exception.Safe              as Safe
import           Control.Lens
import qualified Data.HashMap.Lazy                   as HashMap
import           Network.Nakadi.Internal.Conversions
import           Network.Nakadi.Internal.Http
import           Network.Nakadi.Internal.Lenses      (HasNakadiSubscriptionCursor)
import qualified Network.Nakadi.Internal.Lenses      as L

-- | Builds the request path of the cursors endpoint for the given
-- subscription, i.e. @\/subscriptions\/SUBSCRIPTION-ID\/cursors@.
path :: SubscriptionId -> ByteString
path subscriptionId = "/subscriptions/" <> encodedId <> "/cursors"
  where
    encodedId = subscriptionIdToByteString subscriptionId

-- | @POST@ to @\/subscriptions\/SUBSCRIPTION-ID\/cursors@. Low level
-- interface for committing cursors.
--
-- The stream ID is transmitted in the @X-Nakadi-StreamId@ request
-- header and the cursors are sent as the JSON request body. The
-- request is issued through 'httpJsonNoBody' with 'status204' as the
-- expected status and 'ok200' paired with
-- 'errorCursorAlreadyCommitted'.
subscriptionCursorCommit'
  :: MonadNakadi b m
  => SubscriptionId           -- ^ Subscription ID
  -> StreamId                 -- ^ Stream ID
  -> SubscriptionCursorCommit -- ^ Subscription Cursor to commit
  -> m ()
subscriptionCursorCommit' subscriptionId streamId cursors =
  httpJsonNoBody status204 [(ok200, errorCursorAlreadyCommitted)] prepareRequest
  where
    -- Header value carrying the UTF-8 encoded stream ID.
    streamIdHeader = encodeUtf8 (unStreamId streamId)
    -- Request modifier; the composed setters are the same ones, in the
    -- same order, as in a single inline composition.
    prepareRequest =
      setRequestMethod "POST"
        . addRequestHeader "X-Nakadi-StreamId" streamIdHeader
        . setRequestBodyJSON cursors
        . setRequestPath (path subscriptionId)

-- | @POST@ to @\/subscriptions\/SUBSCRIPTION\/cursors@. Commits
-- cursors using high level interface.
subscriptionCursorCommit
  :: (MonadNakadi b m, MonadCatch m, HasNakadiSubscriptionCursor a)
  => SubscriptionEventStream -- ^ Event stream whose subscription ID and stream ID are used for the commit
  -> [a]                     -- ^ Values containing Subscription Cursors to commit
  -> m ()
subscriptionCursorCommit SubscriptionEventStream { .. } as =
  -- A 'CursorAlreadyCommitted' exception raised by the low level commit
  -- is deliberately discarded; any other exception propagates.
  Safe.catchJust
    exceptionPredicate
    (subscriptionCursorCommit' _subscriptionId _streamId cursorsCommit)
    (const (return ()))
  where
    -- Selects only the exception that should be swallowed.
    exceptionPredicate = \case
      CursorAlreadyCommitted _ -> Just ()
      _                        -> Nothing
    -- Extract the cursor from every provided value via its lens.
    cursors       = map (^. L.subscriptionCursor) as
    cursorsCommit = SubscriptionCursorCommit cursors

-- | @GET@ to @\/subscriptions\/SUBSCRIPTION\/cursors@. Retrieves
-- subscriptions cursors.
subscriptionCursors
  :: MonadNakadi b m
  => SubscriptionId         -- ^ Subscription ID
  -> m [SubscriptionCursor] -- ^ Subscription Cursors for the specified Subscription
subscriptionCursors subscriptionId = httpJsonBody
  ok200
  []
  (setRequestMethod "GET" . setRequestPath (path subscriptionId))

-- | @PATCH@ to @\/subscriptions\/SUBSCRIPTION\/cursors@. Resets
-- subscriptions cursors.
--
-- The cursors are sent as a JSON object with a single @items@ field
-- holding the list of cursors. The request is issued through
-- 'httpJsonNoBody' with 'status204' as the expected status;
-- 'status404' is paired with 'errorSubscriptionNotFound' and
-- 'status409' with 'errorCursorResetInProgress'.
subscriptionCursorsReset
  :: MonadNakadi b m
  => SubscriptionId                 -- ^ Subscription ID
  -> [SubscriptionCursorWithoutToken] -- ^ Subscription Cursors to reset
  -> m ()
subscriptionCursorsReset subscriptionId cursors = httpJsonNoBody
  status204
  [ (status404, errorSubscriptionNotFound)
  , (status409, errorCursorResetInProgress)
  ]
  ( setRequestMethod "PATCH"
  . setRequestPath (path subscriptionId)
    -- Built with 'object' rather than the 'Object' constructor applied
    -- to a HashMap: the constructor's payload type changed in aeson 2.0,
    -- whereas 'object' yields the same JSON on every aeson version.
  . setRequestBodyJSON (object [("items", toJSON cursors)])
  )