-- Session: links frames to streams, and helps in ordering the header frames -- so that they don't get mixed with header frames from other streams when -- resources are being served concurrently. {-# LANGUAGE FlexibleContexts, Rank2Types, TemplateHaskell, OverloadedStrings #-} {-# OPTIONS_HADDOCK hide #-} module SecondTransfer.Http2.Session( http2Session ,getFrameFromSession ,sendFrameToSession ,sendCommandToSession ,CoherentSession ,SessionInput(..) ,SessionInputCommand(..) ,SessionOutput(..) ,SessionOutputCommand(..) ,SessionCoordinates(..) ,SessionComponent(..) ,SessionsCallbacks(..) ,SessionsConfig(..) ,ErrorCallback -- Internal stuff ,OutputFrame ,InputFrame ) where #include "Logging.cpphs" -- System grade utilities import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent.Chan import Control.Exception (throwTo) import qualified Control.Exception as E import Control.Monad (forever) import Control.Monad.IO.Class (liftIO) import Control.Monad.Trans.Reader import Control.Concurrent.MVar import qualified Data.ByteString as B import qualified Data.ByteString.Builder as Bu import qualified Data.ByteString.Lazy as Bl import Data.Conduit import qualified Data.HashTable.IO as H import qualified Data.IntSet as NS import Data.Maybe (isJust) #ifndef IMPLICIT_MONOID import Data.Monoid (mappend) #endif import Control.Lens -- No framing layer here... let's use Kazu's Yamamoto library import qualified Network.HPACK as HP import qualified Network.HTTP2 as NH2 -- Logging utilities import System.Log.Logger -- Imports from other parts of the program import SecondTransfer.MainLoop.CoherentWorker import SecondTransfer.MainLoop.Tokens import SecondTransfer.Sessions.Config import SecondTransfer.Sessions.Internal (sessionExceptionHandler, SessionsContext, sessionsConfig) import SecondTransfer.Utils (unfoldChannelAndSource) import SecondTransfer.Exception import qualified SecondTransfer.Utils.HTTPHeaders as He -- Unfortunately the frame encoding API of Network.HTTP2 is a bit difficult to -- use :-( type OutputFrame = (NH2.EncodeInfo, NH2.FramePayload) type InputFrame = NH2.Frame useChunkLength :: Int useChunkLength = 16384 -- Singleton instance used for concurrency data HeadersSent = HeadersSent -- All streams put their data bits here. A "Nothing" value signals -- end of data. type DataOutputToConveyor = (GlobalStreamId, Maybe B.ByteString) -- Whatever a worker thread is going to need comes here.... -- this is to make refactoring easier, but not strictly needed. data WorkerThreadEnvironment = WorkerThreadEnvironment { -- What's the header stream id? _streamId :: GlobalStreamId -- A full block of headers can come here... the mvar in the middle should -- be populate to signal end of headers transmission. A thread will be suspended -- waiting for that , _headersOutput :: Chan (GlobalStreamId, MVar HeadersSent, Headers) -- And regular contents can come this way and thus be properly mixed -- with everything else.... for now... ,_dataOutput :: Chan DataOutputToConveyor ,_streamsCancelled_WTE :: MVar NS.IntSet } makeLenses ''WorkerThreadEnvironment -- An HTTP/2 session. Basically a couple of channels ... type Session = (SessionInput, SessionOutput) -- From outside, one can only write to this one ... the newtype is to enforce -- this. newtype SessionInput = SessionInput ( Chan (Either SessionInputCommand InputFrame) ) sendFrameToSession :: SessionInput -> InputFrame -> IO () sendFrameToSession (SessionInput chan) frame = writeChan chan $ Right frame sendCommandToSession :: SessionInput -> SessionInputCommand -> IO () sendCommandToSession (SessionInput chan) command = writeChan chan $ Left command -- From outside, one can only read from this one newtype SessionOutput = SessionOutput ( Chan (Either SessionOutputCommand OutputFrame) ) getFrameFromSession :: SessionOutput -> IO (Either SessionOutputCommand OutputFrame) getFrameFromSession (SessionOutput chan) = readChan chan type HashTable k v = H.CuckooHashTable k v type Stream2HeaderBlockFragment = HashTable GlobalStreamId Bu.Builder type WorkerMonad = ReaderT WorkerThreadEnvironment IO -- Have to figure out which are these...but I would expect to have things -- like unexpected aborts here in this type. data SessionInputCommand = CancelSession_SIC deriving Show -- temporary data SessionOutputCommand = CancelSession_SOC deriving Show -- Here is how we make a session type SessionMaker = SessionsContext -> IO Session -- Here is how we make a session wrapping a CoherentWorker type CoherentSession = CoherentWorker -> SessionMaker data PostInputMechanism = PostInputMechanism (Chan (Maybe B.ByteString), InputDataStream) -- Settings imposed by the peer data SessionSettings = SessionSettings { _pushEnabled :: Bool } makeLenses ''SessionSettings -- NH2.Frame != Frame data SessionData = SessionData { -- ATTENTION: Ignore the warning coming from here for now _sessionsContext :: SessionsContext ,_sessionInput :: Chan (Either SessionInputCommand InputFrame) -- We need to lock this channel occassionally so that we can order multiple -- header frames properly.... ,_sessionOutput :: MVar (Chan (Either SessionOutputCommand OutputFrame)) -- Use to encode ,_toEncodeHeaders :: MVar HP.DynamicTable -- And used to decode ,_toDecodeHeaders :: MVar HP.DynamicTable -- While I'm receiving headers, anything which -- is not a header should end in the connection being -- closed ,_receivingHeaders :: MVar (Maybe Int) -- _lastGoodStream is used both to report the last good stream in the -- GoAwayFrame and to keep track of streams oppened by the client. In -- other words, it contains the stream_id of the last valid client -- stream and is updated as soon as the first frame of that stream is -- received. ,_lastGoodStream :: MVar Int -- Used for decoding the headers ,_stream2HeaderBlockFragment :: Stream2HeaderBlockFragment -- Used for worker threads... this is actually a pre-filled template -- I make copies of it in different contexts, and as needed. ,_forWorkerThread :: WorkerThreadEnvironment ,_coherentWorker :: CoherentWorker -- Some streams may be cancelled ,_streamsCancelled :: MVar NS.IntSet -- Data input mechanism corresponding to some threads ,_stream2PostInputMechanism :: HashTable Int PostInputMechanism -- Worker thread register. This is a dictionary from stream id to -- the ThreadId of the thread with the worker thread. I use this to -- raise asynchronous exceptions in the worker thread if the stream -- is cancelled by the client. This way we get early finalization. ,_stream2WorkerThread :: HashTable Int ThreadId -- Use to retrieve/set the session id ,_sessionIdAtSession :: Int -- And used to keep peer session settings ,_sessionSettings :: MVar SessionSettings } makeLenses ''SessionData -- v- {headers table size comes here!!} http2Session :: CoherentWorker -> Int -> SessionsContext -> IO Session http2Session coherent_worker session_id sessions_context = do session_input <- newChan session_output <- newChan session_output_mvar <- newMVar session_output -- For incremental construction of headers... stream_request_headers <- H.new :: IO Stream2HeaderBlockFragment -- Warning: we should find a way of coping with different table sizes. decode_headers_table <- HP.newDynamicTableForDecoding 4096 decode_headers_table_mvar <- newMVar decode_headers_table encode_headers_table <- HP.newDynamicTableForEncoding 4096 encode_headers_table_mvar <- newMVar encode_headers_table -- These ones need independent threads taking care of sending stuff -- their way... headers_output <- newChan :: IO (Chan (GlobalStreamId, MVar HeadersSent, Headers)) data_output <- newChan :: IO (Chan DataOutputToConveyor) stream2postinputmechanism <- H.new stream2workerthread <- H.new last_good_stream_mvar <- newMVar (-1) receiving_headers <- newMVar Nothing session_settings <- newMVar $ SessionSettings { _pushEnabled = True } -- What about stream cancellation? cancelled_streams_mvar <- newMVar $ NS.empty :: IO (MVar NS.IntSet) let for_worker_thread = WorkerThreadEnvironment { _streamId = error "NotInitialized" ,_headersOutput = headers_output ,_dataOutput = data_output ,_streamsCancelled_WTE = cancelled_streams_mvar } let session_data = SessionData { _sessionsContext = sessions_context ,_sessionInput = session_input ,_sessionOutput = session_output_mvar ,_toDecodeHeaders = decode_headers_table_mvar ,_toEncodeHeaders = encode_headers_table_mvar ,_stream2HeaderBlockFragment = stream_request_headers ,_forWorkerThread = for_worker_thread ,_coherentWorker = coherent_worker ,_streamsCancelled = cancelled_streams_mvar ,_stream2PostInputMechanism = stream2postinputmechanism ,_stream2WorkerThread = stream2workerthread ,_sessionIdAtSession = session_id ,_receivingHeaders = receiving_headers ,_sessionSettings = session_settings ,_lastGoodStream = last_good_stream_mvar } let exc_handler :: SessionComponent -> HTTP2SessionException -> IO () exc_handler component e = sessionExceptionHandler component session_id sessions_context e exc_guard :: SessionComponent -> IO () -> IO () exc_guard component action = E.catch action (\e -> do INSTRUMENTATION( errorM "HTTP2.Session" "Exception processed" ) exc_handler component e ) -- Create an input thread that decodes frames... forkIO $ exc_guard SessionInputThread_HTTP2SessionComponent $ runReaderT sessionInputThread session_data -- Create a thread that captures headers and sends them down the tube forkIO $ exc_guard SessionHeadersOutputThread_HTTP2SessionComponent $ runReaderT (headersOutputThread headers_output session_output_mvar) session_data -- Create a thread that captures data and sends it down the tube forkIO $ exc_guard SessionDataOutputThread_HTTP2SessionComponent $ dataOutputThread data_output session_output_mvar -- The two previous threads fill the session_output argument below (they write to it) -- the session machinery in the other end is in charge of sending that data through the -- socket. return ( (SessionInput session_input), (SessionOutput session_output) ) -- TODO: Some ill clients can break this thread with exceptions. Make these paths a bit --- more robust. sessionInputThread :: ReaderT SessionData IO () sessionInputThread = do INSTRUMENTATION( liftIO $ debugM "HTTP2.Session" "Entering sessionInputThread" ) -- This is an introductory and declarative block... all of this is tail-executed -- every time that a packet needs to be processed. It may be a good idea to abstract -- these values in a closure... session_input <- view sessionInput decode_headers_table_mvar <- view toDecodeHeaders stream_request_headers <- view stream2HeaderBlockFragment cancelled_streams_mvar <- view streamsCancelled coherent_worker <- view coherentWorker for_worker_thread_uns <- view forWorkerThread stream2workerthread <- view stream2WorkerThread receiving_headers_mvar <- view receivingHeaders last_good_stream_mvar <- view lastGoodStream input <- liftIO $ readChan session_input -- INSTRUMENTATION( liftIO $ infoM "HTTP2.Session" $ "Got a frame or a command: " ++ (show input) ) case input of Left CancelSession_SIC -> do -- Good place to tear down worker threads... Let the rest of the finalization -- to the framer liftIO $ do H.mapM_ (\ (_, thread_id) -> do throwTo thread_id StreamCancelledException return () ) stream2workerthread -- We do not continue here, but instead let it finish return () -- The block below will process both HEADERS and CONTINUATION frames. -- TODO: As it stands now, the server will happily start a new stream with -- a CONTINUATION frame instead of a HEADERS frame. That's against the -- protocol. Right frame | Just (stream_id, bytes) <- isAboutHeaders frame -> do -- Just append the frames to streamRequestHeaders opens_stream <- appendHeaderFragmentBlock stream_id bytes if opens_stream then do maybe_rcv_headers_of <- liftIO $ takeMVar receiving_headers_mvar case maybe_rcv_headers_of of Just _ -> do -- INSTRUMENTATION( liftIO $ errorM "HTTP2.Session" "headers being received") -- Bad client, it is already sending headers -- and trying to open another one closeConnectionBecauseIsInvalid NH2.ProtocolError -- An exception will be thrown above, so to not complicate -- control flow here too much. Nothing -> do -- Signal that we are receiving headers now, for this stream liftIO $ putMVar receiving_headers_mvar (Just stream_id) -- And go to check if the stream id is valid last_good_stream <- liftIO $ takeMVar last_good_stream_mvar if (odd stream_id ) && (stream_id > last_good_stream) then do -- We are golden, set the new good stream liftIO $ putMVar last_good_stream_mvar (stream_id) else do -- We are not golden INSTRUMENTATION( liftIO $ errorM "HTTP2.Session" "Protocol error: bad stream id") closeConnectionBecauseIsInvalid NH2.ProtocolError else do maybe_rcv_headers_of <- liftIO $ takeMVar receiving_headers_mvar case maybe_rcv_headers_of of Just a_stream_id | a_stream_id == stream_id -> do -- Nothing to complain about liftIO $ putMVar receiving_headers_mvar maybe_rcv_headers_of Nothing -> error "InternalError, this should be set" if frameEndsHeaders frame then do -- Ok, let it be known that we are not receiving more headers liftIO $ modifyMVar_ receiving_headers_mvar (\ _ -> return Nothing ) -- Let's decode the headers let for_worker_thread = set streamId stream_id for_worker_thread_uns headers_bytes <- getHeaderBytes stream_id dyn_table <- liftIO $ takeMVar decode_headers_table_mvar (new_table, header_list ) <- liftIO $ HP.decodeHeader dyn_table headers_bytes -- Good moment to remove the headers from the table.... we don't want a space -- leak here liftIO $ do H.delete stream_request_headers stream_id putMVar decode_headers_table_mvar new_table -- TODO: Validate headers, abort session if the headers are invalid. -- Otherwise other invariants will break!! -- THIS IS PROBABLY THE BEST PLACE FOR DOING IT. let headers_editor = He.fromList header_list maybe_good_headers_editor <- validateIncomingHeaders headers_editor good_headers <- case maybe_good_headers_editor of Just yes_they_are_good -> return yes_they_are_good Nothing -> closeConnectionBecauseIsInvalid NH2.ProtocolError -- Add any extra headers, on demand headers_extra_good <- addExtraHeaders good_headers let header_list_after = He.toList headers_extra_good -- liftIO $ putStrLn $ "header list after " ++ (show header_list_after) -- If the headers end the request.... post_data_source <- if not (frameEndsStream frame) then do mechanism <- createMechanismForStream stream_id let source = postDataSourceFromMechanism mechanism return $ Just source else do return Nothing -- TODO: Handle the cases where a request tries to send data -- even if the method doesn't allow for data. -- I'm clear to start the worker, in its own thread liftIO $ do thread_id <- forkIO $ runReaderT (workerThread (header_list_after, post_data_source) coherent_worker) for_worker_thread H.insert stream2workerthread stream_id thread_id return () else -- Frame doesn't end the headers... it was added before... so -- probably do nothing return () continue Right frame@(NH2.Frame _ (NH2.RSTStreamFrame _error_code_id)) -> do let stream_id = streamIdFromFrame frame liftIO $ do INSTRUMENTATION( infoM "HTTP2.Session" $ "Stream reset: " ++ (show _error_code_id) ) cancelled_streams <- takeMVar cancelled_streams_mvar INSTRUMENTATION( infoM "HTTP2.Session" $ "Cancelled stream was: " ++ (show stream_id) ) putMVar cancelled_streams_mvar $ NS.insert stream_id cancelled_streams maybe_thread_id <- H.lookup stream2workerthread stream_id case maybe_thread_id of Nothing -> -- This is actually more like an internal error, when this -- happend, cancell the session error "InterruptingUnexistentStream" Just thread_id -> do INSTRUMENTATION( infoM "HTTP2.Session" $ "Stream successfully interrupted" ) throwTo thread_id StreamCancelledException continue Right frame@(NH2.Frame (NH2.FrameHeader _ _ nh2_stream_id) (NH2.DataFrame somebytes)) -> unlessReceivingHeaders $ do -- So I got data to process -- TODO: Handle end of stream let stream_id = NH2.fromStreamIdentifier nh2_stream_id -- TODO: Handle the cases where the stream_id doesn't match an already existent -- stream. In such cases it is justified to reset the connection with a protocol_error. streamWorkerSendData stream_id somebytes -- After that data has been received and forwarded downstream, we can issue a windows update -- -- TODO: We can use wider frames to avoid congestion... -- .... and we can also be more compositional with these short bursts of data.... -- -- TODO: Consider that the best place to output these frames can be somewhere else... -- -- TODO: Use a special, with-quota queue here to do flow control. Don't send meaningless -- WindowUpdateFrame's sendOutFrame (NH2.EncodeInfo NH2.defaultFlags nh2_stream_id Nothing ) (NH2.WindowUpdateFrame (fromIntegral (B.length somebytes)) ) sendOutFrame (NH2.EncodeInfo NH2.defaultFlags (NH2.toStreamIdentifier 0) Nothing ) (NH2.WindowUpdateFrame (fromIntegral (B.length somebytes)) ) if frameEndsStream frame then do -- Good place to close the source ... closePostDataSource stream_id else return () continue Right (NH2.Frame (NH2.FrameHeader _ flags _) (NH2.PingFrame _)) | NH2.testAck flags-> do -- Deal with pings: this is an Ack, so do nothing continue Right (NH2.Frame (NH2.FrameHeader _ _ _) (NH2.PingFrame somebytes)) -> do -- Deal with pings: NOT an Ack, so answer INSTRUMENTATION( liftIO $ debugM "HTTP2.Session" "Ping processed" ) sendOutFrame (NH2.EncodeInfo (NH2.setAck NH2.defaultFlags) (NH2.toStreamIdentifier 0) Nothing ) (NH2.PingFrame somebytes) continue Right (NH2.Frame frame_header (NH2.SettingsFrame _)) | isSettingsAck frame_header -> do -- Frame was received by the peer, do nothing here... continue -- TODO: Do something with these settings!! Right (NH2.Frame _ (NH2.SettingsFrame settings_list)) -> do INSTRUMENTATION( liftIO $ debugM "HTTP2.Session" $ "Received settings: " ++ (show settings_list) ) -- Just acknowledge the frame.... for now sendOutFrame (NH2.EncodeInfo (NH2.setAck NH2.defaultFlags) (NH2.toStreamIdentifier 0) Nothing ) (NH2.SettingsFrame []) continue Right somethingelse -> unlessReceivingHeaders $ do -- An undhandled case here.... INSTRUMENTATION( liftIO $ errorM "HTTP2.Session" $ "Received problematic frame: " ) liftIO $ errorM "HTTP2.Session" $ ".. " ++ (show somethingelse) continue where continue = sessionInputThread sendOutFrame :: NH2.EncodeInfo -> NH2.FramePayload -> ReaderT SessionData IO () sendOutFrame encode_info payload = do session_output_mvar <- view sessionOutput session_output <- liftIO $ takeMVar session_output_mvar liftIO $ writeChan session_output $ Right (encode_info, payload) liftIO $ putMVar session_output_mvar session_output -- TODO: This function, but using the headers editor, triggers -- some renormalization of the header order. A good thing, if -- I get that order well enough.... addExtraHeaders :: He.HeaderEditor -> ReaderT SessionData IO He.HeaderEditor addExtraHeaders headers_editor = do let enriched_lens = (sessionsContext . sessionsConfig .sessionsEnrichedHeaders ) -- TODO: Figure out which is the best way to put this contact in the -- source code protocol_lens = He.headerLens "second-transfer-eh--used-protocol" add_used_protocol <- view (enriched_lens . addUsedProtocol ) -- liftIO $ putStrLn $ "AAA" ++ (show add_used_protocol) let he1 = if add_used_protocol then set protocol_lens (Just "HTTP/2") headers_editor else headers_editor if add_used_protocol -- Nothing will be computed here if the headers are not modified. then return he1 else return headers_editor validateIncomingHeaders :: He.HeaderEditor -> ReaderT SessionData IO (Maybe He.HeaderEditor) validateIncomingHeaders headers_editor = do -- Check that the headers block comes with all mandatory headers. -- Right now I'm not checking that they come in the mandatory order though... -- -- Notice that this function will transform a "host" header to an ":authority" -- one. let h1 = He.replaceHostByAuthority headers_editor -- Check that headers are lowercase headers_are_lowercase = He.headersAreLowercaseAtHeaderEditor headers_editor -- Check that we have mandatory headers maybe_authority = h1 ^. (He.headerLens ":authority") maybe_method = h1 ^. (He.headerLens ":method") maybe_scheme = h1 ^. (He.headerLens ":scheme") maybe_path = h1 ^. (He.headerLens ":path") if (isJust maybe_authority) && (isJust maybe_method) && (isJust maybe_scheme) && (isJust maybe_path ) then return (Just h1) else return Nothing -- Sends a GO_AWAY frame and raises an exception, effectively terminating the input -- thread of the session. closeConnectionBecauseIsInvalid :: NH2.ErrorCodeId -> ReaderT SessionData IO a closeConnectionBecauseIsInvalid error_code = do liftIO $ errorM "HTTP2.Session" "closeConnectionBecauseIsInvalid called!" last_good_stream_mvar <- view lastGoodStream last_good_stream <- liftIO $ takeMVar last_good_stream_mvar session_output_mvar <- view sessionOutput stream2workerthread <- view stream2WorkerThread sendOutFrame (NH2.EncodeInfo NH2.defaultFlags (NH2.toStreamIdentifier 0) Nothing ) (NH2.GoAwayFrame (NH2.toStreamIdentifier last_good_stream) error_code "" ) liftIO $ do -- Close all active threads for this session H.mapM_ ( \(_stream_id, thread_id) -> throwTo thread_id StreamCancelledException ) stream2workerthread -- Notify the framer that the session is closing, so -- that it stops accepting frames from connected sources -- (Streams?) session_output <- takeMVar session_output_mvar writeChan session_output $ Left CancelSession_SOC putMVar session_output_mvar session_output -- And unwind the input thread in the session, so that the -- exception handler runs.... E.throw HTTP2ProtocolException frameEndsStream :: InputFrame -> Bool frameEndsStream (NH2.Frame (NH2.FrameHeader _ flags _) _) = NH2.testEndStream flags -- Executes its argument, unless receiving -- headers, in which case the connection is closed. unlessReceivingHeaders :: ReaderT SessionData IO a -> ReaderT SessionData IO a unlessReceivingHeaders comp = do receiving_headers_mvar <- view receivingHeaders -- First check if we are receiving headers maybe_recv_headers <- liftIO $ readMVar receiving_headers_mvar if isJust maybe_recv_headers then -- So, this frame is highly illegal closeConnectionBecauseIsInvalid NH2.ProtocolError else comp createMechanismForStream :: GlobalStreamId -> ReaderT SessionData IO PostInputMechanism createMechanismForStream stream_id = do (chan, source) <- liftIO $ unfoldChannelAndSource stream2postinputmechanism <- view stream2PostInputMechanism let pim = PostInputMechanism (chan, source) liftIO $ H.insert stream2postinputmechanism stream_id pim return pim -- TODO: Can be optimized by factoring out the mechanism lookup -- TODO IMPORTANT: This is a good place to drop the postinputmechanism -- for a stream, so that unprocessed data can be garbage-collected. closePostDataSource :: GlobalStreamId -> ReaderT SessionData IO () closePostDataSource stream_id = do stream2postinputmechanism <- view stream2PostInputMechanism pim_maybe <- liftIO $ H.lookup stream2postinputmechanism stream_id case pim_maybe of Just (PostInputMechanism (chan, _)) -> liftIO $ writeChan chan Nothing Nothing -> -- TODO: This is a protocol error, handle it properly error "Internal error/closePostDataSource" streamWorkerSendData :: Int -> B.ByteString -> ReaderT SessionData IO () streamWorkerSendData stream_id bytes = do s2pim <- view stream2PostInputMechanism pim_maybe <- liftIO $ H.lookup s2pim stream_id case pim_maybe of Just pim -> sendBytesToPim pim bytes Nothing -> -- This is an internal error, the mechanism should be -- created when the headers end (and if the headers -- do not finish the stream) error "Internal error" sendBytesToPim :: PostInputMechanism -> B.ByteString -> ReaderT SessionData IO () sendBytesToPim (PostInputMechanism (chan, _)) bytes = liftIO $ writeChan chan (Just bytes) postDataSourceFromMechanism :: PostInputMechanism -> InputDataStream postDataSourceFromMechanism (PostInputMechanism (_, source)) = source isSettingsAck :: NH2.FrameHeader -> Bool isSettingsAck (NH2.FrameHeader _ flags _) = NH2.testAck flags isStreamCancelled :: GlobalStreamId -> WorkerMonad Bool isStreamCancelled stream_id = do cancelled_streams_mvar <- view streamsCancelled_WTE cancelled_streams <- liftIO $ readMVar cancelled_streams_mvar return $ NS.member stream_id cancelled_streams workerThread :: Request -> CoherentWorker -> WorkerMonad () workerThread req coherent_worker = do headers_output <- view headersOutput stream_id <- view streamId -- TODO: Handle exceptions here: what happens if the coherent worker -- throws an exception signaling that the request is ill-formed -- and should be dropped? That could happen in a couple of occassions, -- but really most cases should be handled here in this file... (headers, _, data_and_conclussion) <- liftIO $ coherent_worker req -- Now I send the headers, if that's possible at all headers_sent <- liftIO $ newEmptyMVar liftIO $ writeChan headers_output (stream_id, headers_sent, headers) -- At this moment I should ask if the stream hasn't been cancelled by the browser before -- commiting to the work of sending addtitional data is_stream_cancelled <- isStreamCancelled stream_id if not is_stream_cancelled then do -- I have a beautiful source that I can de-construct... -- TODO: Optionally pulling data out from a Conduit .... -- liftIO ( data_and_conclussion $$ (_sendDataOfStream stream_id) ) -- -- This threadlet should block here waiting for the headers to finish going (maybe_footers, _) <- runConduit $ (transPipe liftIO data_and_conclussion) `fuseBothMaybe` (sendDataOfStream stream_id headers_sent) -- BIG TODO: Send the footers ... likely stream conclusion semantics -- will need to be changed. return () else return () -- v-- comp. monad. sendDataOfStream :: GlobalStreamId -> MVar HeadersSent -> Sink B.ByteString (ReaderT WorkerThreadEnvironment IO) () sendDataOfStream stream_id headers_sent = do data_output <- view dataOutput -- Wait for all headers sent liftIO $ takeMVar headers_sent consumer data_output where consumer data_output = do maybe_bytes <- await case maybe_bytes of Nothing -> liftIO $ writeChan data_output (stream_id, Nothing) Just bytes -> do liftIO $ writeChan data_output (stream_id, Just bytes) consumer data_output -- Returns if the frame is the first in the stream appendHeaderFragmentBlock :: GlobalStreamId -> B.ByteString -> ReaderT SessionData IO Bool appendHeaderFragmentBlock global_stream_id bytes = do ht <- view stream2HeaderBlockFragment maybe_old_block <- liftIO $ H.lookup ht global_stream_id (new_block, new_stream) <- case maybe_old_block of Nothing -> do -- TODO: Make the commented message below more informative -- INSTRUMENTATION( liftIO $ debugM "HTTP2.Session" $ "Starting stream " ++ (show global_stream_id) ) return $ (Bu.byteString bytes, True) Just something -> return $ (something `mappend` (Bu.byteString bytes), False) liftIO $ H.insert ht global_stream_id new_block return new_stream getHeaderBytes :: GlobalStreamId -> ReaderT SessionData IO B.ByteString getHeaderBytes global_stream_id = do ht <- view stream2HeaderBlockFragment Just bytes <- liftIO $ H.lookup ht global_stream_id return $ Bl.toStrict $ Bu.toLazyByteString bytes isAboutHeaders :: InputFrame -> Maybe (GlobalStreamId, B.ByteString) isAboutHeaders (NH2.Frame (NH2.FrameHeader _ _ stream_id) ( NH2.HeadersFrame _ block_fragment ) ) = Just (NH2.fromStreamIdentifier stream_id, block_fragment) isAboutHeaders (NH2.Frame (NH2.FrameHeader _ _ stream_id) ( NH2.ContinuationFrame block_fragment) ) = Just (NH2.fromStreamIdentifier stream_id, block_fragment) isAboutHeaders _ = Nothing frameEndsHeaders :: InputFrame -> Bool frameEndsHeaders (NH2.Frame (NH2.FrameHeader _ flags _) _) = NH2.testEndHeader flags streamIdFromFrame :: InputFrame -> GlobalStreamId streamIdFromFrame (NH2.Frame (NH2.FrameHeader _ _ stream_id) _) = NH2.fromStreamIdentifier stream_id -- TODO: Have different size for the headers..... just now going with a default size of 16 k... -- TODO: Find a way to kill this thread.... headersOutputThread :: Chan (GlobalStreamId, MVar HeadersSent, Headers) -> MVar (Chan (Either SessionOutputCommand OutputFrame)) -> ReaderT SessionData IO () headersOutputThread input_chan session_output_mvar = forever $ do (stream_id, headers_ready_mvar, headers) <- liftIO $ readChan input_chan -- First encode the headers using the table encode_dyn_table_mvar <- view toEncodeHeaders encode_dyn_table <- liftIO $ takeMVar encode_dyn_table_mvar (new_dyn_table, data_to_send ) <- liftIO $ HP.encodeHeader HP.defaultEncodeStrategy encode_dyn_table headers liftIO $ putMVar encode_dyn_table_mvar new_dyn_table -- Now split the bytestring in chunks of the needed size.... bs_chunks <- return $! bytestringChunk useChunkLength data_to_send -- And send the chunks through while locking the output place.... liftIO $ E.bracket (takeMVar session_output_mvar) (putMVar session_output_mvar ) (\ session_output -> do writeIndividualHeaderFrames session_output stream_id bs_chunks True -- And say that the headers for this thread are out -- INSTRUMENTATION( debugM "HTTP2.Session" $ "Headers were output for stream " ++ (show stream_id) ) putMVar headers_ready_mvar HeadersSent ) where writeIndividualHeaderFrames :: Chan (Either SessionOutputCommand OutputFrame) -> GlobalStreamId -> [B.ByteString] -> Bool -> IO () writeIndividualHeaderFrames session_output stream_id (last_fragment:[]) is_first = writeChan session_output $ Right ( NH2.EncodeInfo { NH2.encodeFlags = NH2.setEndHeader NH2.defaultFlags ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id ,NH2.encodePadding = Nothing }, (if is_first then NH2.HeadersFrame Nothing last_fragment else NH2.ContinuationFrame last_fragment) ) writeIndividualHeaderFrames session_output stream_id (fragment:xs) is_first = do writeChan session_output $ Right ( NH2.EncodeInfo { NH2.encodeFlags = NH2.defaultFlags ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id ,NH2.encodePadding = Nothing }, (if is_first then NH2.HeadersFrame Nothing fragment else NH2.ContinuationFrame fragment) ) writeIndividualHeaderFrames session_output stream_id xs False bytestringChunk :: Int -> B.ByteString -> [B.ByteString] bytestringChunk len s | (B.length s) < len = [ s ] bytestringChunk len s = h:(bytestringChunk len xs) where (h, xs) = B.splitAt len s -- TODO: find a clean way to finish this thread (maybe with negative stream ids?) -- TODO: This function does non-optimal chunking for the case where responses are -- actually streamed.... in those cases we need to keep state for frames in -- some other format.... -- TODO: Right now, we are transmitting an empty last frame with the end-of-stream -- flag set. I'm afraid that the only -- way to avoid that is by holding a frame or by augmenting the end-user interface -- so that the user can signal which one is the last frame. The first approach -- restricts responsiviness, the second one clutters things. dataOutputThread :: Chan DataOutputToConveyor -> MVar (Chan (Either SessionOutputCommand OutputFrame)) -> IO () dataOutputThread input_chan session_output_mvar = forever $ do (stream_id, maybe_contents) <- readChan input_chan case maybe_contents of Nothing -> do liftIO $ do withLockedSessionOutput (\ session_output -> writeChan session_output $ Right ( NH2.EncodeInfo { NH2.encodeFlags = NH2.setEndStream NH2.defaultFlags ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id ,NH2.encodePadding = Nothing }, NH2.DataFrame "" ) ) Just contents -> do -- And now just simply output it... let bs_chunks = bytestringChunk useChunkLength $! contents -- And send the chunks through while locking the output place.... writeContinuations bs_chunks stream_id where withLockedSessionOutput = E.bracket (takeMVar session_output_mvar) (putMVar session_output_mvar) -- <-- There is an implicit argument there!! writeContinuations :: [B.ByteString] -> GlobalStreamId -> IO () writeContinuations fragments stream_id = mapM_ (\ fragment -> withLockedSessionOutput (\ session_output -> writeChan session_output $ Right ( NH2.EncodeInfo { NH2.encodeFlags = NH2.defaultFlags ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id ,NH2.encodePadding = Nothing }, NH2.DataFrame fragment ) ) ) fragments