{-| Module : Network.Nakadi.Internal.Http Description : Nakadi Client HTTP (Internal) Copyright : (c) Moritz Clasmeier 2017, 2018 License : BSD3 Maintainer : mtesseract@silverratio.net Stability : experimental Portability : POSIX Internal module containing HTTP client relevant code. -} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GADTs #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeFamilies #-} module Network.Nakadi.Internal.Http ( module Network.HTTP.Simple , module Network.HTTP.Types.Status , module Network.Nakadi.Internal.Types , httpJsonBody , httpJsonNoBody , httpJsonBodyStream , httpBuildRequest , conduitDecode , includeFlowId , errorClientNotAuthenticated , errorUnprocessableEntity , errorAccessForbidden , errorTooManyRequests , errorBadRequest , errorSubscriptionNotFound , errorCursorAlreadyCommitted , errorCursorResetInProgress , errorEventTypeNotFound , errorSubscriptionExistsAlready , errorBatchPartiallySubmitted , errorBatchNotSubmitted , setRequestQueryParameters ) where import Network.Nakadi.Internal.Prelude import Conduit hiding ( throwM ) import Control.Arrow import Control.Lens import Control.Monad ( void ) import Data.Aeson import qualified Data.ByteString.Lazy as ByteString.Lazy import qualified Data.ByteString.Lazy as LB import qualified Data.Text as Text import Network.HTTP.Client ( BodyReader , HttpException(..) , HttpExceptionContent(..) , Manager , checkResponse , responseBody , responseStatus ) import Network.HTTP.Simple hiding ( Proxy ) import Network.HTTP.Types import Network.HTTP.Types.Status import qualified Network.Nakadi.Internal.Lenses as L import Network.Nakadi.Internal.Types import Network.Nakadi.Internal.Util -- | If no deserializationFailureCallback is set in the provided -- configuration (which is the default), a -- DeserializationFailureCallback exception will be thrown. Otherwise, -- simply run the callback. conduitDecode :: forall a b m . (FromJSON a, MonadNakadi b m) => ConduitM ByteString a m () -- ^ Conduit deserializing bytestrings -- into custom values conduitDecode = do config <- lift nakadiAsk awaitForever $ \ a -> case eitherDecodeStrict' a of Right v -> yield v Left err -> lift . nakadiLiftBase $ callback config a (Text.pack err) where callback config bs err = case config^.L.deserializationFailureCallback of Nothing -> throwM $ DeserializationFailure bs err Just cb -> cb bs err -- | Throw 'HttpException' exception on server errors (5xx). checkNakadiResponse :: Request -> Response BodyReader -> IO () checkNakadiResponse request response = when (statusCode (responseStatus response) `div` 100 == 5) $ throwIO $ HttpExceptionRequest request (StatusCodeException (void response) mempty) httpBuildRequest :: MonadNakadi b m => (Request -> Request) -- ^ Pure request modifier -> m Request -- ^ Resulting request to execute httpBuildRequest requestDef = do config <- nakadiAsk let request = requestDef (config ^. L.requestTemplate) & \req -> req { checkResponse = checkNakadiResponse } modifyRequest (config ^. L.requestModifier) request -- | Modify the Request based on a user function in the configuration. modifyRequest :: MonadNakadi b m => (Request -> b Request) -> Request -> m Request modifyRequest rm request = tryAny (nakadiLiftBase (rm request)) >>= \case Right modifiedRequest -> return modifiedRequest Left exn -> throwIO $ RequestModificationException exn -- | Executes an HTTP request using the provided configuration and a -- pure request modifier. httpExecRequest :: MonadNakadi b m => (Request -> Request) -> m (Response ByteString.Lazy.ByteString) httpExecRequest requestDef = do config <- nakadiAsk req <- httpBuildRequest requestDef nakadiLiftBase $ nakadiHttpLbs config req (config ^. L.manager) nakadiHttpLbs :: Config b -> Request -> Maybe Manager -> b (Response LB.ByteString) nakadiHttpLbs config = (config ^. L.http . L.httpLbs) config -- | Executes an HTTP request using the provided configuration and a -- pure request modifier. Returns the HTTP response and separately the -- response status. httpExecRequestWithStatus :: MonadNakadi b m => (Request -> Request) -- ^ Pure request modifier -> m (Response ByteString.Lazy.ByteString, Status) httpExecRequestWithStatus requestDef = (identity &&& getResponseStatus) <$> httpExecRequest requestDef httpJsonBody :: (MonadNakadi b m, FromJSON a) => Status -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)] -> (Request -> Request) -> m a httpJsonBody successStatus exceptionMap requestDef = do (response, status) <- httpExecRequestWithStatus requestDef if status == successStatus then decodeThrow (getResponseBody response) else case lookup status exceptionMap' of Just mkExn -> mkExn (getResponseBody response) >>= throwM Nothing -> throwM (UnexpectedResponse (void response)) where exceptionMap' = exceptionMap ++ defaultExceptionMap httpJsonNoBody :: MonadNakadi b m => Status -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)] -> (Request -> Request) -> m () httpJsonNoBody successStatus exceptionMap requestDef = do (response, status) <- httpExecRequestWithStatus requestDef unless (status == successStatus) $ case lookup status exceptionMap' of Just mkExn -> mkExn (getResponseBody response) >>= throwIO Nothing -> throwIO (UnexpectedResponse (void response)) where exceptionMap' = exceptionMap ++ defaultExceptionMap nakadiHttpResponseOpen :: Config b -> Request -> Maybe Manager -> b (Response (ConduitM () ByteString b ())) nakadiHttpResponseOpen config = (config ^. L.http . L.httpResponseOpen) config nakadiHttpResponseClose :: Config b -> Response () -> b () nakadiHttpResponseClose = view (L.http . L.httpResponseClose) httpJsonBodyStream :: forall b m r . (MonadNakadi b m, MonadMask m) => Status -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)] -> (Request -> Request) -> (Response (ConduitM () ByteString m ()) -> m r) -> m r httpJsonBodyStream successStatus exceptionMap requestDef handler = do config <- nakadiAsk request <- httpBuildRequest requestDef bracket (nakadiLiftBase $ nakadiHttpResponseOpen config request (config ^. L.manager)) (nakadiLiftBase . nakadiHttpResponseClose config . void) $ \response -> wrappedHandler config response where wrappedHandler :: Config b -> Response (ConduitM () ByteString b ()) -> m r wrappedHandler config response = do let response_ = void response status = responseStatus response_ bodySource = responseBody responseLifted responseLifted = fmap (transPipe nakadiLiftBase) response if status == successStatus then do connectCallback config response_ handler responseLifted else case lookup status exceptionMap' of Just mkExn -> conduitDrainToLazyByteString bodySource >>= mkExn >>= throwM Nothing -> throwM (UnexpectedResponse response_) exceptionMap' = exceptionMap ++ defaultExceptionMap -- connectCallback :: s -> t -> m () connectCallback config response = nakadiLiftBase $ case config ^. L.streamConnectCallback of Just cb -> cb response Nothing -> pure () setRequestQueryParameters :: [(ByteString, ByteString)] -> Request -> Request setRequestQueryParameters parameters = setRequestQueryString parameters' where parameters' = map (fmap Just) parameters includeFlowId :: Config b -> Request -> Request includeFlowId config = case config ^. L.flowId of Just flowId -> setRequestHeader "X-Flow-Id" [encodeUtf8 (unFlowId flowId)] Nothing -> identity defaultExceptionMap :: MonadThrow m => [(Status, ByteString.Lazy.ByteString -> m NakadiException)] defaultExceptionMap = [ (status401, errorClientNotAuthenticated) , (status403, errorAccessForbidden) , (status400, errorBadRequest) , (status422, errorUnprocessableEntity) , (status409, errorConflict) ] errorClientNotAuthenticated :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorClientNotAuthenticated s = ClientNotAuthenticated <$> decodeThrow s errorConflict :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorConflict s = Conflict <$> decodeThrow s errorUnprocessableEntity :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorUnprocessableEntity s = UnprocessableEntity <$> decodeThrow s errorAccessForbidden :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorAccessForbidden s = AccessForbidden <$> decodeThrow s errorTooManyRequests :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorTooManyRequests s = TooManyRequests <$> decodeThrow s errorBadRequest :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorBadRequest s = BadRequest <$> decodeThrow s errorSubscriptionNotFound :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorSubscriptionNotFound s = SubscriptionNotFound <$> decodeThrow s errorCursorAlreadyCommitted :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorCursorAlreadyCommitted s = CursorAlreadyCommitted <$> decodeThrow s errorCursorResetInProgress :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorCursorResetInProgress s = CursorResetInProgress <$> decodeThrow s errorEventTypeNotFound :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorEventTypeNotFound s = EventTypeNotFound <$> decodeThrow s errorSubscriptionExistsAlready :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorSubscriptionExistsAlready s = SubscriptionExistsAlready <$> decodeThrow s errorBatchPartiallySubmitted :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorBatchPartiallySubmitted s = BatchPartiallySubmitted <$> decodeThrow s errorBatchNotSubmitted :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorBatchNotSubmitted s = BatchNotSubmitted <$> decodeThrow s