{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Client.Run where

import Control.Concurrent.STM (check)
import qualified Data.ByteString.UTF8 as UTF8
import Data.IORef
import Data.IP (IPv6)
import Network.Control (RxFlow (..), defaultMaxData)
import Network.HTTP.Semantics.Client
import Network.HTTP.Semantics.Client.Internal
import Network.HTTP.Semantics.IO
import Network.Socket (SockAddr)
import Text.Read (readMaybe)
import UnliftIO.Async
import UnliftIO.Concurrent
import UnliftIO.Exception
import UnliftIO.STM

import Imports
import Network.HTTP2.Frame
import Network.HTTP2.H2

-- | Configuration of an HTTP/2 client connection.
--
-- 'scheme' and 'authority' are forwarded to 'sendRequest', which turns
-- them into the @:scheme@ and @:authority@ pseudo-headers when they are
-- non-empty. The remaining fields are handed to 'newContext' by 'setup'.
data ClientConfig = ClientConfig
    { scheme :: Scheme
    -- ^ @https@ or @http@.
    , authority :: Authority
    -- ^ Server name.
    , cacheLimit :: Int
    -- ^ The maximum number of incoming streams on the net.
    , connectionWindowSize :: WindowSize
    -- ^ The window size of the connection.
    , settings :: Settings
    -- ^ HTTP/2 settings used for this connection.
    }
    deriving (Eq, Show)

-- | The default client configuration.
--
-- The @authority@ field is used for the HTTP/2 @:authority@
-- pseudo-header. In most cases you will want to override it so that it
-- equals @host@.
--
-- Background on @authority@:
-- [RFC 3986](https://datatracker.ietf.org/doc/html/rfc3986#section-3.2)
-- also permits @host:port@, and most servers accept that form as well.
-- With TLS, however, many servers expect the TLS SNI server name and the
-- @:authority@ pseudo-header to match, and the SNI server name should not
-- contain the port. Note that HTTP/2 explicitly /disallows/ @userinfo\@@
-- as part of the authority.
--
-- >>> defaultClientConfig
-- ClientConfig {scheme = "http", authority = "localhost", cacheLimit = 64, connectionWindowSize = 16777216, settings = Settings {headerTableSize = 4096, enablePush = True, maxConcurrentStreams = Just 64, initialWindowSize = 262144, maxFrameSize = 16384, maxHeaderListSize = Nothing, pingRateLimit = 10, emptyFrameRateLimit = 4, settingsRateLimit = 4, rstRateLimit = 4}}
defaultClientConfig :: ClientConfig
defaultClientConfig =
    ClientConfig
        { scheme = "http"
        , authority = "localhost"
        , cacheLimit = 64
        , connectionWindowSize = defaultMaxData
        , settings = defaultSettings
        }

-- | Run an HTTP/2 client.
--
-- Builds the connection context with 'setup', then executes the given
-- 'Client' (wrapped by 'wrapClinet') under 'runH2'.
run :: ClientConfig -> Config -> Client a -> IO a
run cconf@ClientConfig{scheme, authority} conf client =
    setup cconf conf >>= \ctx -> runH2 conf ctx (runClient ctx)
  where
    -- The peer's advertised concurrent-stream limit; 'maxBound' when the
    -- peer settings carry no limit.
    serverMaxStreams ctx =
        fromMaybe maxBound . maxConcurrentStreams
            <$> readIORef (peerSettings ctx)

    -- The peer's limit minus the 'oddConc' counter of the odd stream table.
    possibleClientStream ctx = do
        limit <- serverMaxStreams ctx
        inUse <- oddConc <$> readTVarIO (oddStreamTable ctx)
        pure (limit - inUse)

    aux ctx = Aux{auxPossibleClientStreams = possibleClientStream ctx}

    -- Send one request, hand its response to the callback, and only then
    -- adjust the receive window for the stream.
    clientCore ctx req processResponse = do
        strm <- sendRequest conf ctx scheme authority req
        result <- getResponse strm >>= processResponse
        adjustRxWindow ctx strm
        pure result

    runClient ctx = wrapClinet ctx $ client (clientCore ctx) (aux ctx)

-- | Run a client action and then shut the connection down in order.
--
-- After the action returns, this calls 'waitCounter0' on the context's
-- thread manager, enqueues a GOAWAY frame ('NoError',
-- @"graceful closing"@) followed by @'CFinish' 'GoAwayIsSent'@ on the
-- control queue, and blocks until the 'senderDone' flag becomes 'True'.
-- The action's result is returned unchanged.
wrapClinet :: Context -> IO a -> IO a
wrapClinet ctx client = do
    result <- client
    waitCounter0 (threadManager ctx)
    enqueueControl ctlQ $
        CFrames Nothing [goawayFrame 0 NoError "graceful closing"]
    enqueueControl ctlQ $ CFinish GoAwayIsSent
    -- Retries the transaction until the sender has flagged completion.
    atomically $ readTVar (senderDone ctx) >>= check
    pure result
  where
    ctlQ = controlQ ctx

-- | Launch a receiver and a sender, exposing low-level client operations.
--
-- A 'ClientIO' is assembled from the local and peer socket addresses of
-- the 'Config', a request sender (returning the stream number together
-- with the stream), 'getResponse', a raw frame writer that goes through
-- the control queue, and 'openOddStreamWait'. The supplied function
-- receives this 'ClientIO' and yields the client action, which is then
-- wrapped by 'wrapClinet' and executed under 'runH2'.
runIO :: ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO
    cconf@ClientConfig{scheme, authority}
    conf@Config{confMySockAddr, confPeerSockAddr}
    action = do
        ctx <- setup cconf conf
        let putB bs = enqueueControl (controlQ ctx) $ CFrames Nothing [bs]
            putR req = do
                strm <- sendRequest conf ctx scheme authority req
                pure (streamNumber strm, strm)
            clientIO =
                ClientIO
                    confMySockAddr
                    confPeerSockAddr
                    putR
                    getResponse
                    putB
                    (openOddStreamWait ctx)
        act <- action clientIO
        runH2 conf ctx (wrapClinet ctx act)

-- | Block until the stream's input 'MVar' is filled, then either rethrow
-- the stored exception or wrap the received object in a 'Response'.
getResponse :: Stream -> IO Response
getResponse strm =
    takeMVar (streamInput strm) >>= either throwIO (pure . Response)

-- | Create the connection 'Context' for a client and run
-- 'exchangeSettings' on it before returning it.
--
-- The role information is built from the configured scheme and
-- authority; the timeout manager comes from the 'Config'.
setup :: ClientConfig -> Config -> IO Context
setup
    ClientConfig{scheme, authority, cacheLimit, connectionWindowSize, settings}
    conf = do
        ctx <-
            newContext
                (newClientInfo scheme authority)
                conf
                cacheLimit
                connectionWindowSize
                settings
                (confTimeoutManager conf)
        exchangeSettings ctx
        pure ctx

-- | Run the client action concurrently with the frame receiver and the
-- frame sender.
--
-- The client is raced against the background threads under 'stopAfter'
-- on the context's thread manager. When the race ends, 'closeAllStreams'
-- is invoked on both stream tables with the outcome passed in by
-- 'stopAfter'.
runH2 :: Config -> Context -> IO a -> IO a
runH2 conf ctx runClient =
    stopAfter mgr (clientResult <$> race runBackgroundThreads runClient) $ \res ->
        closeAllStreams (oddStreamTable ctx) (evenStreamTable ctx) res
  where
    mgr = threadManager ctx
    runReceiver = frameReceiver ctx conf
    runSender = frameSender ctx conf

    -- Receiver and sender run side by side in one labelled thread.
    runBackgroundThreads = do
        labelMe "H2 runBackgroundThreads"
        concurrently_ runReceiver runSender

    -- The background threads are expected never to return normally, so
    -- the race should only ever be won by the client. If that invariant
    -- is broken, fail with a message that names the cause instead of an
    -- anonymous 'undefined'.
    clientResult :: Either () a -> a
    clientResult (Left ()) =
        error "runH2: background threads finished before the client (unreachable)"
    clientResult (Right a) = a

-- | Send a request and return the stream on which its response arrives.
--
-- If the even-stream cache already holds an entry for the request's
-- @:method@ and @:path@, that entry is removed from the cache and its
-- stream is returned without sending anything. Otherwise the
-- @:scheme@ and @:authority@ pseudo-headers are prepended (each only when
-- non-empty), a new odd stream is opened, and the request is handed to
-- 'sendHeaderBody'.
--
-- Calls 'error' if the request headers lack @:method@ or @:path@.
sendRequest
    :: Config
    -> Context
    -> Scheme
    -> Authority
    -> Request
    -> IO Stream
sendRequest conf ctx@Context{evenStreamTable} scheme auth (Request req) = do
    -- Checking push promises
    mPushed <- lookupEvenCache evenStreamTable method path
    case mPushed of
        Just pushed -> do
            deleteEvenCache evenStreamTable method path
            pure pushed
        Nothing -> do
            -- Arch/Sender is originally implemented for servers where
            -- the ordering of responses can be out-of-order.
            -- But for clients, the ordering must be maintained.
            -- To implement this, 'outputQStreamID' is used.
            --
            -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
            (sid, newstrm) <- openOddStreamWait ctx
            sendHeaderBody conf ctx sid newstrm req{outObjHeaders = allHeaders}
            pure newstrm
  where
    baseHeaders = outObjHeaders req
    method = fromMaybe (error "sendRequest:method") $ lookup ":method" baseHeaders
    path = fromMaybe (error "sendRequest:path") $ lookup ":path" baseHeaders

    -- An authority that parses as a bare IPv6 address is wrapped in
    -- square brackets.
    isIPv6 = isJust (readMaybe auth :: Maybe IPv6)
    authBytes = UTF8.fromString auth
    authValue = if isIPv6 then "[" <> authBytes <> "]" else authBytes

    -- Resulting order: @:authority@, @:scheme@, then the original headers.
    allHeaders =
        [(":authority", authValue) | auth /= ""]
            ++ [(":scheme", scheme) | scheme /= ""]
            ++ baseHeaders

-- | Queue the header and body of a request on a freshly opened stream.
--
-- The body is first turned into an optional 'DynaNext' and, for
-- streaming bodies, the queue returned by 'sendStreaming'. The output
-- prepared by 'prepareSync' is then enqueued, but only once
-- 'outputQStreamID' equals this stream's identifier; in the same
-- transaction the counter is advanced by two. Finally a managed
-- @"H2 worker"@ thread is forked that runs 'syncWithSender'.
sendHeaderBody :: Config -> Context -> StreamId -> Stream -> OutObj -> IO ()
sendHeaderBody
    Config{confPositionReadMaker}
    ctx@Context{threadManager, outputQ, outputQStreamID}
    sid
    newstrm
    OutObj{outObjHeaders, outObjBody, outObjTrailers} = do
        (mnext, mtbq) <- case outObjBody of
            OutBodyNone -> pure (Nothing, Nothing)
            OutBodyFile (FileSpec path fileoff bytecount) -> do
                (pread, sentinel) <- confPositionReadMaker path
                refresh <- case sentinel of
                    Closer closer -> timeoutClose threadManager closer
                    Refresher refresher -> pure refresher
                pure
                    ( Just (fillFileBodyGetNext pread fileoff bytecount refresh)
                    , Nothing
                    )
            OutBodyBuilder builder ->
                pure (Just (fillBuilderBodyGetNext builder), Nothing)
            OutBodyStreaming strmbdy ->
                -- Adapt the push/flush style callback to the interface
                -- style, running it under the interface's unmask.
                streaming $ \iface ->
                    outBodyUnmask iface $
                        strmbdy (outBodyPush iface) (outBodyFlush iface)
            OutBodyStreamingIface strmbdy -> streaming strmbdy
        ((var, sync), out) <-
            prepareSync
                newstrm
                (OHeader outObjHeaders mnext outObjTrailers)
                mtbq
        -- Blocks (via 'check') until it is this stream's turn, so that
        -- outputs are enqueued in stream-identifier order.
        atomically $ do
            readTVar outputQStreamID >>= check . (== sid)
            enqueueOutputSTM outputQ out
            writeTVar outputQStreamID (sid + 2)
        forkManaged threadManager "H2 worker" $
            syncWithSender ctx newstrm var sync
  where
    -- Start a streaming body and derive both the next-chunk action and
    -- the queue from it.
    streaming body = do
        q <- sendStreaming ctx newstrm body
        pure (Just (nextForStreaming q), Just q)

-- | Set up the plumbing for a streaming request body.
--
-- A bounded queue of 'StreamingChunk's is created and a managed thread
-- (registered with the context's thread manager, labelled with the
-- stream number) is forked to run 'withOutBodyIface' with that queue,
-- the unmask function handed over by 'forkManagedUnmask', and the
-- caller-supplied body action.  The queue is returned right after the
-- fork; this function does not wait for the body action.
--
-- NOTE(review): presumably the body action feeds chunks into the queue
-- through the 'OutBodyIface' and the sender drains it -- confirm against
-- 'withOutBodyIface'.
sendStreaming
    :: Context
    -> Stream
    -> (OutBodyIface -> IO ())
    -> IO (TBQueue StreamingChunk)
sendStreaming Context{threadManager} strm strmbdy = do
    tbq <- newTBQueueIO queueCapacity
    forkManagedUnmask threadManager threadLabel $ \unmask ->
        withOutBodyIface tbq unmask strmbdy
    pure tbq
  where
    -- Label attached to the forked thread.
    threadLabel =
        "H2 streaming supporter for stream " ++ show (streamNumber strm)
    -- Capacity of the bounded chunk queue.
    queueCapacity = 10 -- fixme: hard coding: 10

-- | Queue the client's opening control frames.
--
-- Reads the buffer size from the receive-flow state ('rxFlow'), builds
-- the negotiation frames from 'mySettings' and that size, prepends
-- 'connectionPreface', sets 'myFirstSettings' to 'True', and finally
-- enqueues everything on 'controlQ' as a single 'CFrames' control
-- carrying no settings list ('Nothing').
exchangeSettings :: Context -> IO ()
exchangeSettings Context{rxFlow, mySettings, myFirstSettings, controlQ} = do
    connRxWS <- fmap rxfBufSize (readIORef rxFlow)
    let negotiation = makeNegotiationFrames mySettings connRxWS
        opening = connectionPreface : negotiation
    -- The flag is written before the frames are handed to the control queue.
    writeIORef myFirstSettings True
    enqueueControl controlQ (CFrames Nothing opening)

-- | A record of low-level client operations and socket addresses.
--
-- NOTE(review): the per-field descriptions below are derived from the
-- field names and types only; confirm against the code that builds
-- this record.
data ClientIO = ClientIO
    { cioMySockAddr :: SockAddr
    -- ^ Socket address of the local endpoint (presumably).
    , cioPeerSockAddr :: SockAddr
    -- ^ Socket address of the peer (presumably).
    , cioWriteRequest :: Request -> IO (StreamId, Stream)
    -- ^ Takes a 'Request' and yields a stream identifier together with
    -- its 'Stream'.
    , cioReadResponse :: Stream -> IO Response
    -- ^ Yields the 'Response' for the given 'Stream'.
    , cioWriteBytes :: ByteString -> IO ()
    -- ^ Action consuming a 'ByteString'.
    , cioCreateStream :: IO (StreamId, Stream)
    -- ^ Yields a stream identifier together with its 'Stream', without
    -- taking a 'Request'.
    }