{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Server.Worker (
    worker,
) where

import Control.Concurrent.STM
import Data.IORef
import Network.HTTP.Semantics
import Network.HTTP.Semantics.IO
import Network.HTTP.Semantics.Server
import Network.HTTP.Semantics.Server.Internal
import Network.HTTP.Types
import qualified System.TimeManager as T

import Imports hiding (insert)
import Network.HTTP2.Frame
import Network.HTTP2.H2

----------------------------------------------------------------

-- | Open a fresh stream to carry a pushed response.
--
-- The result pairs the stream number of the /parent/ stream @pstrm@
-- (the stream the promise is announced on) with the newly opened
-- stream.  The stream number returned by 'openEvenStreamWait' itself
-- is not used here.
makePushStream :: Context -> Stream -> IO (StreamId, Stream)
makePushStream ctx pstrm = do
    -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
    (_, newstrm) <- openEvenStreamWait ctx
    return (streamNumber pstrm, newstrm)

----------------------------------------------------------------

-- | Start one pusher thread per 'PushPromise' for the parent stream.
--
-- Returns 'Nothing' when nothing is pushed, which happens when
--
-- * the list of promises is empty,
-- * the peer's settings have push disabled ('enablePush'), or
-- * the original request carries no @:scheme@, or neither @:authority@
--   nor @Host@, so a promised request cannot be built.
--
-- Otherwise returns @'Just' wait@.  Running @wait@ blocks until every
-- pusher thread has bumped the shared counter, which each thread does
-- after 'syncWithSender' returns for its push output and before it
-- starts sending the pushed response itself.
pushStream
    :: Config
    -> Context
    -> Stream -- parent stream
    -> ValueTable -- request
    -> [PushPromise]
    -> IO (Maybe (IO ()))
pushStream _ _ _ _ [] = return Nothing
pushStream conf ctx@Context{..} pstrm reqvt pps0 = do
    pushable <- enablePush <$> readIORef peerSettings
    case promisedOrigin of
        -- Matching on 'Just' here resolves both header lookups before
        -- any thread is forked or any stream is opened.
        Just (scheme, auth) | pushable -> do
            done <- newTVarIO (0 :: Int)
            mapM_ (spawn scheme auth done) pps0
            -- pps0 is non-empty in this clause, so the limit is >= 1.
            return $ Just $ allPushed (length pps0) done
        _ -> return Nothing
  where
    -- Scheme and authority shared by all promised requests; the
    -- authority falls back to the Host header field.
    promisedOrigin =
        (,)
            <$> getFieldValue tokenScheme reqvt
            <*> ( getFieldValue tokenAuthority reqvt
                    <|> getFieldValue tokenHost reqvt
                )

    -- Checking if all pushes are done.
    allPushed lim done = atomically $ readTVar done >>= check . (>= lim)

    -- Fork a managed thread that announces and then sends one push.
    spawn scheme auth done pp =
        forkManaged threadManager "H2 server push" $
            timeoutKillThread threadManager $ \th -> do
                (pid, newstrm) <- makePushStream ctx pstrm
                let promiseRequest =
                        [ (tokenMethod, methodGet)
                        , (tokenScheme, scheme)
                        , (tokenAuthority, auth)
                        , (tokenPath, promiseRequestPath pp)
                        ]
                    ot = OPush promiseRequest pid
                    Response rsp = promiseResponse pp
                ((var, sync), out) <- prepareSync newstrm ot Nothing
                enqueueOutput outputQ out
                syncWithSender ctx newstrm var sync
                -- Signal the waiter before the (possibly long) body
                -- transfer of the pushed response begins.
                atomically $ modifyTVar' done (+ 1)
                sendHeaderBody conf ctx th newstrm rsp

-- | Send a server application's response on a stream.
--
-- 'worker' hands this function, partially applied, to the server
-- application, which calls it with the 'Response' and the list of
-- 'PushPromise's.  Any pushes are started first via 'pushStream'; if
-- that yields a wait action, it is run so that all pushes have been
-- sent before the response itself is passed to 'sendHeaderBody',
-- which enqueues the output for the HTTP/2 sender.
sendResponse
    :: Config
    -> Context
    -> T.Handle
    -> Stream
    -> Request
    -> Response
    -> [PushPromise]
    -> IO ()
sendResponse conf ctx th strm (Request req) (Response rsp) pps = do
    mwait <- pushStream conf ctx strm reqvt pps
    -- Runs the wait action when there is one; a no-op for 'Nothing'.
    sequence_ mwait -- all pushes are sent
    sendHeaderBody conf ctx th strm rsp
  where
    -- Only the value table of the request headers is needed.
    (_, reqvt) = inpObjHeaders req

-- | Enqueue the headers and body of an 'OutObj' for the sender and
-- synchronize with it.
--
-- The body variant decides how the body is produced:
--
-- * 'OutBodyNone': no body producer and no queue.
-- * 'OutBodyFile': a producer built from 'confPositionReadMaker' and
--   the file's offset and byte count.
-- * 'OutBodyBuilder': a producer built from the builder.
-- * 'OutBodyStreaming' / 'OutBodyStreamingIface': the streaming action
--   is started through 'sendStreaming', and the resulting queue is both
--   drained by the producer and passed on to 'prepareSync'.
sendHeaderBody :: Config -> Context -> T.Handle -> Stream -> OutObj -> IO ()
sendHeaderBody Config{..} ctx@Context{..} th strm OutObj{..} = do
    (mnext, mtbq) <- case outObjBody of
        OutBodyNone -> return (Nothing, Nothing)
        OutBodyFile (FileSpec path fileoff bytecount) -> do
            (pread, sentinel) <- confPositionReadMaker path
            let next = fillFileBodyGetNext pread fileoff bytecount sentinel
            return (Just next, Nothing)
        OutBodyBuilder builder ->
            return (Just (fillBuilderBodyGetNext builder), Nothing)
        OutBodyStreaming strmbdy ->
            -- Adapt the two-argument callback style to the iface style.
            streaming $ \OutBodyIface{..} -> strmbdy outBodyPush outBodyFlush
        OutBodyStreamingIface strmbdy -> streaming strmbdy
    ((var, sync), out) <-
        prepareSync strm (OHeader outObjHeaders mnext outObjTrailers) mtbq
    enqueueOutput outputQ out
    syncWithSender ctx strm var sync
  where
    -- Start the streaming action and pair the queue with its producer.
    streaming body = do
        q <- sendStreaming ctx strm th body
        return (Just (nextForStreaming q), Just q)

-- | Run a streaming body action in its own managed thread.
--
-- A bounded queue is created and returned immediately; the forked
-- thread runs @strmbdy@ against an 'OutBodyIface' obtained from
-- 'withOutBodyIface' for that queue.  The interface's 'outBodyPush'
-- and 'outBodyPushFinal' are wrapped so that the timeout handle @th@
-- is paused for the duration of each push and resumed afterwards.
sendStreaming
    :: Context
    -> Stream
    -> T.Handle
    -> (OutBodyIface -> IO ())
    -> IO (TBQueue StreamingChunk)
sendStreaming Context{..} strm th strmbdy = do
    tbq <- newTBQueueIO queueSize
    forkManaged threadManager label $
        withOutBodyIface tbq id $ \iface -> do
            let iface' =
                    iface
                        { outBodyPush = paused . outBodyPush iface
                        , outBodyPushFinal = paused . outBodyPushFinal iface
                        }
            strmbdy iface'
    return tbq
  where
    label = "H2 streaming supporter for stream " ++ show (streamNumber strm)
    -- Capacity of the chunk queue.
    queueSize = 10 -- fixme: hard coding: 10
    -- Pause the timeout handle around an action.  NOTE(review): the
    -- handle is not resumed if the action throws; this matches the
    -- plain sequencing this helper replaces.
    paused action = T.pause th >> action >> T.resume th

-- | Worker for server applications.
--
-- Runs the server application for one request under
-- 'timeoutKillThread'.  The application receives the request with its
-- body reader wrapped so that the timeout handle is paused while a
-- body chunk is being read, an 'Aux' built from the handle and both
-- socket addresses, and 'sendResponse' as the responder.  After the
-- application returns, 'adjustRxWindow' is called for the stream.
worker :: Config -> Server -> Context -> Stream -> InpObj -> IO ()
worker conf server ctx@Context{..} strm req =
    timeoutKillThread threadManager $ \th -> do
        -- FIXME: exception
        -- NOTE(review): the binding between pause and resume is a pure
        -- 'let', so no work happens while the handle is paused here;
        -- confirm whether this pair is still needed.
        T.pause th
        let req' = pauseRequestBody th
        T.resume th
        T.tickle th
        let aux = Aux th mySockAddr peerSockAddr
            request = Request req'
        server request aux $ sendResponse conf ctx th strm request
        adjustRxWindow ctx strm
  where
    -- Replace the body reader with one that pauses the timeout handle
    -- while reading and resumes it once the chunk has arrived.
    pauseRequestBody th = req{inpObjBody = pausedRead}
      where
        pausedRead = T.pause th *> inpObjBody req <* T.resume th