{-| Module : Network.Nakadi.Internal.Http Description : Nakadi Client HTTP (Internal) Copyright : (c) Moritz Schulte 2017 License : BSD3 Maintainer : mtesseract@silverratio.net Stability : experimental Portability : POSIX Internal module containing HTTP client relevant code. -} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} module Network.Nakadi.Internal.Http ( module Network.HTTP.Simple , module Network.HTTP.Types.Status , module Network.Nakadi.Internal.Types , httpJsonBody , httpJsonNoBody , httpJsonBodyStream , errorClientNotAuthenticated , errorUnprocessableEntity , errorAccessForbidden , errorTooManyRequests , errorBadRequest , errorSubscriptionNotFound , errorCursorAlreadyCommitted , errorCursorResetInProgress , errorEventTypeNotFound , errorSubscriptionExistsAlready , errorBatchPartiallySubmitted , errorBatchNotSubmitted , setRequestQueryParameters ) where import Network.Nakadi.Internal.Prelude import Conduit import Control.Arrow import Control.Lens import Control.Monad (void) import Control.Monad.Reader import Control.Monad.Trans.Resource import Data.Aeson import qualified Data.ByteString.Lazy as ByteString.Lazy import qualified Data.Conduit.Binary as Conduit import qualified Data.Text as Text import Network.HTTP.Client (BodyReader, HttpException (..), HttpExceptionContent (..), checkResponse, responseStatus) import Network.HTTP.Client.Conduit (bodyReaderSource) import Network.HTTP.Simple import Network.HTTP.Types import Network.HTTP.Types.Status import qualified Network.Nakadi.Internal.Lenses as L import Network.Nakadi.Internal.Retry import Network.Nakadi.Internal.Types import Network.Nakadi.Internal.Util conduitDecode :: (MonadIO m, FromJSON a) => Config -- ^ Configuration, containing the -- deserialization-failure-callback. -> ConduitM ByteString a m () -- ^ Conduit deserializing bytestrings -- into custom values conduitDecode Config { .. } = awaitForever $ \a -> case eitherDecodeStrict a of Right v -> yield v Left err -> liftIO $ callback a (Text.pack err) where callback = fromMaybe dummyCallback _deserializationFailureCallback dummyCallback _ _ = return () -- | Throw 'HttpException' exception on server errors (5xx). checkNakadiResponse :: Request -> Response BodyReader -> IO () checkNakadiResponse request response = when (statusCode (responseStatus response) `div` 100 == 5) $ throwIO $ HttpExceptionRequest request (StatusCodeException (void response) mempty) httpBuildRequest :: (MonadIO m, MonadCatch m) => Config -- ^ Configuration, contains the impure request modifier -> (Request -> Request) -- ^ Pure request modifier -> m Request -- ^ Resulting request to execute httpBuildRequest Config { .. } requestDef = let manager = _manager request = requestDef _requestTemplate & setRequestManager manager & (\req -> req { checkResponse = checkNakadiResponse }) in modifyRequest _requestModifier request -- | Modify the Request based on a user function in the configuration. modifyRequest :: (MonadIO m, MonadCatch m) => (Request -> IO Request) -> Request -> m Request modifyRequest rm request = tryAny (liftIO (rm request)) >>= \case Right modifiedRequest -> return modifiedRequest Left exn -> throwIO $ RequestModificationException exn -- | Executes an HTTP request using the provided configuration and a -- pure request modifier. httpExecRequest :: (MonadIO m, MonadMask m) => Config -> (Request -> Request) -> m (Response ByteString.Lazy.ByteString) httpExecRequest config requestDef = do req <- httpBuildRequest config requestDef retryAction config req (liftIO . (config^.L.http.L.httpLbs)) -- | Executes an HTTP request using the provided configuration and a -- pure request modifier. Returns the HTTP response and separately the -- response status. httpExecRequestWithStatus :: (MonadIO m, MonadMask m) => Config -- ^ Configuration -> (Request -> Request) -- ^ Pure request modifier -> m (Response ByteString.Lazy.ByteString, Status) httpExecRequestWithStatus config requestDef = (identity &&& getResponseStatus) <$> httpExecRequest config requestDef httpJsonBody :: (MonadMask m, MonadIO m, FromJSON a) => Config -> Status -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)] -> (Request -> Request) -> m a httpJsonBody config successStatus exceptionMap requestDef = do (response, status) <- httpExecRequestWithStatus config requestDef if status == successStatus then decodeThrow (getResponseBody response) else case lookup status exceptionMap' of Just mkExn -> mkExn (getResponseBody response) >>= throwIO Nothing -> throwIO (UnexpectedResponse (void response)) where exceptionMap' = exceptionMap ++ defaultExceptionMap httpJsonNoBody :: (MonadMask m, MonadIO m) => Config -> Status -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)] -> (Request -> Request) -> m () httpJsonNoBody config successStatus exceptionMap requestDef = do (response, status) <- httpExecRequestWithStatus config requestDef unless (status == successStatus) $ case lookup status exceptionMap' of Just mkExn -> mkExn (getResponseBody response) >>= throwIO Nothing -> throwIO (UnexpectedResponse (void response)) where exceptionMap' = exceptionMap ++ defaultExceptionMap httpJsonBodyStream :: (MonadMask m, MonadIO m, FromJSON a, MonadBaseControl IO m, MonadResource m) => Config -> Status -> (Response () -> Either Text b) -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)] -> (Request -> Request) -> m (b, ConduitM () a (ReaderT r m) ()) httpJsonBodyStream config successStatus f exceptionMap requestDef = do request <- httpBuildRequest config requestDef let manager = config^.L.manager responseOpen = config^.L.http.L.responseOpen responseClose = config^.L.http.L.responseClose responseOpenRetry = retryAction config request (flip responseOpen manager) (_, response) <- allocate responseOpenRetry responseClose let response_ = void response bodySource = bodyReaderSource (getResponseBody response) status = getResponseStatus response if status == successStatus then do connectCallback (config^.L.logFunc) response_ let conduit = bodySource .| Conduit.lines .| conduitDecode config case f response_ of Left errMsg -> throwString (Text.unpack errMsg) Right b -> return (b, readerC (const conduit)) else case lookup status exceptionMap' of Just mkExn -> conduitDrainToLazyByteString bodySource >>= mkExn >>= throwIO Nothing -> throwIO (UnexpectedResponse response_) where exceptionMap' = exceptionMap ++ defaultExceptionMap connectCallback = case config^.L.streamConnectCallback of Just cb -> \logFunc response -> liftIO $ cb logFunc response Nothing -> \_ _ -> return () setRequestQueryParameters :: [(ByteString, ByteString)] -> Request -> Request setRequestQueryParameters parameters = setRequestQueryString parameters' where parameters' = map (fmap Just) parameters defaultExceptionMap :: MonadThrow m => [(Status, ByteString.Lazy.ByteString -> m NakadiException)] defaultExceptionMap = [ (status401, errorClientNotAuthenticated) , (status403, errorAccessForbidden) , (status400, errorBadRequest) , (status422, errorUnprocessableEntity) , (status409, errorConflict) ] errorClientNotAuthenticated :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorClientNotAuthenticated s = ClientNotAuthenticated <$> decodeThrow s errorConflict :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorConflict s = Conflict <$> decodeThrow s errorUnprocessableEntity :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorUnprocessableEntity s = UnprocessableEntity <$> decodeThrow s errorAccessForbidden :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorAccessForbidden s = AccessForbidden <$> decodeThrow s errorTooManyRequests :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorTooManyRequests s = TooManyRequests <$> decodeThrow s errorBadRequest :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorBadRequest s = BadRequest <$> decodeThrow s errorSubscriptionNotFound :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorSubscriptionNotFound s = SubscriptionNotFound <$> decodeThrow s errorCursorAlreadyCommitted :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorCursorAlreadyCommitted s = CursorAlreadyCommitted <$> decodeThrow s errorCursorResetInProgress :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorCursorResetInProgress s = CursorResetInProgress <$> decodeThrow s errorEventTypeNotFound :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorEventTypeNotFound s = EventTypeNotFound <$> decodeThrow s errorSubscriptionExistsAlready :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorSubscriptionExistsAlready s = SubscriptionExistsAlready <$> decodeThrow s errorBatchPartiallySubmitted :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorBatchPartiallySubmitted s = BatchPartiallySubmitted <$> decodeThrow s errorBatchNotSubmitted :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException errorBatchNotSubmitted s = BatchNotSubmitted <$> decodeThrow s