{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE PatternGuards #-} {-# LANGUAGE CPP #-} module Network.Wai.Handler.Warp.HTTP2.Worker ( Respond , response , worker ) where #if __GLASGOW_HASKELL__ < 709 import Control.Applicative #endif import Control.Concurrent import Control.Concurrent.STM import Control.Exception (Exception, SomeException(..), AsyncException(..)) import qualified Control.Exception as E import Control.Monad (void, when) import Data.Typeable import qualified Network.HTTP.Types as H import Network.HTTP2 import Network.HTTP2.Priority import Network.Wai import Network.Wai.Handler.Warp.HTTP2.EncodeFrame import Network.Wai.Handler.Warp.HTTP2.Manager import Network.Wai.Handler.Warp.HTTP2.Types import Network.Wai.Handler.Warp.IORef import Network.Wai.HTTP2 ( Chunk(..) , HTTP2Application , PushPromise , Responder(runResponder) , RespondFunc ) import qualified Network.Wai.Handler.Warp.Settings as S import qualified Network.Wai.Handler.Warp.Timeout as T ---------------------------------------------------------------- -- | An 'HTTP2Application' takes a function of status, headers, trailers, and -- body; this type implements that by currying some internal arguments. -- -- The token type of the RespondFunc is set to be (). This is a bit -- anti-climactic, but the real benefit of the token type is that the -- application is forced to call the responder, and making it a boring type -- doesn't break that property. -- -- This is the argument to a 'Responder'. type Respond = IO () -> Stream -> RespondFunc () -- | This function is passed to workers. They also pass responses from -- 'HTTP2Application's to this function. This function enqueues commands for -- the HTTP/2 sender. response :: Context -> Manager -> ThreadContinue -> Respond response ctx mgr tconf tickle strm s h strmbdy = do -- TODO(awpr) HEAD requests will still stream. -- We must not exit this WAI application. -- If the application exits, streaming would be also closed. -- So, this work occupies this thread. -- -- We need to increase the number of workers. myThreadId >>= replaceWithAction mgr -- After this work, this thread stops to decrease the number of workers. setThreadContinue tconf False runStream ctx OResponse tickle strm s h strmbdy -- | Set up a waiter thread and run the stream body with functions to enqueue -- 'Sequence's on the stream's queue. runStream :: Context -> (Stream -> H.Status -> H.ResponseHeaders -> Aux -> Output) -> Respond runStream Context{outputQ} mkOutput tickle strm s h strmbdy = do -- Since 'Body' is loop, we cannot control it. -- So, let's serialize 'Builder' with a designated queue. sq <- newTBQueueIO 10 -- fixme: hard coding: 10 tvar <- newTVarIO SyncNone let out = mkOutput strm s h (Persist sq tvar) -- Since we must not enqueue an empty queue to the priority -- queue, we spawn a thread to ensure that the designated -- queue is not empty. void $ forkIO $ waiter tvar sq strm outputQ atomically $ writeTVar tvar $ SyncNext out let write chunk = do atomically $ writeTBQueue sq $ case chunk of BuilderChunk b -> SBuilder b FileChunk path part -> SFile path part tickle flush = atomically $ writeTBQueue sq SFlush trailers <- strmbdy write flush atomically $ writeTBQueue sq $ SFinish trailers -- | Handle abnormal termination of a stream: mark it as closed, send a reset -- frame, and call the user's 'settingsOnException' handler if applicable. cleanupStream :: Context -> S.Settings -> Stream -> Maybe Request -> Maybe SomeException -> IO () cleanupStream Context{outputQ} set strm req me = do closed strm Killed let sid = streamNumber strm frame = resetFrame InternalError sid enqueueControl outputQ sid $ OFrame frame case me of Nothing -> return () Just e -> S.settingsOnException set req e -- | Push the given 'Responder' to the client if the settings allow it -- (specifically 'enablePush' and 'maxConcurrentStreams'). Returns 'True' if -- the stream was actually pushed. -- -- This is the push function given to an 'HTTP2Application'. pushResponder :: Context -> S.Settings -> Stream -> PushPromise -> Responder -> IO Bool pushResponder ctx set strm promise responder = do let Context{ http2settings , pushConcurrency } = ctx cnt <- readIORef pushConcurrency settings <- readIORef http2settings let enabled = enablePush settings fits = maybe True (cnt <) $ maxConcurrentStreams settings canPush = fits && enabled if canPush then actuallyPushResponder ctx set strm promise responder else return False -- | Set up a pushed stream and run the 'Responder' in its own thread. Waits -- for the sender thread to handle the push request. This can fail to push the -- stream and return 'False' if the sender dequeued the push request after the -- associated stream was closed. actuallyPushResponder :: Context -> S.Settings -> Stream -> PushPromise -> Responder -> IO Bool actuallyPushResponder ctx set strm promise responder = do let Context{ http2settings , nextPushStreamId , pushConcurrency , streamTable } = ctx -- Claim the next outgoing stream. newSid <- atomicModifyIORef nextPushStreamId $ \sid -> (sid+2, sid) ws <- initialWindowSize <$> readIORef http2settings newStrm <- newStream pushConcurrency newSid ws -- Section 5.3.5 of RFC 7540 defines the weight of push promise is 16. -- But we need not to follow the spec. So, this value would change -- if necessary. writeIORef (streamPrecedence newStrm) $ toPrecedence $ defaultPriority { streamDependency = streamNumber strm } opened newStrm insert streamTable newSid newStrm -- Set up a channel for the sender to report back whether it pushed the -- stream. mvar <- newEmptyMVar let mkOutput = OPush strm promise mvar tickle = return () respond = runStream ctx mkOutput -- TODO(awpr): synthesize a Request for 'settingsOnException'? _ <- forkIO $ runResponder responder (respond tickle newStrm) `E.catch` (cleanupStream ctx set strm Nothing . Just) takeMVar mvar data Break = Break deriving (Show, Typeable) instance Exception Break worker :: Context -> S.Settings -> T.Manager -> HTTP2Application -> (ThreadContinue -> Respond) -> IO () worker ctx@Context{inputQ} set tm app respond = do tid <- myThreadId sinfo <- newStreamInfo tcont <- newThreadContinue let setup = T.register tm $ E.throwTo tid Break E.bracket setup T.cancel $ go sinfo tcont where go sinfo tcont th = do setThreadContinue tcont True ex <- E.try $ do T.pause th Input strm req <- atomically $ readTQueue inputQ setStreamInfo sinfo strm req T.resume th T.tickle th let responder = app req $ pushResponder ctx set strm runResponder responder $ respond tcont (T.tickle th) strm cont1 <- case ex of Right () -> return True Left e@(SomeException _) | Just Break <- E.fromException e -> do cleanup sinfo Nothing return True -- killed by the sender | Just ThreadKilled <- E.fromException e -> do cleanup sinfo Nothing return False | otherwise -> do cleanup sinfo (Just e) return True cont2 <- getThreadContinue tcont when (cont1 && cont2) $ go sinfo tcont th cleanup sinfo me = do m <- getStreamInfo sinfo case m of Nothing -> return () Just (strm,req) -> do cleanupStream ctx set strm (Just req) me clearStreamInfo sinfo -- | A dedicated waiter thread to re-enqueue the stream in the priority tree -- whenever output becomes available. When the sender drains the queue and -- moves on to another stream, it drops a message in the 'TVar', and this -- thread wakes up, waits for more output to become available, and re-enqueues -- the stream. waiter :: TVar Sync -> TBQueue Sequence -> Stream -> PriorityTree Output -> IO () waiter tvar sq strm outQ = do -- waiting for actions other than SyncNone mx <- atomically $ do mout <- readTVar tvar case mout of SyncNone -> retry SyncNext out -> do writeTVar tvar SyncNone return $ Just out SyncFinish -> return Nothing case mx of Nothing -> return () Just out -> do -- ensuring that the streaming queue is not empty. atomically $ do isEmpty <- isEmptyTBQueue sq when isEmpty retry -- ensuring that stream window is greater than 0. enqueueWhenWindowIsOpen outQ out waiter tvar sq strm outQ ---------------------------------------------------------------- -- | It would nice if responders could return values to workers. -- Unfortunately, 'ResponseReceived' is already defined in WAI 2.0. -- It is not wise to change this type. -- So, a reference is shared by a 'Respond' and its worker. -- The reference refers a value of this type as a return value. -- If 'True', the worker continue to serve requests. -- Otherwise, the worker get finished. newtype ThreadContinue = ThreadContinue (IORef Bool) newThreadContinue :: IO ThreadContinue newThreadContinue = ThreadContinue <$> newIORef True setThreadContinue :: ThreadContinue -> Bool -> IO () setThreadContinue (ThreadContinue ref) x = writeIORef ref x getThreadContinue :: ThreadContinue -> IO Bool getThreadContinue (ThreadContinue ref) = readIORef ref ---------------------------------------------------------------- -- | The type to store enough information for 'settingsOnException'. newtype StreamInfo = StreamInfo (IORef (Maybe (Stream,Request))) newStreamInfo :: IO StreamInfo newStreamInfo = StreamInfo <$> newIORef Nothing clearStreamInfo :: StreamInfo -> IO () clearStreamInfo (StreamInfo ref) = writeIORef ref Nothing setStreamInfo :: StreamInfo -> Stream -> Request -> IO () setStreamInfo (StreamInfo ref) strm req = writeIORef ref $ Just (strm,req) getStreamInfo :: StreamInfo -> IO (Maybe (Stream, Request)) getStreamInfo (StreamInfo ref) = readIORef ref