{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE BangPatterns, CPP #-} {-# LANGUAGE NamedFieldPuns #-} module Network.Wai.Handler.Warp.HTTP2.Sender (frameSender) where #if __GLASGOW_HASKELL__ < 709 import Control.Applicative #endif import Control.Concurrent.MVar (putMVar) import Control.Concurrent.STM import qualified Control.Exception as E import Control.Monad (void, when) import qualified Data.ByteString as BS import qualified Data.ByteString.Builder as B (int32BE) import qualified Data.ByteString.Builder.Extra as B import Data.Monoid ((<>)) import Foreign.Ptr import qualified Network.HTTP.Types as H import Network.HPACK (setLimitForEncoding) import Network.HTTP2 import Network.HTTP2.Priority import Network.Wai (FilePart(..)) import Network.Wai.HTTP2 (Trailers, promiseHeaders) import Network.Wai.Handler.Warp.Buffer import Network.Wai.Handler.Warp.HTTP2.EncodeFrame import Network.Wai.Handler.Warp.HTTP2.HPACK import Network.Wai.Handler.Warp.HTTP2.Types import Network.Wai.Handler.Warp.IORef import qualified Network.Wai.Handler.Warp.Settings as S import Network.Wai.Handler.Warp.Types #ifdef WINDOWS import qualified System.IO as IO #else import Network.Wai.Handler.Warp.FdCache (getFd) import Network.Wai.Handler.Warp.SendFile (positionRead) import qualified Network.Wai.Handler.Warp.Timeout as T import System.Posix.IO (openFd, OpenFileFlags(..), defaultFileFlags, OpenMode(ReadOnly), closeFd) import System.Posix.Types (Fd) #endif ---------------------------------------------------------------- -- | The platform-specific type of an open file to stream from. On Windows we -- don't have pread, so this is just a Handle; on Unix platforms with pread, -- this is a file descriptor supplied by the fd cache. #ifdef WINDOWS type OpenFile = IO.Handle #else type OpenFile = Fd #endif data Leftover = LZero | LOne B.BufferWriter | LTwo BS.ByteString B.BufferWriter | LFile OpenFile Integer Integer (IO ()) ---------------------------------------------------------------- -- | Run the given action if the stream is not closed; handle any exceptions by -- resetting the stream. unlessClosed :: Connection -> Stream -> IO () -> IO Bool unlessClosed Connection{connSendAll} strm@Stream{streamState,streamNumber} body = E.handle resetStream $ do state <- readIORef streamState if (isClosed state) then return False else body >> return True where resetStream e = do closed strm (ResetByMe e) let rst = resetFrame InternalError streamNumber connSendAll rst return False getWindowSize :: TVar WindowSize -> TVar WindowSize -> IO WindowSize getWindowSize connWindow strmWindow = do -- Waiting that the connection window gets open. cw <- atomically $ do w <- readTVar connWindow check (w > 0) return w -- This stream window is greater than 0 thanks to the invariant. sw <- atomically $ readTVar strmWindow return $ min cw sw frameSender :: Context -> Connection -> InternalInfo -> S.Settings -> IO () frameSender ctx@Context{outputQ,connectionWindow,encodeDynamicTable} conn@Connection{connWriteBuffer,connBufferSize,connSendAll} ii settings = go `E.catch` ignore where initialSettings = [(SettingsMaxConcurrentStreams,recommendedConcurrency)] initialFrame = settingsFrame id initialSettings bufHeaderPayload = connWriteBuffer `plusPtr` frameHeaderLength headerPayloadLim = connBufferSize - frameHeaderLength go = do connSendAll initialFrame loop -- ignoring the old priority because the value might be changed. loop = dequeue outputQ >>= \(_sid,pre,out) -> switch out pre ignore :: E.SomeException -> IO () ignore _ = return () switch OFinish _ = return () switch (OGoaway frame) _ = connSendAll frame switch (OSettings frame alist) _ = do connSendAll frame case lookup SettingsHeaderTableSize alist of Nothing -> return () Just siz -> do dyntbl <- readIORef encodeDynamicTable setLimitForEncoding siz dyntbl loop switch (OFrame frame) _ = do connSendAll frame loop switch (OResponse strm s h aux) pre = do writeIORef (streamPrecedence strm) pre -- fixme _ <- unlessClosed conn strm $ getWindowSize connectionWindow (streamWindow strm) >>= sendResponse strm s h aux loop switch (ONext strm curr) pre = do writeIORef (streamPrecedence strm) pre _ <- unlessClosed conn strm $ do lim <- getWindowSize connectionWindow (streamWindow strm) -- Data frame payload Next datPayloadLen mnext <- curr lim fillDataHeaderSend strm 0 datPayloadLen dispatchNext strm mnext loop switch (OPush oldStrm push mvar strm s h aux) pre = do writeIORef (streamPrecedence strm) pre -- fixme pushed <- unlessClosed conn oldStrm $ do lim <- getWindowSize connectionWindow (streamWindow strm) -- Write and send the promise. builder <- hpackEncodeCIHeaders ctx $ promiseHeaders push off <- pushContinue (streamNumber oldStrm) (streamNumber strm) builder flushN $ off + frameHeaderLength -- TODO(awpr): refactor sendResponse to be able to handle non-zero -- initial offsets and use that to potentially avoid the extra syscall. sendResponse strm s h aux lim putMVar mvar pushed loop -- Send the response headers and as much of the response as is immediately -- available; shared by normal responses and pushed streams. sendResponse :: Stream -> H.Status -> H.ResponseHeaders -> Aux -> WindowSize -> IO () sendResponse strm s h (Persist sq tvar) lim = do -- Header frame and Continuation frame let sid = streamNumber strm builder <- hpackEncodeHeader ctx ii settings s h len <- headerContinue sid builder False let total = len + frameHeaderLength (off, needSend) <- sendHeadersIfNecessary total let payloadOff = off + frameHeaderLength Next datPayloadLen mnext <- fillStreamBodyGetNext ii conn payloadOff lim sq tvar strm -- If no data was immediately available, avoid sending an -- empty data frame. if datPayloadLen > 0 then fillDataHeaderSend strm total datPayloadLen else when needSend $ flushN off dispatchNext strm mnext -- Send the stream's trailers and close the stream. sendTrailers :: Stream -> Trailers -> IO () sendTrailers strm trailers = do -- Trailers always indicate the end of a stream; send them in -- consecutive header+continuation frames and end the stream. Some -- clients dislike empty headers frames, so end the stream with an -- empty data frame instead, as recommended by the spec. toFlush <- case trailers of [] -> frameHeaderLength <$ fillFrameHeader FrameData 0 (streamNumber strm) (setEndStream defaultFlags) connWriteBuffer _ -> do builder <- hpackEncodeCIHeaders ctx trailers off <- headerContinue (streamNumber strm) builder True return (off + frameHeaderLength) -- 'closed' must be before 'flushN'. If not, the context would be -- switched to the receiver, resulting in the inconsistency of -- concurrency. closed strm Finished flushN toFlush -- Flush the connection buffer to the socket, where the first 'n' bytes of -- the buffer are filled. flushN :: Int -> IO () flushN n = bufferIO connWriteBuffer n connSendAll -- A flags value with the end-header flag set iff the argument is B.Done. maybeEndHeaders B.Done = setEndHeader defaultFlags maybeEndHeaders _ = defaultFlags -- Write PUSH_PROMISE and possibly CONTINUATION frames into the connection -- buffer, using the given builder as their contents; flush them to the -- socket as necessary. pushContinue sid newSid builder = do let builder' = B.int32BE (fromIntegral newSid) <> builder (len, signal) <- B.runBuilder builder' bufHeaderPayload headerPayloadLim let flag = maybeEndHeaders signal fillFrameHeader FramePushPromise len sid flag connWriteBuffer continue sid len signal -- Write HEADER and possibly CONTINUATION frames. headerContinue sid builder endOfStream = do (len, signal) <- B.runBuilder builder bufHeaderPayload headerPayloadLim let flag0 = maybeEndHeaders signal flag = if endOfStream then setEndStream flag0 else flag0 fillFrameHeader FrameHeaders len sid flag connWriteBuffer continue sid len signal continue _ len B.Done = return len continue sid len (B.More _ writer) = do flushN $ len + frameHeaderLength (len', signal') <- writer bufHeaderPayload headerPayloadLim let flag = maybeEndHeaders signal' fillFrameHeader FrameContinuation len' sid flag connWriteBuffer continue sid len' signal' continue sid len (B.Chunk bs writer) = do flushN $ len + frameHeaderLength let (bs1,bs2) = BS.splitAt headerPayloadLim bs len' = BS.length bs1 void $ copy bufHeaderPayload bs1 fillFrameHeader FrameContinuation len' sid defaultFlags connWriteBuffer if bs2 == "" then continue sid len' (B.More 0 writer) else continue sid len' (B.Chunk bs2 writer) -- True if the connection buffer has room for a 1-byte data frame. canFitDataFrame total = total + frameHeaderLength < connBufferSize -- Take the appropriate action based on the given 'Control': -- - If more output is immediately available, re-enqueue the stream in the -- output queue. -- - If the output is over and trailers are available, send them now and -- end the stream. -- - If we've drained the queue and handed the stream back to its waiter, -- do nothing. -- -- This is done after sending any part of the stream body, so it's shared -- by 'sendResponse' and @switch (ONext ...)@. dispatchNext :: Stream -> Control DynaNext -> IO () dispatchNext _ CNone = return () dispatchNext strm (CFinish trailers) = sendTrailers strm trailers dispatchNext strm (CNext next) = do let out = ONext strm next enqueueOrSpawnTemporaryWaiter strm outputQ out -- Send headers if there is not room for a 1-byte data frame, and return -- the offset of the next frame's first header byte and whether the headers -- still need to be sent. sendHeadersIfNecessary total | canFitDataFrame total = return (total, True) | otherwise = do flushN total return (0, False) fillDataHeaderSend strm otherLen datPayloadLen = do -- Data frame header let sid = streamNumber strm buf = connWriteBuffer `plusPtr` otherLen total = otherLen + frameHeaderLength + datPayloadLen fillFrameHeader FrameData datPayloadLen sid defaultFlags buf flushN total atomically $ do modifyTVar' connectionWindow (subtract datPayloadLen) modifyTVar' (streamWindow strm) (subtract datPayloadLen) fillFrameHeader ftyp len sid flag buf = encodeFrameHeaderBuf ftyp hinfo buf where hinfo = FrameHeader len flag sid ---------------------------------------------------------------- fillStreamBodyGetNext :: InternalInfo -> Connection -> Int -> WindowSize -> TBQueue Sequence -> TVar Sync -> Stream -> IO Next fillStreamBodyGetNext ii Connection{connWriteBuffer,connBufferSize} off lim sq tvar strm = do let datBuf = connWriteBuffer `plusPtr` off room = min (connBufferSize - off) lim (leftover, cont, len) <- runStreamBuilder ii datBuf room sq nextForStream ii connWriteBuffer connBufferSize sq tvar strm leftover cont len ---------------------------------------------------------------- runStreamBuilder :: InternalInfo -> Buffer -> BufSize -> TBQueue Sequence -> IO (Leftover, Maybe Trailers, BytesFilled) runStreamBuilder ii buf0 room0 sq = loop buf0 room0 0 where loop !buf !room !total = do mbuilder <- atomically $ tryReadTBQueue sq case mbuilder of Nothing -> return (LZero, Nothing, total) Just (SBuilder builder) -> do (len, signal) <- B.runBuilder builder buf room let !total' = total + len case signal of B.Done -> loop (buf `plusPtr` len) (room - len) total' B.More _ writer -> return (LOne writer, Nothing, total') B.Chunk bs writer -> return (LTwo bs writer, Nothing, total') Just (SFile path part) -> do (leftover, len) <- runStreamFile ii buf room path part let !total' = total + len case leftover of LZero -> loop (buf `plusPtr` len) (room - len) total' _ -> return (leftover, Nothing, total') Just SFlush -> return (LZero, Nothing, total) Just (SFinish trailers) -> return (LZero, Just trailers, total) -- | Open the file and start reading into the send buffer. runStreamFile :: InternalInfo -> Buffer -> BufSize -> FilePath -> FilePart -> IO (Leftover, BytesFilled) -- | Read the given (OS-specific) file representation into the buffer. On -- non-Windows systems this uses pread; on Windows this ignores the position -- because we use the Handle's internal read position instead (because it's not -- potentially shared with other readers). readOpenFile :: OpenFile -> Buffer -> BufSize -> Integer -> IO Int #ifdef WINDOWS runStreamFile _ buf room path part = do let start = filePartOffset part bytes = filePartByteCount part -- fixme: how to close Handle? GC does it at this moment. h <- IO.openBinaryFile path IO.ReadMode IO.hSeek h IO.AbsoluteSeek start fillBufFile buf room h start bytes (return ()) readOpenFile h buf room _ = IO.hGetBufSome h buf room #else runStreamFile ii buf room path part = do let start = filePartOffset part bytes = filePartByteCount part (fd, refresh) <- case fdCacher ii of Just fdcache -> getFd fdcache path Nothing -> do fd' <- openFd path ReadOnly Nothing defaultFileFlags{nonBlock=True} th <- T.register (timeoutManager ii) (closeFd fd') return (fd', T.tickle th) fillBufFile buf room fd start bytes refresh readOpenFile = positionRead #endif -- | Read as much of the file as is currently available into the buffer, then -- return a 'Leftover' to indicate whether this file chunk has more data to -- send. If this read hit the end of the file range, return 'LZero'; otherwise -- return 'LFile' so this stream will continue reading from the file the next -- time it's pulled from the queue. fillBufFile :: Buffer -> BufSize -> OpenFile -> Integer -> Integer -> (IO ()) -> IO (Leftover, BytesFilled) fillBufFile buf room f start bytes refresh = do len <- readOpenFile f buf (mini room bytes) start refresh let len' = fromIntegral len leftover = if bytes > len' then LFile f (start + len') (bytes - len') refresh else LZero return (leftover, len) mini :: Int -> Integer -> Int mini i n | fromIntegral i < n = i | otherwise = fromIntegral n fillBufStream :: InternalInfo -> Buffer -> BufSize -> Leftover -> TBQueue Sequence -> TVar Sync -> Stream -> DynaNext fillBufStream ii buf0 siz0 leftover0 sq tvar strm lim0 = do let payloadBuf = buf0 `plusPtr` frameHeaderLength room0 = min (siz0 - frameHeaderLength) lim0 case leftover0 of LZero -> do (leftover, end, len) <- runStreamBuilder ii payloadBuf room0 sq getNext leftover end len LOne writer -> write writer payloadBuf room0 0 LTwo bs writer | BS.length bs <= room0 -> do buf1 <- copy payloadBuf bs let len = BS.length bs write writer buf1 (room0 - len) len | otherwise -> do let (bs1,bs2) = BS.splitAt room0 bs void $ copy payloadBuf bs1 getNext (LTwo bs2 writer) Nothing room0 LFile fd start bytes refresh -> do (leftover, len) <- fillBufFile payloadBuf room0 fd start bytes refresh getNext leftover Nothing len where getNext = nextForStream ii buf0 siz0 sq tvar strm write writer1 buf room sofar = do (len, signal) <- writer1 buf room case signal of B.Done -> do (leftover, end, extra) <- runStreamBuilder ii (buf `plusPtr` len) (room - len) sq let !total = sofar + len + extra getNext leftover end total B.More _ writer -> do let !total = sofar + len getNext (LOne writer) Nothing total B.Chunk bs writer -> do let !total = sofar + len getNext (LTwo bs writer) Nothing total nextForStream :: InternalInfo -> Buffer -> BufSize -> TBQueue Sequence -> TVar Sync -> Stream -> Leftover -> Maybe Trailers -> BytesFilled -> IO Next nextForStream _ _ _ _ tvar _ _ (Just trailers) len = do atomically $ writeTVar tvar $ SyncFinish return $ Next len $ CFinish trailers nextForStream ii buf siz sq tvar strm LZero Nothing len = do let out = ONext strm (fillBufStream ii buf siz LZero sq tvar strm) atomically $ writeTVar tvar $ SyncNext out return $ Next len CNone nextForStream ii buf siz sq tvar strm leftover Nothing len = return $ Next len (CNext (fillBufStream ii buf siz leftover sq tvar strm))