{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PatternGuards #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Server.Worker (
    worker
  , WorkerConf(..)
  , fromContext
  ) where

import Control.Concurrent.STM
import Control.Exception (SomeException(..), AsyncException(..))
import qualified Control.Exception as E
import Data.IORef
import qualified Network.HTTP.Types as H
import qualified System.TimeManager as T

import Imports hiding (insert)
import Network.HPACK
import Network.HPACK.Token
import Network.HTTP2.Arch
import Network.HTTP2.Frame
import Network.HTTP2.Server.Types

----------------------------------------------------------------

data WorkerConf a = WorkerConf {
    WorkerConf a -> IO (Input a)
readInputQ     :: IO (Input a)
  , WorkerConf a -> Output a -> IO ()
writeOutputQ   :: Output a -> IO ()
  , WorkerConf a -> a -> IO ()
workerCleanup  :: a -> IO ()
  , WorkerConf a -> IO Bool
isPushable     :: IO Bool
  , WorkerConf a -> StreamId -> a -> IO ()
insertStream   :: StreamId -> a -> IO ()
  , WorkerConf a -> a -> PushPromise -> IO (StreamId, StreamId, a)
makePushStream :: a -> PushPromise -> IO (StreamId, StreamId, a)
  }

fromContext :: Context -> WorkerConf Stream
fromContext :: Context -> WorkerConf Stream
fromContext ctx :: Context
ctx@Context{TVar StreamId
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef Settings
TQueue Control
TQueue (Output Stream)
DynamicTable
Rate
StreamTable
RoleInfo
Role
emptyFrameRate :: Context -> Rate
settingsRate :: Context -> Rate
pingRate :: Context -> Rate
connectionWindow :: Context -> TVar StreamId
decodeDynamicTable :: Context -> DynamicTable
encodeDynamicTable :: Context -> DynamicTable
controlQ :: Context -> TQueue Control
outputQ :: Context -> TQueue (Output Stream)
peerStreamId :: Context -> IORef StreamId
myStreamId :: Context -> IORef StreamId
continued :: Context -> IORef (Maybe StreamId)
concurrency :: Context -> IORef StreamId
streamTable :: Context -> StreamTable
firstSettings :: Context -> IORef Bool
http2settings :: Context -> IORef Settings
roleInfo :: Context -> RoleInfo
role :: Context -> Role
emptyFrameRate :: Rate
settingsRate :: Rate
pingRate :: Rate
connectionWindow :: TVar StreamId
decodeDynamicTable :: DynamicTable
encodeDynamicTable :: DynamicTable
controlQ :: TQueue Control
outputQ :: TQueue (Output Stream)
peerStreamId :: IORef StreamId
myStreamId :: IORef StreamId
continued :: IORef (Maybe StreamId)
concurrency :: IORef StreamId
streamTable :: StreamTable
firstSettings :: IORef Bool
http2settings :: IORef Settings
roleInfo :: RoleInfo
role :: Role
..} = WorkerConf :: forall a.
IO (Input a)
-> (Output a -> IO ())
-> (a -> IO ())
-> IO Bool
-> (StreamId -> a -> IO ())
-> (a -> PushPromise -> IO (StreamId, StreamId, a))
-> WorkerConf a
WorkerConf {
    readInputQ :: IO (Input Stream)
readInputQ = STM (Input Stream) -> IO (Input Stream)
forall a. STM a -> IO a
atomically (STM (Input Stream) -> IO (Input Stream))
-> STM (Input Stream) -> IO (Input Stream)
forall a b. (a -> b) -> a -> b
$ TQueue (Input Stream) -> STM (Input Stream)
forall a. TQueue a -> STM a
readTQueue (TQueue (Input Stream) -> STM (Input Stream))
-> TQueue (Input Stream) -> STM (Input Stream)
forall a b. (a -> b) -> a -> b
$ RoleInfo -> TQueue (Input Stream)
inputQ RoleInfo
roleInfo
  , writeOutputQ :: Output Stream -> IO ()
writeOutputQ = TQueue (Output Stream) -> Output Stream -> IO ()
enqueueOutput TQueue (Output Stream)
outputQ
  , workerCleanup :: Stream -> IO ()
workerCleanup = \Stream
strm -> do
        Context -> Stream -> ClosedCode -> IO ()
closed Context
ctx Stream
strm ClosedCode
Killed
        let frame :: ByteString
frame = ErrorCodeId -> StreamId -> ByteString
resetFrame ErrorCodeId
InternalError (Stream -> StreamId
streamNumber Stream
strm)
        TQueue Control -> Control -> IO ()
enqueueControl TQueue Control
controlQ (Control -> IO ()) -> Control -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> Control
CFrame ByteString
frame
  , isPushable :: IO Bool
isPushable = Settings -> Bool
enablePush (Settings -> Bool) -> IO Settings -> IO Bool
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef Settings -> IO Settings
forall a. IORef a -> IO a
readIORef IORef Settings
http2settings
  , insertStream :: StreamId -> Stream -> IO ()
insertStream = StreamTable -> StreamId -> Stream -> IO ()
insert StreamTable
streamTable
  , makePushStream :: Stream -> PushPromise -> IO (StreamId, StreamId, Stream)
makePushStream = \Stream
pstrm PushPromise
_ -> do
        StreamId
ws <- Settings -> StreamId
initialWindowSize (Settings -> StreamId) -> IO Settings -> IO StreamId
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> IORef Settings -> IO Settings
forall a. IORef a -> IO a
readIORef IORef Settings
http2settings
        StreamId
sid <- Context -> IO StreamId
getMyNewStreamId Context
ctx
        Stream
newstrm <- StreamId -> StreamId -> IO Stream
newPushStream StreamId
sid StreamId
ws
        let pid :: StreamId
pid = Stream -> StreamId
streamNumber Stream
pstrm
        (StreamId, StreamId, Stream) -> IO (StreamId, StreamId, Stream)
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamId
pid, StreamId
sid, Stream
newstrm)
  }

----------------------------------------------------------------

pushStream :: WorkerConf a
           -> a -- parent stream
           -> ValueTable -- request
           -> [PushPromise]
           -> IO OutputType
pushStream :: WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
_ a
_ ValueTable
_ [] = OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
pushStream WorkerConf{IO Bool
IO (Input a)
a -> IO ()
a -> PushPromise -> IO (StreamId, StreamId, a)
StreamId -> a -> IO ()
Output a -> IO ()
makePushStream :: a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: StreamId -> a -> IO ()
isPushable :: IO Bool
workerCleanup :: a -> IO ()
writeOutputQ :: Output a -> IO ()
readInputQ :: IO (Input a)
makePushStream :: forall a.
WorkerConf a -> a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: forall a. WorkerConf a -> StreamId -> a -> IO ()
isPushable :: forall a. WorkerConf a -> IO Bool
workerCleanup :: forall a. WorkerConf a -> a -> IO ()
writeOutputQ :: forall a. WorkerConf a -> Output a -> IO ()
readInputQ :: forall a. WorkerConf a -> IO (Input a)
..} a
pstrm ValueTable
reqvt [PushPromise]
pps0
  | StreamId
len StreamId -> StreamId -> Bool
forall a. Eq a => a -> a -> Bool
== StreamId
0 = OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
  | Bool
otherwise = do
        Bool
pushable <- IO Bool
isPushable
        if Bool
pushable then do
            TVar StreamId
tvar <- StreamId -> IO (TVar StreamId)
forall a. a -> IO (TVar a)
newTVarIO StreamId
0
            StreamId
lim <- TVar StreamId -> [PushPromise] -> StreamId -> IO StreamId
forall a.
Num a =>
TVar a -> [PushPromise] -> StreamId -> IO StreamId
push TVar StreamId
tvar [PushPromise]
pps0 StreamId
0
            if StreamId
lim StreamId -> StreamId -> Bool
forall a. Eq a => a -> a -> Bool
== StreamId
0 then
              OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
             else
              OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return (OutputType -> IO OutputType) -> OutputType -> IO OutputType
forall a b. (a -> b) -> a -> b
$ IO () -> OutputType
OWait (StreamId -> TVar StreamId -> IO ()
forall a. Ord a => a -> TVar a -> IO ()
waiter StreamId
lim TVar StreamId
tvar)
          else
            OutputType -> IO OutputType
forall (m :: * -> *) a. Monad m => a -> m a
return OutputType
OObj
  where
    len :: StreamId
len = [PushPromise] -> StreamId
forall (t :: * -> *) a. Foldable t => t a -> StreamId
length [PushPromise]
pps0
    increment :: TVar a -> IO ()
increment TVar a
tvar = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TVar a -> (a -> a) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar a
tvar (a -> a -> a
forall a. Num a => a -> a -> a
+a
1)
    waiter :: a -> TVar a -> IO ()
waiter a
lim TVar a
tvar = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
        a
n <- TVar a -> STM a
forall a. TVar a -> STM a
readTVar TVar a
tvar
        Bool -> STM ()
check (a
n a -> a -> Bool
forall a. Ord a => a -> a -> Bool
>= a
lim)
    push :: TVar a -> [PushPromise] -> StreamId -> IO StreamId
push TVar a
_ [] StreamId
n = StreamId -> IO StreamId
forall (m :: * -> *) a. Monad m => a -> m a
return (StreamId
n :: Int)
    push TVar a
tvar (PushPromise
pp:[PushPromise]
pps) StreamId
n = do
        (StreamId
pid, StreamId
sid, a
newstrm) <- a -> PushPromise -> IO (StreamId, StreamId, a)
makePushStream a
pstrm PushPromise
pp
        StreamId -> a -> IO ()
insertStream StreamId
sid a
newstrm
        let scheme :: ByteString
scheme = Maybe ByteString -> ByteString
forall a. HasCallStack => Maybe a -> a
fromJust (Maybe ByteString -> ByteString) -> Maybe ByteString -> ByteString
forall a b. (a -> b) -> a -> b
$ Token -> ValueTable -> Maybe ByteString
getHeaderValue Token
tokenScheme ValueTable
reqvt
            -- fixme: this value can be Nothing
            auth :: ByteString
auth   = Maybe ByteString -> ByteString
forall a. HasCallStack => Maybe a -> a
fromJust (Token -> ValueTable -> Maybe ByteString
getHeaderValue Token
tokenHost ValueTable
reqvt
                            Maybe ByteString -> Maybe ByteString -> Maybe ByteString
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> Token -> ValueTable -> Maybe ByteString
getHeaderValue Token
tokenAuthority ValueTable
reqvt)
            path :: ByteString
path = PushPromise -> ByteString
promiseRequestPath PushPromise
pp
            promiseRequest :: [(Token, ByteString)]
promiseRequest = [(Token
tokenMethod, ByteString
H.methodGet)
                             ,(Token
tokenScheme, ByteString
scheme)
                             ,(Token
tokenAuthority, ByteString
auth)
                             ,(Token
tokenPath, ByteString
path)]
            ot :: OutputType
ot = [(Token, ByteString)] -> StreamId -> OutputType
OPush [(Token, ByteString)]
promiseRequest StreamId
pid
            Response OutObj
rsp = PushPromise -> Response
promiseResponse PushPromise
pp
            out :: Output a
out = a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
newstrm OutObj
rsp OutputType
ot Maybe (TBQueue StreamingChunk)
forall a. Maybe a
Nothing (IO () -> Output a) -> IO () -> Output a
forall a b. (a -> b) -> a -> b
$ TVar a -> IO ()
forall a. Num a => TVar a -> IO ()
increment TVar a
tvar
        Output a -> IO ()
writeOutputQ Output a
out
        TVar a -> [PushPromise] -> StreamId -> IO StreamId
push TVar a
tvar [PushPromise]
pps (StreamId
n StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
+ StreamId
1)

-- | This function is passed to workers.
--   They also pass 'Response's from a server to this function.
--   This function enqueues commands for the HTTP/2 sender.
response :: WorkerConf a -> Manager -> T.Handle -> ThreadContinue -> a -> Request -> Response -> [PushPromise] -> IO ()
response :: WorkerConf a
-> Manager
-> Handle
-> ThreadContinue
-> a
-> Request
-> Response
-> [PushPromise]
-> IO ()
response wc :: WorkerConf a
wc@WorkerConf{IO Bool
IO (Input a)
a -> IO ()
a -> PushPromise -> IO (StreamId, StreamId, a)
StreamId -> a -> IO ()
Output a -> IO ()
makePushStream :: a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: StreamId -> a -> IO ()
isPushable :: IO Bool
workerCleanup :: a -> IO ()
writeOutputQ :: Output a -> IO ()
readInputQ :: IO (Input a)
makePushStream :: forall a.
WorkerConf a -> a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: forall a. WorkerConf a -> StreamId -> a -> IO ()
isPushable :: forall a. WorkerConf a -> IO Bool
workerCleanup :: forall a. WorkerConf a -> a -> IO ()
writeOutputQ :: forall a. WorkerConf a -> Output a -> IO ()
readInputQ :: forall a. WorkerConf a -> IO (Input a)
..} Manager
mgr Handle
th ThreadContinue
tconf a
strm (Request InpObj
req) (Response OutObj
rsp) [PushPromise]
pps = case OutObj -> OutBody
outObjBody OutObj
rsp of
  OutBody
OutBodyNone -> do
      ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
      Output a -> IO ()
writeOutputQ (Output a -> IO ()) -> Output a -> IO ()
forall a b. (a -> b) -> a -> b
$ a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
OObj Maybe (TBQueue StreamingChunk)
forall a. Maybe a
Nothing (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
  OutBodyBuilder Builder
_ -> do
      OutputType
otyp <- WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
forall a.
WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
wc a
strm ValueTable
reqvt [PushPromise]
pps
      ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
      Output a -> IO ()
writeOutputQ (Output a -> IO ()) -> Output a -> IO ()
forall a b. (a -> b) -> a -> b
$ a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
otyp Maybe (TBQueue StreamingChunk)
forall a. Maybe a
Nothing (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
  OutBodyFile FileSpec
_ -> do
      OutputType
otyp <- WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
forall a.
WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
wc a
strm ValueTable
reqvt [PushPromise]
pps
      ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
True
      Output a -> IO ()
writeOutputQ (Output a -> IO ()) -> Output a -> IO ()
forall a b. (a -> b) -> a -> b
$ a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
otyp Maybe (TBQueue StreamingChunk)
forall a. Maybe a
Nothing (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
  OutBodyStreaming (Builder -> IO ()) -> IO () -> IO ()
strmbdy -> do
      OutputType
otyp <- WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
forall a.
WorkerConf a -> a -> ValueTable -> [PushPromise] -> IO OutputType
pushStream WorkerConf a
wc a
strm ValueTable
reqvt [PushPromise]
pps
      -- We must not exit this server application.
      -- If the application exits, streaming would be also closed.
      -- So, this work occupies this thread.
      --
      -- We need to increase the number of workers.
      Manager -> IO ()
spawnAction Manager
mgr
      -- After this work, this thread stops to decease
      -- the number of workers.
      ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tconf Bool
False
      -- Since streaming body is loop, we cannot control it.
      -- So, let's serialize 'Builder' with a designated queue.
      TBQueue StreamingChunk
tbq <- Natural -> IO (TBQueue StreamingChunk)
forall a. Natural -> IO (TBQueue a)
newTBQueueIO Natural
10 -- fixme: hard coding: 10
      Output a -> IO ()
writeOutputQ (Output a -> IO ()) -> Output a -> IO ()
forall a b. (a -> b) -> a -> b
$ a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
forall a.
a
-> OutObj
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ()
-> Output a
Output a
strm OutObj
rsp OutputType
otyp (TBQueue StreamingChunk -> Maybe (TBQueue StreamingChunk)
forall a. a -> Maybe a
Just TBQueue StreamingChunk
tbq) (() -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
      let push :: Builder -> IO ()
push Builder
b = do
            Handle -> IO ()
T.pause Handle
th
            STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (Builder -> StreamingChunk
StreamingBuilder Builder
b)
            Handle -> IO ()
T.resume Handle
th
          flush :: IO ()
flush  = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq StreamingChunk
StreamingFlush
      (Builder -> IO ()) -> IO () -> IO ()
strmbdy Builder -> IO ()
push IO ()
flush
      STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq StreamingChunk
StreamingFinished
      Manager -> IO ()
deleteMyId Manager
mgr
  where
    ([(Token, ByteString)]
_,ValueTable
reqvt) = InpObj -> ([(Token, ByteString)], ValueTable)
inpObjHeaders InpObj
req

-- | Worker for server applications.
worker :: WorkerConf a -> Manager -> Server -> Action
worker :: WorkerConf a -> Manager -> Server -> IO ()
worker wc :: WorkerConf a
wc@WorkerConf{IO Bool
IO (Input a)
a -> IO ()
a -> PushPromise -> IO (StreamId, StreamId, a)
StreamId -> a -> IO ()
Output a -> IO ()
makePushStream :: a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: StreamId -> a -> IO ()
isPushable :: IO Bool
workerCleanup :: a -> IO ()
writeOutputQ :: Output a -> IO ()
readInputQ :: IO (Input a)
makePushStream :: forall a.
WorkerConf a -> a -> PushPromise -> IO (StreamId, StreamId, a)
insertStream :: forall a. WorkerConf a -> StreamId -> a -> IO ()
isPushable :: forall a. WorkerConf a -> IO Bool
workerCleanup :: forall a. WorkerConf a -> a -> IO ()
writeOutputQ :: forall a. WorkerConf a -> Output a -> IO ()
readInputQ :: forall a. WorkerConf a -> IO (Input a)
..} Manager
mgr Server
server = do
    StreamInfo a
sinfo <- IO (StreamInfo a)
forall a. IO (StreamInfo a)
newStreamInfo
    ThreadContinue
tcont <- IO ThreadContinue
newThreadContinue
    Manager -> (Handle -> IO ()) -> IO ()
timeoutKillThread Manager
mgr ((Handle -> IO ()) -> IO ()) -> (Handle -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ StreamInfo a -> ThreadContinue -> Handle -> IO ()
go StreamInfo a
sinfo ThreadContinue
tcont
  where
    go :: StreamInfo a -> ThreadContinue -> Handle -> IO ()
go StreamInfo a
sinfo ThreadContinue
tcont Handle
th = do
        ThreadContinue -> Bool -> IO ()
setThreadContinue ThreadContinue
tcont Bool
True
        Either SomeException ()
ex <- IO () -> IO (Either SomeException ())
forall e a. Exception e => IO a -> IO (Either e a)
E.try (IO () -> IO (Either SomeException ()))
-> IO () -> IO (Either SomeException ())
forall a b. (a -> b) -> a -> b
$ do
            Handle -> IO ()
T.pause Handle
th
            Input a
strm InpObj
req <- IO (Input a)
readInputQ
            let req' :: InpObj
req' = InpObj -> Handle -> InpObj
pauseRequestBody InpObj
req Handle
th
            StreamInfo a -> a -> IO ()
forall a. StreamInfo a -> a -> IO ()
setStreamInfo StreamInfo a
sinfo a
strm
            Handle -> IO ()
T.resume Handle
th
            Handle -> IO ()
T.tickle Handle
th
            let aux :: Aux
aux = Handle -> Aux
Aux Handle
th
            Server
server (InpObj -> Request
Request InpObj
req') Aux
aux ((Response -> [PushPromise] -> IO ()) -> IO ())
-> (Response -> [PushPromise] -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ WorkerConf a
-> Manager
-> Handle
-> ThreadContinue
-> a
-> Request
-> Response
-> [PushPromise]
-> IO ()
forall a.
WorkerConf a
-> Manager
-> Handle
-> ThreadContinue
-> a
-> Request
-> Response
-> [PushPromise]
-> IO ()
response WorkerConf a
wc Manager
mgr Handle
th ThreadContinue
tcont a
strm (InpObj -> Request
Request InpObj
req')
        Bool
cont1 <- case Either SomeException ()
ex of
            Right () -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
            Left e :: SomeException
e@(SomeException e
_)
              -- killed by the local worker manager
              | Just AsyncException
ThreadKilled    <- SomeException -> Maybe AsyncException
forall e. Exception e => SomeException -> Maybe e
E.fromException SomeException
e -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
              -- killed by the local timeout manager
              | Just TimeoutThread
T.TimeoutThread <- SomeException -> Maybe TimeoutThread
forall e. Exception e => SomeException -> Maybe e
E.fromException SomeException
e -> do
                  StreamInfo a -> IO ()
cleanup StreamInfo a
sinfo
                  Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
              | Bool
otherwise -> do
                  StreamInfo a -> IO ()
cleanup StreamInfo a
sinfo
                  Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Bool
cont2 <- ThreadContinue -> IO Bool
getThreadContinue ThreadContinue
tcont
        StreamInfo a -> IO ()
forall a. StreamInfo a -> IO ()
clearStreamInfo StreamInfo a
sinfo
        Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool
cont1 Bool -> Bool -> Bool
&& Bool
cont2) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ StreamInfo a -> ThreadContinue -> Handle -> IO ()
go StreamInfo a
sinfo ThreadContinue
tcont Handle
th
    pauseRequestBody :: InpObj -> Handle -> InpObj
pauseRequestBody InpObj
req Handle
th = InpObj
req { inpObjBody :: InpBody
inpObjBody = InpBody
readBody' }
      where
        readBody :: InpBody
readBody = InpObj -> InpBody
inpObjBody InpObj
req
        readBody' :: InpBody
readBody' = do
            Handle -> IO ()
T.pause Handle
th
            ByteString
bs <- InpBody
readBody
            Handle -> IO ()
T.resume Handle
th
            ByteString -> InpBody
forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
bs
    cleanup :: StreamInfo a -> IO ()
cleanup StreamInfo a
sinfo = do
        Maybe a
minp <- StreamInfo a -> IO (Maybe a)
forall a. StreamInfo a -> IO (Maybe a)
getStreamInfo StreamInfo a
sinfo
        case Maybe a
minp of
            Maybe a
Nothing   -> () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Just a
strm -> a -> IO ()
workerCleanup a
strm

----------------------------------------------------------------

--   A reference is shared by a responder and its worker.
--   The reference refers a value of this type as a return value.
--   If 'True', the worker continue to serve requests.
--   Otherwise, the worker get finished.
newtype ThreadContinue = ThreadContinue (IORef Bool)

{-# INLINE newThreadContinue #-}
newThreadContinue :: IO ThreadContinue
newThreadContinue :: IO ThreadContinue
newThreadContinue = IORef Bool -> ThreadContinue
ThreadContinue (IORef Bool -> ThreadContinue)
-> IO (IORef Bool) -> IO ThreadContinue
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Bool -> IO (IORef Bool)
forall a. a -> IO (IORef a)
newIORef Bool
True

{-# INLINE setThreadContinue #-}
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue :: ThreadContinue -> Bool -> IO ()
setThreadContinue (ThreadContinue IORef Bool
ref) Bool
x = IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef Bool
ref Bool
x

{-# INLINE getThreadContinue #-}
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue :: ThreadContinue -> IO Bool
getThreadContinue (ThreadContinue IORef Bool
ref) = IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef IORef Bool
ref

----------------------------------------------------------------

-- | The type for cleaning up.
newtype StreamInfo a = StreamInfo (IORef (Maybe a))

{-# INLINE newStreamInfo #-}
newStreamInfo :: IO (StreamInfo a)
newStreamInfo :: IO (StreamInfo a)
newStreamInfo = IORef (Maybe a) -> StreamInfo a
forall a. IORef (Maybe a) -> StreamInfo a
StreamInfo (IORef (Maybe a) -> StreamInfo a)
-> IO (IORef (Maybe a)) -> IO (StreamInfo a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe a -> IO (IORef (Maybe a))
forall a. a -> IO (IORef a)
newIORef Maybe a
forall a. Maybe a
Nothing

{-# INLINE clearStreamInfo #-}
clearStreamInfo :: StreamInfo a -> IO ()
clearStreamInfo :: StreamInfo a -> IO ()
clearStreamInfo (StreamInfo IORef (Maybe a)
ref) = IORef (Maybe a) -> Maybe a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref Maybe a
forall a. Maybe a
Nothing

{-# INLINE setStreamInfo #-}
setStreamInfo :: StreamInfo a -> a -> IO ()
setStreamInfo :: StreamInfo a -> a -> IO ()
setStreamInfo (StreamInfo IORef (Maybe a)
ref) a
inp = IORef (Maybe a) -> Maybe a -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef (Maybe a)
ref (Maybe a -> IO ()) -> Maybe a -> IO ()
forall a b. (a -> b) -> a -> b
$ a -> Maybe a
forall a. a -> Maybe a
Just a
inp

{-# INLINE getStreamInfo #-}
getStreamInfo :: StreamInfo a -> IO (Maybe a)
getStreamInfo :: StreamInfo a -> IO (Maybe a)
getStreamInfo (StreamInfo IORef (Maybe a)
ref) = IORef (Maybe a) -> IO (Maybe a)
forall a. IORef a -> IO a
readIORef IORef (Maybe a)
ref