{-|
Module      : Network.Nakadi.Internal.Http
Description : Nakadi Client HTTP (Internal)
Copyright   : (c) Moritz Schulte 2017, 2018
License     : BSD3
Maintainer  : mtesseract@silverratio.net
Stability   : experimental
Portability : POSIX

Internal module containing HTTP client relevant code.
-}

{-# LANGUAGE FlexibleContexts      #-}
{-# LANGUAGE GADTs                 #-}
{-# LANGUAGE LambdaCase            #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE RecordWildCards       #-}
{-# LANGUAGE ScopedTypeVariables   #-}
{-# LANGUAGE TypeFamilies          #-}

module Network.Nakadi.Internal.Http
  ( module Network.HTTP.Simple
  , module Network.HTTP.Types.Status
  , module Network.Nakadi.Internal.Types
  , httpJsonBody
  , httpJsonNoBody
  , httpJsonBodyStream
  , httpBuildRequest
  , conduitDecode
  , includeFlowId
  , errorClientNotAuthenticated
  , errorUnprocessableEntity
  , errorAccessForbidden
  , errorTooManyRequests
  , errorBadRequest
  , errorSubscriptionNotFound
  , errorCursorAlreadyCommitted
  , errorCursorResetInProgress
  , errorEventTypeNotFound
  , errorSubscriptionExistsAlready
  , errorBatchPartiallySubmitted
  , errorBatchNotSubmitted
  , setRequestQueryParameters
  ) where

import           Network.Nakadi.Internal.Prelude

import           Conduit                         hiding (throwM)
import           Control.Arrow
import           Control.Lens
import           Control.Monad                   (void)
import           Data.Aeson
import qualified Data.ByteString.Lazy            as ByteString.Lazy
import qualified Data.ByteString.Lazy            as LB
import qualified Data.Text                       as Text
import           Network.HTTP.Client             (BodyReader,
                                                  HttpException (..),
                                                  HttpExceptionContent (..),
                                                  Manager, checkResponse,
                                                  responseBody, responseStatus)
import           Network.HTTP.Simple             hiding (Proxy)
import           Network.HTTP.Types
import           Network.HTTP.Types.Status
import qualified Network.Nakadi.Internal.Lenses  as L
import           Network.Nakadi.Internal.Types
import           Network.Nakadi.Internal.Util

-- | If no deserializationFailureCallback is set in the provided
-- configuration (which is the default), a
-- DeserializationFailureCallback exception will be thrown. Otherwise,
-- simply run the callback.
conduitDecode
  :: forall a b m
   . (FromJSON a, MonadNakadi b m)
  => Config b -- ^ Configuration, containing the
              -- deserialization-failure-callback.
  -> ConduitM ByteString a m () -- ^ Conduit deserializing bytestrings
                                -- into custom values
conduitDecode config = awaitForever $ \ a -> case eitherDecodeStrict' a of
  Right v   -> yield v
  Left  err -> lift . nakadiLiftBase $ callback a (Text.pack err)
 where
  callback bs err = case config ^. L.deserializationFailureCallback of
    Nothing -> throwM $ DeserializationFailure bs err
    Just cb -> cb bs err

-- | Throw 'HttpException' exception on server errors (5xx).
checkNakadiResponse :: Request -> Response BodyReader -> IO ()
checkNakadiResponse request response =
  when (statusCode (responseStatus response) `div` 100 == 5)
    $ throwIO
    $ HttpExceptionRequest request (StatusCodeException (void response) mempty)

httpBuildRequest
  :: MonadNakadi b m
  => (Request -> Request) -- ^ Pure request modifier
  -> m Request -- ^ Resulting request to execute
httpBuildRequest requestDef = do
  config <- nakadiAsk
  let request = requestDef (config ^. L.requestTemplate)
        & \req -> req { checkResponse = checkNakadiResponse }
  modifyRequest (config ^. L.requestModifier) request

-- | Modify the Request based on a user function in the configuration.
modifyRequest
  :: MonadNakadi b m => (Request -> b Request) -> Request -> m Request
modifyRequest rm request = tryAny (nakadiLiftBase (rm request)) >>= \case
  Right modifiedRequest -> return modifiedRequest
  Left  exn             -> throwIO $ RequestModificationException exn

-- | Executes an HTTP request using the provided configuration and a
-- pure request modifier.
httpExecRequest
  :: MonadNakadi b m
  => (Request -> Request)
  -> m (Response ByteString.Lazy.ByteString)
httpExecRequest requestDef = do
  config <- nakadiAsk
  req    <- httpBuildRequest requestDef
  nakadiLiftBase $ nakadiHttpLbs config req (config^.L.manager)

nakadiHttpLbs :: Config b
              -> Request
              -> Maybe Manager
              -> b (Response LB.ByteString)
nakadiHttpLbs config req maybeMngr =
  (config^.L.http.L.httpLbs) config req maybeMngr

-- | Executes an HTTP request using the provided configuration and a
-- pure request modifier. Returns the HTTP response and separately the
-- response status.
httpExecRequestWithStatus
  :: MonadNakadi b m
  => (Request -> Request) -- ^ Pure request modifier
  -> m (Response ByteString.Lazy.ByteString, Status)
httpExecRequestWithStatus requestDef =
  (identity &&& getResponseStatus) <$> httpExecRequest requestDef

httpJsonBody
  :: (MonadNakadi b m, FromJSON a)
  => Status
  -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
  -> (Request -> Request)
  -> m a
httpJsonBody successStatus exceptionMap requestDef = do
  (response, status) <- httpExecRequestWithStatus requestDef
  if status == successStatus
    then decodeThrow (getResponseBody response)
    else case lookup status exceptionMap' of
      Just mkExn -> mkExn (getResponseBody response) >>= throwM
      Nothing    -> throwM (UnexpectedResponse (void response))
  where exceptionMap' = exceptionMap ++ defaultExceptionMap

httpJsonNoBody
  :: MonadNakadi b m
  => Status
  -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
  -> (Request -> Request)
  -> m ()
httpJsonNoBody successStatus exceptionMap requestDef = do
  (response, status) <- httpExecRequestWithStatus requestDef
  unless (status == successStatus) $ case lookup status exceptionMap' of
    Just mkExn -> mkExn (getResponseBody response) >>= throwIO
    Nothing    -> throwIO (UnexpectedResponse (void response))
  where exceptionMap' = exceptionMap ++ defaultExceptionMap

nakadiHttpResponseOpen :: Config b
                       -> Request
                       -> Maybe Manager
                       -> b (Response (ConduitM () ByteString b ()))
nakadiHttpResponseOpen config req maybeMngr =
  (config^.L.http.L.httpResponseOpen) config req maybeMngr

nakadiHttpResponseClose :: Config b -> Response () -> b ()
nakadiHttpResponseClose config rsp =
  (config^.L.http.L.httpResponseClose) rsp

httpJsonBodyStream
  :: forall b m r
   . (MonadNakadi b m, MonadMask m)
  => Status
  -> [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
  -> (Request -> Request)
  -> (Response (ConduitM () ByteString m ()) -> m r)
  -> m r
httpJsonBodyStream successStatus exceptionMap requestDef handler = do
  config  <- nakadiAsk
  request <- httpBuildRequest requestDef
  bracket (nakadiLiftBase $ nakadiHttpResponseOpen config request (config^.L.manager))
          (nakadiLiftBase . nakadiHttpResponseClose config . void)
   $ \response -> wrappedHandler config response

  where wrappedHandler :: Config b -> Response (ConduitM () ByteString b ()) -> m r
        wrappedHandler config response = do
          let response_      = void response
              status         = responseStatus response_
              bodySource     = responseBody responseLifted
              responseLifted = fmap (transPipe nakadiLiftBase) response
          if status == successStatus
            then do connectCallback config response_
                    handler responseLifted
            else case lookup status exceptionMap' of
                   Just mkExn ->
                     conduitDrainToLazyByteString bodySource >>= mkExn >>= throwM
                   Nothing -> throwM (UnexpectedResponse response_)

        exceptionMap' = exceptionMap ++ defaultExceptionMap

        -- connectCallback :: s -> t -> m ()
        connectCallback config response =
          nakadiLiftBase $ case config^.L.streamConnectCallback of
                         Just cb -> cb response
                         Nothing -> pure ()

setRequestQueryParameters :: [(ByteString, ByteString)] -> Request -> Request
setRequestQueryParameters parameters = setRequestQueryString parameters'
  where parameters' = map (fmap Just) parameters

includeFlowId
  :: Config b
  -> Request
  -> Request
includeFlowId config =
  case config^.L.flowId of
    Just flowId -> setRequestHeader "X-Flow-Id" [encodeUtf8 (unFlowId flowId)]
    Nothing     -> identity

defaultExceptionMap
  :: MonadThrow m => [(Status, ByteString.Lazy.ByteString -> m NakadiException)]
defaultExceptionMap =
  [ (status401, errorClientNotAuthenticated)
  , (status403, errorAccessForbidden)
  , (status400, errorBadRequest)
  , (status422, errorUnprocessableEntity)
  , (status409, errorConflict)
  ]

errorClientNotAuthenticated
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorClientNotAuthenticated s = ClientNotAuthenticated <$> decodeThrow s

errorConflict :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorConflict s = Conflict <$> decodeThrow s

errorUnprocessableEntity
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorUnprocessableEntity s = UnprocessableEntity <$> decodeThrow s

errorAccessForbidden
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorAccessForbidden s = AccessForbidden <$> decodeThrow s

errorTooManyRequests
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorTooManyRequests s = TooManyRequests <$> decodeThrow s

errorBadRequest
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorBadRequest s = BadRequest <$> decodeThrow s

errorSubscriptionNotFound
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorSubscriptionNotFound s = SubscriptionNotFound <$> decodeThrow s

errorCursorAlreadyCommitted
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorCursorAlreadyCommitted s = CursorAlreadyCommitted <$> decodeThrow s

errorCursorResetInProgress
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorCursorResetInProgress s = CursorResetInProgress <$> decodeThrow s

errorEventTypeNotFound
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorEventTypeNotFound s = EventTypeNotFound <$> decodeThrow s

errorSubscriptionExistsAlready
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorSubscriptionExistsAlready s = SubscriptionExistsAlready <$> decodeThrow s

errorBatchPartiallySubmitted
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorBatchPartiallySubmitted s = BatchPartiallySubmitted <$> decodeThrow s

errorBatchNotSubmitted
  :: MonadThrow m => ByteString.Lazy.ByteString -> m NakadiException
errorBatchNotSubmitted s = BatchNotSubmitted <$> decodeThrow s