{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Client.Run where

import Control.Concurrent.STM (check)
import UnliftIO.Async
import UnliftIO.Concurrent
import qualified UnliftIO.Exception as E
import UnliftIO.STM

import Imports
import Network.HTTP2.Arch
import Network.HTTP2.Client.Types
import Network.HTTP2.Frame

-- | Client configuration, as consumed by 'run'.
data ClientConfig = ClientConfig
    { scheme :: Scheme
    -- ^ https or http
    , authority :: Authority
    -- ^ Server name
    , cacheLimit :: Int
    -- ^ How many pushed responses are contained in the cache
    }

-- | Running HTTP/2 client.
--
-- Creates the client role information and the connection context, starts
-- the timeout manager, enqueues the initial settings exchange, and then
-- races the background frame receiver\/sender pair against the supplied
-- 'Client' action.  When the client action returns, a frame built with
-- 'goawayFrame' (@NoError@, \"graceful closing\") is enqueued on the
-- control queue and the action's result is returned.
--
-- The timeout manager is stopped once the race finishes, whether it ends
-- normally or with an exception.
--
-- If the background threads finish before the client action does, an
-- 'error' with a descriptive message is raised.
run :: ClientConfig -> Config -> Client a -> IO a
run ClientConfig{..} conf@Config{..} client = do
    clientInfo <- newClientInfo scheme authority cacheLimit
    ctx <- newContext clientInfo confBufferSize
    mgr <- start confTimeoutManager
    let runBackgroundThreads = do
            let runReceiver = frameReceiver ctx conf
                runSender   = frameSender   ctx conf mgr
            concurrently_ runReceiver runSender
    exchangeSettings conf ctx
    let runClient = do
            x <- client $ sendRequest ctx mgr scheme authority
            let frame = goawayFrame 0 NoError "graceful closing"
            enqueueControl (controlQ ctx) $ CFrames Nothing [frame]
            return x
    ex <- race runBackgroundThreads runClient `E.finally` stop mgr
    case ex of
      -- Not expected in normal operation, but if the receiver and sender
      -- both return first, fail with a message instead of a bare
      -- 'undefined'.
      Left () -> error "Network.HTTP2.Client.Run.run: background threads terminated before the client action finished"
      Right x -> return x

-- | Send one request and pass its response to @processResponse@.
--
-- The pushed-response cache is consulted first, using the request's
-- @:method@ and @:path@ pseudo-headers.  On a cache miss a new stream is
-- opened, @:scheme@ and @:authority@ are prepended to the headers when the
-- corresponding argument is non-empty, and an 'Output' for the request is
-- written to 'outputQ'.  The call then blocks on the stream's input 'MVar'
-- and hands the received value, wrapped in 'Response', to
-- @processResponse@.
--
-- The request must carry @:method@ and @:path@ headers; a missing one
-- evaluates to an 'error' call.
sendRequest :: Context -> Manager -> Scheme -> Authority -> Request -> (Response -> IO a) -> IO a
sendRequest ctx@Context{..} mgr scheme auth (Request req) processResponse = do
    -- Checking push promises
    let hdr0 = outObjHeaders req
        method = fromMaybe (error "sendRequest:method") $ lookup ":method" hdr0
        path   = fromMaybe (error "sendRequest:path") $ lookup ":path" hdr0
    mstrm0 <- lookupCache method path roleInfo
    strm <- case mstrm0 of
      Nothing -> do
          -- Arch/Sender was originally implemented for servers, where
          -- responses may be sent out of order.
          -- For clients, however, the ordering must be maintained.
          -- To implement this, 'outputQStreamID' is used: a request is
          -- enqueued only when it holds the expected stream ID.
          -- Also, for 'OutBodyStreaming', the TBQ must not be empty
          -- when its 'Output' is enqueued into 'outputQ'.
          -- Otherwise, it would be re-enqueued because it is empty,
          -- resulting in out-of-order delivery.
          -- To implement this, 'tbqNonEmpty' is used.
          let hdr1 | scheme /= "" = (":scheme", scheme) : hdr0
                   | otherwise    = hdr0
              hdr2 | auth /= "" = (":authority", auth) : hdr1
                   | otherwise  = hdr1
              req' = req { outObjHeaders = hdr2 }
          sid <- getMyNewStreamId ctx
          newstrm <- openStream ctx sid FrameHeaders
          case outObjBody req of
            OutBodyStreaming strmbdy -> do
                tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
                tbqNonEmpty <- newTVarIO False
                forkManaged mgr $ do
                    let push b = atomically $ do
                            writeTBQueue tbq (StreamingBuilder b)
                            writeTVar tbqNonEmpty True
                        flush  = atomically $ writeTBQueue tbq StreamingFlush
                    strmbdy push flush
                    -- The end marker makes the TBQ non-empty as well, so the
                    -- flag is raised here too.  Without this, a body that
                    -- finishes without ever calling 'push' would leave the
                    -- enqueuing transaction below blocked forever.
                    atomically $ do
                        writeTBQueue tbq StreamingFinished
                        writeTVar tbqNonEmpty True
                atomically $ do
                    sidOK <- readTVar outputQStreamID
                    ready <- readTVar tbqNonEmpty
                    -- Wait for our turn and for the TBQ to hold something.
                    check (sidOK == sid && ready)
                    writeTVar outputQStreamID (sid + 2)
                    writeTQueue outputQ $ Output newstrm req' OObj (Just tbq) (return ())
            _ -> atomically $ do
                sidOK <- readTVar outputQStreamID
                -- Wait for our turn.
                check (sidOK == sid)
                writeTVar outputQStreamID (sid + 2)
                writeTQueue outputQ $ Output newstrm req' OObj Nothing (return ())
          return newstrm
      -- A cached (pushed) stream already exists: reuse it.
      Just strm0 -> return strm0
    rsp <- takeMVar $ streamInput strm
    processResponse $ Response rsp

-- | Enqueue the opening control frames of a connection: the connection
-- preface followed by the frames returned by 'updateMySettings'.
exchangeSettings :: Config -> Context -> IO ()
exchangeSettings conf ctx = do
    settingsFrames <- updateMySettings conf ctx
    enqueueControl (controlQ ctx) $
        CFrames Nothing (connectionPreface : settingsFrames)