{-| Module : Network.Nakadi.Subscriptions.Stats Description : Implementation of Nakadi Subscription API Copyright : (c) Moritz Clasmeier 2017, 2018 License : BSD3 Maintainer : mtesseract@silverratio.net Stability : experimental Portability : POSIX This module implements the @\/subscriptions@ API. -} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE TupleSections #-} module Network.Nakadi.Subscriptions ( module Network.Nakadi.Subscriptions.Cursors , module Network.Nakadi.Subscriptions.Events , module Network.Nakadi.Subscriptions.Stats , module Network.Nakadi.Subscriptions.Subscription , subscriptionCreate' , subscriptionCreate , subscriptionsList' , subscriptionsSource , subscriptionsList , withSubscription , withTemporarySubscription ) where import Network.Nakadi.Internal.Prelude import Conduit import qualified Control.Exception.Safe as Safe import Control.Lens import qualified Data.Text as Text import Network.Nakadi.Internal.Http import qualified Network.Nakadi.Internal.Lenses as L import Network.Nakadi.Internal.Util import Network.Nakadi.Subscriptions.Cursors import Network.Nakadi.Subscriptions.Events import Network.Nakadi.Subscriptions.Stats import Network.Nakadi.Subscriptions.Subscription import qualified Data.Set as Set import Data.Set ( Set ) path :: ByteString path = "/subscriptions" -- | @POST@ to @\/subscriptions@. Creates a new subscription. Low -- level interface. subscriptionCreate' :: MonadNakadi b m => SubscriptionRequest -> m Subscription subscriptionCreate' subscription = httpJsonBody status201 [(ok200, errorSubscriptionExistsAlready)] (setRequestMethod "POST" . setRequestPath path . setRequestBodyJSON subscription) -- | @POST@ to @\/subscriptions@. Creates a new subscription. Does not -- fail if the requested subscription does already exist. subscriptionCreate :: (MonadNakadi b m, MonadCatch m) => SubscriptionRequest -> m Subscription subscriptionCreate subscription = Safe.catchJust exceptionPredicate (subscriptionCreate' subscription) return where exceptionPredicate (SubscriptionExistsAlready s) = Just s exceptionPredicate _ = Nothing -- | @GET@ to @\/subscriptions@. Internal low-level interface. subscriptionsGet :: MonadNakadi b m => [(ByteString, ByteString)] -> m SubscriptionsListResponse subscriptionsGet queryParameters = httpJsonBody ok200 [] (setRequestMethod "GET" . setRequestPath path . setRequestQueryParameters queryParameters) buildQueryParameters :: Maybe ApplicationName -> Maybe [EventTypeName] -> Maybe Limit -> Maybe Offset -> [(ByteString, ByteString)] buildQueryParameters maybeOwningApp maybeEventTypeNames maybeLimit maybeOffset = catMaybes $ [ ("owning_application", ) . encodeUtf8 . unApplicationName <$> maybeOwningApp , ("limit", ) . encodeUtf8 . tshow <$> maybeLimit , ("offset", ) . encodeUtf8 . tshow <$> maybeOffset ] ++ case maybeEventTypeNames of Just eventTypeNames -> map (Just . ("event_type", ) . encodeUtf8 . unEventTypeName) eventTypeNames Nothing -> [] -- | @GET@ to @\/subscriptions@. Retrieves all subscriptions matching -- the provided filter criteria. Low-level interface using pagination. subscriptionsList' :: (MonadNakadi b m) => Maybe ApplicationName -> Maybe [EventTypeName] -> Maybe Limit -> Maybe Offset -> m SubscriptionsListResponse subscriptionsList' maybeOwningApp maybeEventTypeNames maybeLimit maybeOffset = subscriptionsGet queryParameters where queryParameters = buildQueryParameters maybeOwningApp maybeEventTypeNames maybeLimit maybeOffset -- | @GET@ to @\/subscriptions@. Retrieves all subscriptions matching -- the provided filter criteria. High-level Conduit interface. subscriptionsSource :: (MonadNakadi b m) => Maybe ApplicationName -> Maybe [EventTypeName] -> m (ConduitM () [Subscription] m ()) subscriptionsSource maybeOwningApp maybeEventTypeNames = pure $ nextPage initialQueryParameters where nextPage queryParameters = do resp <- lift $ subscriptionsGet queryParameters yield (resp ^. L.items) let maybeNextPath = Text.unpack . view L.href <$> (resp ^. L.links . L.next) forM_ (maybeNextPath >>= extractQueryParametersFromPath) nextPage initialQueryParameters = buildQueryParameters maybeOwningApp maybeEventTypeNames Nothing Nothing -- | @GET@ to @\/subscriptions@. Retrieves all subscriptions matching -- the provided filter criteria. High-level list interface. subscriptionsList :: MonadNakadi b m => Maybe ApplicationName -> Maybe [EventTypeName] -> m [Subscription] subscriptionsList maybeOwningApp maybeEventTypeNames = do source <- subscriptionsSource maybeOwningApp maybeEventTypeNames runConduit $ source .| concatC .| sinkList -- | Experimental API. -- -- Creates a new temporary subscription using the specified parameters via `subscriptionCreate` -- and pass it to the provided action. Note that `bracket` is used to enforce the deletion of -- the subscription via `subscriptionDelete` when the provided action returns. -- -- Do NOT use this function if the specified subscription should not be deleted. withTemporarySubscription :: (MonadNakadi b m, MonadMask m) => ApplicationName -> ConsumerGroup -> Set EventTypeName -> SubscriptionPosition -> (Subscription -> m r) -> m r withTemporarySubscription owningApp consumerGroup eventTypeNames subscriptionPosition = bracket (subscriptionCreate subscriptionRequest) (subscriptionDelete . view L.id) where subscriptionRequest = SubscriptionRequest { _owningApplication = owningApp , _eventTypes = Set.toList eventTypeNames , _consumerGroup = Just consumerGroup , _subscriptionPosition = Just subscriptionPosition } -- | Experimental API. -- -- Creates a new subscription using the specified parameters via `subscriptionCreate` -- and pass it to the provided action. withSubscription :: (MonadNakadi b m, MonadMask m) => ApplicationName -> ConsumerGroup -> Set EventTypeName -> SubscriptionPosition -> (Subscription -> m r) -> m r withSubscription owningApp consumerGroup eventTypeNames subscriptionPosition f = subscriptionCreate subscriptionRequest >>= f where subscriptionRequest = SubscriptionRequest { _owningApplication = owningApp , _eventTypes = Set.toList eventTypeNames , _consumerGroup = Just consumerGroup , _subscriptionPosition = Just subscriptionPosition }