-- Session: links frames to streams, and helps in ordering the header frames -- so that they don't get mixed with header frames from other streams when -- resources are being served concurrently. {-# LANGUAGE FlexibleContexts, Rank2Types, TemplateHaskell, OverloadedStrings #-} {-# OPTIONS_HADDOCK hide #-} module SecondTransfer.Http2.Session( http2Session ,getFrameFromSession ,sendFrameToSession ,sendCommandToSession ,defaultSessionsConfig ,sessionId ,reportErrorCallback ,sessionsCallbacks ,nextSessionId ,makeSessionsContext ,sessionsConfig ,sessionExceptionHandler ,CoherentSession ,SessionInput(..) ,SessionInputCommand(..) ,SessionOutput(..) ,SessionOutputCommand(..) ,SessionsContext(..) ,SessionCoordinates(..) ,SessionComponent(..) ,SessionsCallbacks(..) ,SessionsConfig(..) ,ErrorCallback -- Internal stuff ,OutputFrame ,InputFrame ) where #include "Logging.cpphs" -- System grade utilities import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent.Chan import Control.Exception (SomeException, throwTo) import qualified Control.Exception as E import Control.Monad (forever) import Control.Monad.IO.Class (liftIO) import Control.Monad.Trans.Reader import Control.Concurrent.MVar import qualified Data.ByteString as B import qualified Data.ByteString.Builder as Bu import qualified Data.ByteString.Lazy as Bl import Data.Conduit import qualified Data.HashTable.IO as H import qualified Data.IntSet as NS import Data.Monoid as Mo import Control.Lens -- No framing layer here... let's use Kazu's Yamamoto library import qualified Network.HPACK as HP import qualified Network.HTTP2 as NH2 -- Logging utilities import System.Log.Logger -- Imports from other parts of the program import SecondTransfer.MainLoop.CoherentWorker import SecondTransfer.MainLoop.Tokens import SecondTransfer.Utils (unfoldChannelAndSource) import SecondTransfer.Exception -- Unfortunately the frame encoding API of Network.HTTP2 is a bit difficult to -- use :-( type OutputFrame = (NH2.EncodeInfo, NH2.FramePayload) type InputFrame = NH2.Frame useChunkLength :: Int useChunkLength = 16384 -- Singleton instance used for concurrency data HeadersSent = HeadersSent -- All streams put their data bits here. A "Nothing" value signals -- end of data. type DataOutputToConveyor = (GlobalStreamId, Maybe B.ByteString) -- Whatever a worker thread is going to need comes here.... -- this is to make refactoring easier, but not strictly needed. data WorkerThreadEnvironment = WorkerThreadEnvironment { -- What's the header stream id? _streamId :: GlobalStreamId -- A full block of headers can come here... the mvar in the middle should -- be populate to signal end of headers transmission. A thread will be suspended -- waiting for that , _headersOutput :: Chan (GlobalStreamId, MVar HeadersSent, Headers) -- And regular contents can come this way and thus be properly mixed -- with everything else.... for now... ,_dataOutput :: Chan DataOutputToConveyor ,_streamsCancelled_WTE :: MVar NS.IntSet } makeLenses ''WorkerThreadEnvironment -- Basically a couple of channels ... type Session = (SessionInput, SessionOutput) -- From outside, one can only write to this one ... the newtype is to enforce -- this. newtype SessionInput = SessionInput ( Chan (Either SessionInputCommand InputFrame) ) sendFrameToSession :: SessionInput -> InputFrame -> IO () sendFrameToSession (SessionInput chan) frame = writeChan chan $ Right frame sendCommandToSession :: SessionInput -> SessionInputCommand -> IO () sendCommandToSession (SessionInput chan) command = writeChan chan $ Left command -- From outside, one can only read from this one newtype SessionOutput = SessionOutput ( Chan (Either SessionOutputCommand OutputFrame) ) getFrameFromSession :: SessionOutput -> IO (Either SessionOutputCommand OutputFrame) getFrameFromSession (SessionOutput chan) = readChan chan type HashTable k v = H.CuckooHashTable k v -- Blaze builder could be more proper here... type Stream2HeaderBlockFragment = HashTable GlobalStreamId Bu.Builder type WorkerMonad = ReaderT WorkerThreadEnvironment IO -- Have to figure out which are these...but I would expect to have things -- like unexpected aborts here in this type. data SessionInputCommand = CancelSession_SIC deriving Show -- temporary data SessionOutputCommand = CancelSession_SOC deriving Show -- | Information used to identify a particular session. newtype SessionCoordinates = SessionCoordinates Int deriving Show instance Eq SessionCoordinates where (SessionCoordinates a) == (SessionCoordinates b) = a == b -- | Get/set a numeric Id from a `SessionCoordinates`. For example, to -- get the session id with this, import `Control.Lens.(^.)` and then do -- -- @ -- session_id = session_coordinates ^. sessionId -- @ -- sessionId :: Functor f => (Int -> f Int) -> SessionCoordinates -> f SessionCoordinates sessionId f (SessionCoordinates session_id) = fmap (\ s' -> (SessionCoordinates s')) (f session_id) -- | Components at an individual session. Used to report -- where in the session an error was produced. This interface is likely -- to change in the future, as we add more metadata to exceptions data SessionComponent = SessionInputThread_SessionComponent |SessionHeadersOutputThread_SessionComponent |SessionDataOutputThread_SessionComponent |Framer_SessionComponent deriving Show -- | Used by this session engine to report an error at some component, in a particular -- session. type ErrorCallback = (SessionComponent, SessionCoordinates, SomeException) -> IO () -- | Callbacks that you can provide your sessions to notify you -- of interesting things happening in the server. data SessionsCallbacks = SessionsCallbacks { -- Callback used to report errors during this session _reportErrorCallback :: Maybe ErrorCallback } makeLenses ''SessionsCallbacks -- | Configuration information you can provide to the session maker. data SessionsConfig = SessionsConfig { -- | Session callbacks _sessionsCallbacks :: SessionsCallbacks } -- makeLenses ''SessionsConfig -- | Lens to access sessionsCallbacks in the `SessionsConfig` object. sessionsCallbacks :: Lens' SessionsConfig SessionsCallbacks sessionsCallbacks f ( SessionsConfig { _sessionsCallbacks= s }) = fmap (\ s' -> SessionsConfig {_sessionsCallbacks = s'}) (f s) -- | Contains information that applies to all -- sessions created in the program. Use the lenses -- interface to access members of this struct. -- data SessionsContext = SessionsContext { _sessionsConfig :: SessionsConfig ,_nextSessionId :: MVar Int } makeLenses ''SessionsContext -- Here is how we make a session type SessionMaker = SessionsContext -> IO Session -- Here is how we make a session wrapping a CoherentWorker type CoherentSession = CoherentWorker -> SessionMaker -- | Creates a default sessions context. Modify as needed using -- the lenses interfaces defaultSessionsConfig :: SessionsConfig defaultSessionsConfig = SessionsConfig { _sessionsCallbacks = SessionsCallbacks { _reportErrorCallback = Nothing } } -- Adds runtime data to a context, and let it work.... makeSessionsContext :: SessionsConfig -> IO SessionsContext makeSessionsContext sessions_config = do next_session_id_mvar <- newMVar 1 return $ SessionsContext { _sessionsConfig = sessions_config, _nextSessionId = next_session_id_mvar } data PostInputMechanism = PostInputMechanism (Chan (Maybe B.ByteString), InputDataStream) -- NH2.Frame != Frame data SessionData = SessionData { -- ATTENTION: Ignore the warning coming from here for now _sessionsContext :: SessionsContext ,_sessionInput :: Chan (Either SessionInputCommand InputFrame) -- We need to lock this channel occassionally so that we can order multiple -- header frames properly.... ,_sessionOutput :: MVar (Chan (Either SessionOutputCommand OutputFrame)) -- Use to encode ,_toEncodeHeaders :: MVar HP.DynamicTable -- And used to decode ,_toDecodeHeaders :: MVar HP.DynamicTable -- Used for decoding the headers ,_stream2HeaderBlockFragment :: Stream2HeaderBlockFragment -- Used for worker threads... this is actually a pre-filled template -- I make copies of it in different contexts, and as needed. ,_forWorkerThread :: WorkerThreadEnvironment ,_coherentWorker :: CoherentWorker -- Some streams may be cancelled ,_streamsCancelled :: MVar NS.IntSet -- Data input mechanism corresponding to some threads ,_stream2PostInputMechanism :: HashTable Int PostInputMechanism -- Worker thread register. This is a dictionary from stream id to -- the ThreadId of the thread with the worker thread. I use this to -- raise asynchronous exceptions in the worker thread if the stream -- is cancelled by the client. This way we get early finalization. ,_stream2WorkerThread :: HashTable Int ThreadId ,_sessionIdAtSession :: Int } makeLenses ''SessionData -- v- {headers table size comes here!!} http2Session :: CoherentWorker -> Int -> SessionsContext -> IO Session http2Session coherent_worker session_id sessions_context = do session_input <- newChan session_output <- newChan session_output_mvar <- newMVar session_output -- For incremental construction of headers... stream_request_headers <- H.new :: IO Stream2HeaderBlockFragment -- Warning: we should find a way of coping with different table sizes. decode_headers_table <- HP.newDynamicTableForDecoding 4096 decode_headers_table_mvar <- newMVar decode_headers_table encode_headers_table <- HP.newDynamicTableForEncoding 4096 encode_headers_table_mvar <- newMVar encode_headers_table -- These ones need independent threads taking care of sending stuff -- their way... headers_output <- newChan :: IO (Chan (GlobalStreamId, MVar HeadersSent, Headers)) data_output <- newChan :: IO (Chan DataOutputToConveyor) stream2postinputmechanism <- H.new stream2workerthread <- H.new -- What about stream cancellation? cancelled_streams_mvar <- newMVar $ NS.empty :: IO (MVar NS.IntSet) let for_worker_thread = WorkerThreadEnvironment { _streamId = error "NotInitialized" ,_headersOutput = headers_output ,_dataOutput = data_output ,_streamsCancelled_WTE = cancelled_streams_mvar } let session_data = SessionData { _sessionsContext = sessions_context ,_sessionInput = session_input ,_sessionOutput = session_output_mvar ,_toDecodeHeaders = decode_headers_table_mvar ,_toEncodeHeaders = encode_headers_table_mvar ,_stream2HeaderBlockFragment = stream_request_headers ,_forWorkerThread = for_worker_thread ,_coherentWorker = coherent_worker ,_streamsCancelled = cancelled_streams_mvar ,_stream2PostInputMechanism = stream2postinputmechanism ,_stream2WorkerThread = stream2workerthread ,_sessionIdAtSession = session_id } let exc_handler :: SessionComponent -> HTTP2SessionException -> IO () exc_handler component e = sessionExceptionHandler component session_id sessions_context e exc_guard :: SessionComponent -> IO () -> IO () exc_guard component action = E.catch action $ exc_handler component -- Create an input thread that decodes frames... forkIO $ exc_guard SessionInputThread_SessionComponent $ runReaderT sessionInputThread session_data -- Create a thread that captures headers and sends them down the tube forkIO $ exc_guard SessionHeadersOutputThread_SessionComponent $ runReaderT (headersOutputThread headers_output session_output_mvar) session_data -- Create a thread that captures data and sends it down the tube forkIO $ exc_guard SessionDataOutputThread_SessionComponent $ dataOutputThread data_output session_output_mvar -- The two previous threads fill the session_output argument below (they write to it) -- the session machinery in the other end is in charge of sending that data through the -- socket. return ( (SessionInput session_input), (SessionOutput session_output) ) -- TODO: Some ill clients can break this thread with exceptions. Make these paths a bit --- more robust. sessionInputThread :: ReaderT SessionData IO () sessionInputThread = do INSTRUMENTATION( liftIO $ debugM "HTTP2.Session" "Entering sessionInputThread" ) -- This is an introductory and declarative block... all of this is tail-executed -- every time that a packet needs to be processed. It may be a good idea to abstract -- these values in a closure... session_input <- view sessionInput decode_headers_table_mvar <- view toDecodeHeaders stream_request_headers <- view stream2HeaderBlockFragment cancelled_streams_mvar <- view streamsCancelled coherent_worker <- view coherentWorker for_worker_thread_uns <- view forWorkerThread stream2workerthread <- view stream2WorkerThread input <- liftIO $ readChan session_input INSTRUMENTATION( liftIO $ debugM "HTTP2.Session" $ "Got a frame or a command: " ++ (show input) ) case input of Left CancelSession_SIC -> do -- Good place to tear down worker threads... Let the rest of the finalization -- to the framer liftIO $ do H.mapM_ (\ (_, thread_id) -> do throwTo thread_id StreamCancelledException infoM "HTTP2.Session" $ "Stream successfully interrupted" ) stream2workerthread -- We do not continue here, but instead let it finish return () Right frame | Just (stream_id, bytes) <- frameIsHeaderOfStream frame -> do -- Just append the frames to streamRequestHeaders appendHeaderFragmentBlock stream_id bytes if frameEndsHeaders frame then do -- Let's decode the headers let for_worker_thread = set streamId stream_id for_worker_thread_uns headers_bytes <- getHeaderBytes stream_id dyn_table <- liftIO $ takeMVar decode_headers_table_mvar (new_table, header_list ) <- liftIO $ {-# SCC "decodeHeader" #-} HP.decodeHeader dyn_table headers_bytes -- Good moment to remove the headers from the table.... we don't want a space -- leak here liftIO $ do H.delete stream_request_headers stream_id putMVar decode_headers_table_mvar new_table -- TODO: Validate headers, abort session if the headers are invalid. -- Otherwise other invariants will break!! -- THIS IS PROBABLY THE BEST PLACE FOR DOING IT. -- If the headers end the request.... post_data_source <- if not (frameEndsStream frame) then do mechanism <- createMechanismForStream stream_id let source = postDataSourceFromMechanism mechanism return $ Just source else do return Nothing -- I'm clear to start the worker, in its own thread liftIO $ do thread_id <- forkIO $ runReaderT (workerThread (header_list, post_data_source) coherent_worker) for_worker_thread H.insert stream2workerthread stream_id thread_id return () else -- Frame doesn't end the headers... it was added before... so -- probably do nothing return () continue Right frame@(NH2.Frame _ (NH2.RSTStreamFrame error_code_id)) -> do let stream_id = streamIdFromFrame frame liftIO $ do INSTRUMENTATION( infoM "HTTP2.Session" $ "Stream reset: " ++ (show error_code_id) ) cancelled_streams <- takeMVar cancelled_streams_mvar INSTRUMENTATION( infoM "HTTP2.Session" $ "Cancelled stream was: " ++ (show stream_id) ) putMVar cancelled_streams_mvar $ NS.insert stream_id cancelled_streams maybe_thread_id <- H.lookup stream2workerthread stream_id case maybe_thread_id of Nothing -> -- This is actually more like an internal error error "InterruptingUnexistentStream" Just thread_id -> do throwTo thread_id StreamCancelledException INSTRUMENTATION( infoM "HTTP2.Session" $ "Stream successfully interrupted" ) continue Right frame@(NH2.Frame (NH2.FrameHeader _ _ nh2_stream_id) (NH2.DataFrame somebytes)) -> do -- So I got data to process -- TODO: Handle end of stream let stream_id = NH2.fromStreamIdentifier nh2_stream_id -- TODO: Handle the cases where the stream_id doesn't match an already existent -- stream. In such cases it is justified to reset the connection with a protocol_error. streamWorkerSendData stream_id somebytes -- After that data has been received and forwarded downstream, we can issue a windows update -- -- TODO: We can use wider frames to avoid congestion... -- .... and we can also be more compositional with these short bursts of data.... -- -- TODO: Consider that the best place to output these frames can be somewhere else... -- -- TODO: Use a special, with-quota queue here to do flow control. Don't send meaningless -- WindowUpdateFrame's sendOutFrame (NH2.EncodeInfo NH2.defaultFlags nh2_stream_id Nothing ) (NH2.WindowUpdateFrame (fromIntegral (B.length somebytes)) ) sendOutFrame (NH2.EncodeInfo NH2.defaultFlags (NH2.toStreamIdentifier 0) Nothing ) (NH2.WindowUpdateFrame (fromIntegral (B.length somebytes)) ) if frameEndsStream frame then do -- Good place to close the source ... closePostDataSource stream_id else return () continue Right (NH2.Frame (NH2.FrameHeader _ flags _) (NH2.PingFrame _)) | NH2.testAck flags-> do -- Deal with pings: this is an Ack, so do nothing continue Right (NH2.Frame (NH2.FrameHeader _ _ _) (NH2.PingFrame somebytes)) -> do -- Deal with pings: NOT an Ack, so answer INSTRUMENTATION( liftIO $ debugM "HTTP2.Session" "Ping processed" ) sendOutFrame (NH2.EncodeInfo (NH2.setAck NH2.defaultFlags) (NH2.toStreamIdentifier 0) Nothing ) (NH2.PingFrame somebytes) continue Right (NH2.Frame frame_header (NH2.SettingsFrame _)) | isSettingsAck frame_header -> do -- Frame was received by the peer, do nothing here... continue Right (NH2.Frame _ (NH2.SettingsFrame settings_list)) -> do INSTRUMENTATION( liftIO $ debugM "HTTP2.Session" $ "Received settings: " ++ (show settings_list) ) -- Just acknowledge the frame.... for now sendOutFrame (NH2.EncodeInfo (NH2.setAck NH2.defaultFlags) (NH2.toStreamIdentifier 0) Nothing ) (NH2.SettingsFrame []) continue Right somethingelse -> do -- An undhandled case here.... INSTRUMENTATION( liftIO $ errorM "HTTP2.Session" $ "Received problematic frame: " ) liftIO $ errorM "HTTP2.Session" $ ".. " ++ (show somethingelse) continue where continue = sessionInputThread sendOutFrame :: NH2.EncodeInfo -> NH2.FramePayload -> ReaderT SessionData IO () sendOutFrame encode_info payload = do session_output_mvar <- view sessionOutput session_output <- liftIO $ takeMVar session_output_mvar liftIO $ writeChan session_output $ Right (encode_info, payload) liftIO $ putMVar session_output_mvar session_output frameEndsStream :: InputFrame -> Bool frameEndsStream (NH2.Frame (NH2.FrameHeader _ flags _) _) = NH2.testEndStream flags createMechanismForStream :: GlobalStreamId -> ReaderT SessionData IO PostInputMechanism createMechanismForStream stream_id = do (chan, source) <- liftIO $ unfoldChannelAndSource stream2postinputmechanism <- view stream2PostInputMechanism let pim = PostInputMechanism (chan, source) liftIO $ H.insert stream2postinputmechanism stream_id pim return pim -- TODO: Can be optimized by factoring out the mechanism lookup -- TODO IMPORTANT: This is a good place to drop the postinputmechanism -- for a stream, so that unprocessed data can be garbage-collected. closePostDataSource :: GlobalStreamId -> ReaderT SessionData IO () closePostDataSource stream_id = do stream2postinputmechanism <- view stream2PostInputMechanism pim_maybe <- liftIO $ H.lookup stream2postinputmechanism stream_id case pim_maybe of Just (PostInputMechanism (chan, _)) -> liftIO $ writeChan chan Nothing Nothing -> -- TODO: This is a protocol error, handle it properly error "Internal error/closePostDataSource" streamWorkerSendData :: Int -> B.ByteString -> ReaderT SessionData IO () streamWorkerSendData stream_id bytes = do s2pim <- view stream2PostInputMechanism pim_maybe <- liftIO $ H.lookup s2pim stream_id case pim_maybe of Just pim -> sendBytesToPim pim bytes Nothing -> -- This is an internal error, the mechanism should be -- created when the headers end (and if the headers -- do not finish the stream) error "Internal error" sendBytesToPim :: PostInputMechanism -> B.ByteString -> ReaderT SessionData IO () sendBytesToPim (PostInputMechanism (chan, _)) bytes = liftIO $ writeChan chan (Just bytes) postDataSourceFromMechanism :: PostInputMechanism -> InputDataStream postDataSourceFromMechanism (PostInputMechanism (_, source)) = source isSettingsAck :: NH2.FrameHeader -> Bool isSettingsAck (NH2.FrameHeader _ flags _) = NH2.testAck flags isStreamCancelled :: GlobalStreamId -> WorkerMonad Bool isStreamCancelled stream_id = do cancelled_streams_mvar <- view streamsCancelled_WTE cancelled_streams <- liftIO $ readMVar cancelled_streams_mvar return $ NS.member stream_id cancelled_streams workerThread :: Request -> CoherentWorker -> WorkerMonad () workerThread req coherent_worker = do headers_output <- view headersOutput stream_id <- view streamId (headers, _, data_and_conclussion) <- liftIO $ coherent_worker req -- Now I send the headers, if that's possible at all headers_sent <- liftIO $ newEmptyMVar liftIO $ writeChan headers_output (stream_id, headers_sent, headers) -- At this moment I should ask if the stream hasn't been cancelled by the browser before -- commiting to the work of sending addtitional data is_stream_cancelled <- isStreamCancelled stream_id if not is_stream_cancelled then do -- I have a beautiful source that I can de-construct... -- TODO: Optionally pulling data out from a Conduit .... -- liftIO ( data_and_conclussion $$ (_sendDataOfStream stream_id) ) -- -- This threadlet should block here waiting for the headers to finish going (maybe_footers, _) <- runConduit $ (transPipe liftIO data_and_conclussion) `fuseBothMaybe` (sendDataOfStream stream_id headers_sent) -- BIG TODO: Send the footers ... likely stream conclusion semantics -- will need to be changed. return () else return () -- v-- comp. monad. sendDataOfStream :: GlobalStreamId -> MVar HeadersSent -> Sink B.ByteString (ReaderT WorkerThreadEnvironment IO) () sendDataOfStream stream_id headers_sent = do data_output <- view dataOutput -- Wait for all headers sent liftIO $ takeMVar headers_sent consumer data_output where consumer data_output = do maybe_bytes <- await case maybe_bytes of Nothing -> liftIO $ writeChan data_output (stream_id, Nothing) Just bytes -> do liftIO $ writeChan data_output (stream_id, Just bytes) consumer data_output appendHeaderFragmentBlock :: GlobalStreamId -> B.ByteString -> ReaderT SessionData IO () appendHeaderFragmentBlock global_stream_id bytes = do ht <- view stream2HeaderBlockFragment maybe_old_block <- liftIO $ H.lookup ht global_stream_id new_block <- case maybe_old_block of Nothing -> do INSTRUMENTATION( liftIO $ infoM "HTTP2.Session" $ "Starting stream " ++ (show global_stream_id) ) return $ Bu.byteString bytes Just something -> return $ something `mappend` (Bu.byteString bytes) liftIO $ H.insert ht global_stream_id new_block getHeaderBytes :: GlobalStreamId -> ReaderT SessionData IO B.ByteString getHeaderBytes global_stream_id = do ht <- view stream2HeaderBlockFragment Just bytes <- liftIO $ H.lookup ht global_stream_id return $ Bl.toStrict $ Bu.toLazyByteString bytes frameIsHeaderOfStream :: InputFrame -> Maybe (GlobalStreamId, B.ByteString) frameIsHeaderOfStream (NH2.Frame (NH2.FrameHeader _ _ stream_id) ( NH2.HeadersFrame _ block_fragment ) ) = Just (NH2.fromStreamIdentifier stream_id, block_fragment) frameIsHeaderOfStream (NH2.Frame (NH2.FrameHeader _ _ stream_id) ( NH2.ContinuationFrame block_fragment) ) = Just (NH2.fromStreamIdentifier stream_id, block_fragment) frameIsHeaderOfStream _ = Nothing frameEndsHeaders :: InputFrame -> Bool frameEndsHeaders (NH2.Frame (NH2.FrameHeader _ flags _) _) = NH2.testEndHeader flags streamIdFromFrame :: InputFrame -> GlobalStreamId streamIdFromFrame (NH2.Frame (NH2.FrameHeader _ _ stream_id) _) = NH2.fromStreamIdentifier stream_id -- TODO: Have different size for the headers..... just now going with a default size of 16 k... -- TODO: Find a way to kill this thread.... headersOutputThread :: Chan (GlobalStreamId, MVar HeadersSent, Headers) -> MVar (Chan (Either SessionOutputCommand OutputFrame)) -> ReaderT SessionData IO () headersOutputThread input_chan session_output_mvar = forever $ do (stream_id, headers_ready_mvar, headers) <- liftIO $ readChan input_chan -- liftIO $ putStrLn $ "Output headers: " ++ (show headers) -- First encode the headers using the table encode_dyn_table_mvar <- view toEncodeHeaders encode_dyn_table <- liftIO $ takeMVar encode_dyn_table_mvar (new_dyn_table, data_to_send ) <- liftIO $ HP.encodeHeader HP.defaultEncodeStrategy encode_dyn_table headers liftIO $ putMVar encode_dyn_table_mvar new_dyn_table -- Now split the bytestring in chunks of the needed size.... bs_chunks <- return $! bytestringChunk useChunkLength data_to_send -- And send the chunks through while locking the output place.... liftIO $ E.bracket (takeMVar session_output_mvar) (putMVar session_output_mvar ) (\ session_output -> do writeIndividualHeaderFrames session_output stream_id bs_chunks True -- And say that the headers for this thread are out INSTRUMENTATION( infoM "HTTP2.Session" $ "Headers were output for stream " ++ (show stream_id) ) putMVar headers_ready_mvar HeadersSent ) where writeIndividualHeaderFrames :: Chan (Either SessionOutputCommand OutputFrame) -> GlobalStreamId -> [B.ByteString] -> Bool -> IO () writeIndividualHeaderFrames session_output stream_id (last_fragment:[]) is_first = writeChan session_output $ Right ( NH2.EncodeInfo { NH2.encodeFlags = NH2.setEndHeader NH2.defaultFlags ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id ,NH2.encodePadding = Nothing }, (if is_first then NH2.HeadersFrame Nothing last_fragment else NH2.ContinuationFrame last_fragment) ) writeIndividualHeaderFrames session_output stream_id (fragment:xs) is_first = do writeChan session_output $ Right ( NH2.EncodeInfo { NH2.encodeFlags = NH2.defaultFlags ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id ,NH2.encodePadding = Nothing }, (if is_first then NH2.HeadersFrame Nothing fragment else NH2.ContinuationFrame fragment) ) writeIndividualHeaderFrames session_output stream_id xs False bytestringChunk :: Int -> B.ByteString -> [B.ByteString] bytestringChunk len s | (B.length s) < len = [ s ] bytestringChunk len s = h:(bytestringChunk len xs) where (h, xs) = B.splitAt len s -- TODO: find a clean way to finish this thread (maybe with negative stream ids?) -- TODO: This function does non-optimal chunking for the case where responses are -- actually streamed.... in those cases we need to keep state for frames in -- some other format.... -- TODO: Right now, we are transmitting an empty last frame with the end-of-stream -- flag set. I'm afraid that the only -- way to avoid that is by holding a frame or by augmenting the end-user interface -- so that the user can signal which one is the last frame. The first approach -- restricts responsiviness, the second one clutters things. dataOutputThread :: Chan DataOutputToConveyor -> MVar (Chan (Either SessionOutputCommand OutputFrame)) -> IO () dataOutputThread input_chan session_output_mvar = forever $ do (stream_id, maybe_contents) <- readChan input_chan case maybe_contents of Nothing -> do liftIO $ do INSTRUMENTATION( debugM "HTTP2.Session" "End-of-stream flag set " ) withLockedSessionOutput (\ session_output -> writeChan session_output $ Right ( NH2.EncodeInfo { NH2.encodeFlags = NH2.setEndStream NH2.defaultFlags ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id ,NH2.encodePadding = Nothing }, NH2.DataFrame "" ) ) Just contents -> do -- And now just simply output it... let bs_chunks = bytestringChunk useChunkLength $! contents -- And send the chunks through while locking the output place.... writeContinuations bs_chunks stream_id where withLockedSessionOutput = E.bracket (takeMVar session_output_mvar) (putMVar session_output_mvar) -- <-- There is an implicit argument there!! writeContinuations :: [B.ByteString] -> GlobalStreamId -> IO () writeContinuations fragments stream_id = mapM_ (\ fragment -> withLockedSessionOutput (\ session_output -> writeChan session_output $ Right ( NH2.EncodeInfo { NH2.encodeFlags = NH2.defaultFlags ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id ,NH2.encodePadding = Nothing }, NH2.DataFrame fragment ) ) ) fragments sessionExceptionHandler :: E.Exception e => SessionComponent -> Int -> SessionsContext -> e -> IO () sessionExceptionHandler session_component session_id sessions_context e = do let getit = ( sessionsConfig . sessionsCallbacks . reportErrorCallback ) maybe_error_callback = sessions_context ^. getit error_tuple = ( session_component, SessionCoordinates session_id, E.toException e ) case maybe_error_callback of Nothing -> errorM "HTTP2.Session" (show (e)) Just callback -> callback error_tuple