{-# LANGUAGE BangPatterns #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE PatternGuards #-} {-# LANGUAGE RecordWildCards #-} module Network.HTTP2.Server.Worker ( worker ) where import Control.Concurrent.STM import Control.Exception (SomeException(..), AsyncException(..)) import qualified Control.Exception as E import Data.IORef import qualified Network.HTTP.Types as H import qualified System.TimeManager as T import Imports hiding (insert) import Network.HPACK import Network.HPACK.Token import Network.HTTP2 import Network.HTTP2.Priority import Network.HTTP2.Server.API import Network.HTTP2.Server.Context import Network.HTTP2.Server.EncodeFrame import Network.HTTP2.Server.Manager import Network.HTTP2.Server.Queue import Network.HTTP2.Server.Stream import Network.HTTP2.Server.Types ---------------------------------------------------------------- pushStream :: Context -> Stream -- parent stream -> ValueTable -- request -> [PushPromise] -> IO OutputType pushStream _ _ _ [] = return ORspn pushStream ctx@Context{..} pstrm reqvt pps0 | len == 0 = return ORspn | otherwise = do pushable <- enablePush <$> readIORef http2settings if pushable then do tvar <- newTVarIO 0 lim <- push tvar pps0 0 if lim == 0 then return ORspn else return $ OWait (waiter lim tvar) else return ORspn where !pid = streamNumber pstrm !len = length pps0 increment tvar = atomically $ modifyTVar' tvar (+1) waiter lim tvar = atomically $ do n <- readTVar tvar check (n >= lim) push _ [] !n = return (n :: Int) push tvar (pp:pps) !n = do ws <- initialWindowSize <$> readIORef http2settings let !w = promiseWeight pp !pri = defaultPriority { weight = w } !pre = toPrecedence pri newstrm <- newPushStream ctx ws pre let !sid = streamNumber newstrm insert streamTable sid newstrm let !scheme = fromJust $ getHeaderValue tokenScheme reqvt -- fixme: this value can be Nothing !auth = fromJust (getHeaderValue tokenHost reqvt <|> getHeaderValue tokenAuthority reqvt) !path = promiseRequestPath pp !promiseRequest = [(tokenMethod, H.methodGet) ,(tokenScheme, scheme) ,(tokenAuthority, auth) ,(tokenPath, path)] !ot = OPush promiseRequest pid !rsp = promiseResponse pp !out = Output newstrm rsp ot Nothing $ increment tvar enqueueOutput outputQ out push tvar pps (n + 1) -- | This function is passed to workers. -- They also pass 'Response's from a server to this function. -- This function enqueues commands for the HTTP/2 sender. response :: Context -> Manager -> T.Handle -> ThreadContinue -> Stream -> Request -> Response -> [PushPromise] -> IO () response ctx@Context{..} mgr th tconf strm req rsp pps = case responseBody rsp of RspNoBody -> do setThreadContinue tconf True enqueueOutput outputQ $ Output strm rsp ORspn Nothing (return ()) RspBuilder _ -> do otyp <- pushStream ctx strm reqvt pps setThreadContinue tconf True enqueueOutput outputQ $ Output strm rsp otyp Nothing (return ()) RspFile _ -> do otyp <- pushStream ctx strm reqvt pps setThreadContinue tconf True enqueueOutput outputQ $ Output strm rsp otyp Nothing (return ()) RspStreaming strmbdy -> do otyp <- pushStream ctx strm reqvt pps -- We must not exit this WAI application. -- If the application exits, streaming would be also closed. -- So, this work occupies this thread. -- -- We need to increase the number of workers. spawnAction mgr -- After this work, this thread stops to decease -- the number of workers. setThreadContinue tconf False -- Since streaming body is loop, we cannot control it. -- So, let's serialize 'Builder' with a designated queue. tbq <- newTBQueueIO 10 -- fixme: hard coding: 10 enqueueOutput outputQ $ Output strm rsp otyp (Just tbq) (return ()) let push b = do T.pause th atomically $ writeTBQueue tbq (RSBuilder b) T.resume th flush = atomically $ writeTBQueue tbq RSFlush strmbdy push flush atomically $ writeTBQueue tbq RSFinish deleteMyId mgr where (_,reqvt) = requestHeaders req worker :: Context -> Manager -> Server -> Action worker ctx@Context{inputQ,controlQ} mgr server = do sinfo <- newStreamInfo tcont <- newThreadContinue timeoutKillThread mgr $ go sinfo tcont where go sinfo tcont th = do setThreadContinue tcont True ex <- E.try $ do T.pause th Input strm req <- atomically $ readTQueue inputQ let req' = pauseRequestBody req th setStreamInfo sinfo strm T.resume th T.tickle th let aux = Aux th server req aux $ response ctx mgr th tcont strm req' cont1 <- case ex of Right () -> return True Left e@(SomeException _) -- killed by the local worker manager | Just ThreadKilled <- E.fromException e -> return False -- killed by the local timeout manager | Just T.TimeoutThread <- E.fromException e -> do cleanup sinfo return True | otherwise -> do cleanup sinfo return True cont2 <- getThreadContinue tcont clearStreamInfo sinfo when (cont1 && cont2) $ go sinfo tcont th pauseRequestBody req th = req { requestBody = readBody' } where !readBody = requestBody req !readBody' = do T.pause th bs <- readBody T.resume th return bs cleanup sinfo = do minp <- getStreamInfo sinfo case minp of Nothing -> return () Just strm -> do closed ctx strm Killed let !frame = resetFrame InternalError (streamNumber strm) enqueueControl controlQ $ CFrame frame ---------------------------------------------------------------- -- A reference is shared by a responder and its worker. -- The reference refers a value of this type as a return value. -- If 'True', the worker continue to serve requests. -- Otherwise, the worker get finished. newtype ThreadContinue = ThreadContinue (IORef Bool) {-# INLINE newThreadContinue #-} newThreadContinue :: IO ThreadContinue newThreadContinue = ThreadContinue <$> newIORef True {-# INLINE setThreadContinue #-} setThreadContinue :: ThreadContinue -> Bool -> IO () setThreadContinue (ThreadContinue ref) x = writeIORef ref x {-# INLINE getThreadContinue #-} getThreadContinue :: ThreadContinue -> IO Bool getThreadContinue (ThreadContinue ref) = readIORef ref ---------------------------------------------------------------- -- | The type for cleaning up. newtype StreamInfo = StreamInfo (IORef (Maybe Stream)) {-# INLINE newStreamInfo #-} newStreamInfo :: IO StreamInfo newStreamInfo = StreamInfo <$> newIORef Nothing {-# INLINE clearStreamInfo #-} clearStreamInfo :: StreamInfo -> IO () clearStreamInfo (StreamInfo ref) = writeIORef ref Nothing {-# INLINE setStreamInfo #-} setStreamInfo :: StreamInfo -> Stream -> IO () setStreamInfo (StreamInfo ref) inp = writeIORef ref $ Just inp {-# INLINE getStreamInfo #-} getStreamInfo :: StreamInfo -> IO (Maybe Stream) getStreamInfo (StreamInfo ref) = readIORef ref