module SecondTransfer.Http2.Session(
http2Session
,getFrameFromSession
,sendFirstFrameToSession
,sendMiddleFrameToSession
,sendCommandToSession
,CoherentSession
,SessionInput(..)
,SessionInputCommand(..)
,SessionOutput(..)
,SessionOutputCommand(..)
,SessionCoordinates(..)
,SessionComponent(..)
,SessionsCallbacks(..)
,SessionsConfig(..)
,ErrorCallback
,OutputFrame
,InputFrame
) where
#include "Logging.cpphs"
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.Chan
import Control.Exception (throwTo)
import qualified Control.Exception as E
import Control.Monad (forever)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.Trans.Reader
import Control.Concurrent.MVar
import qualified Data.ByteString as B
import qualified Data.ByteString.Builder as Bu
import qualified Data.ByteString.Lazy as Bl
import Data.Conduit
import qualified Data.HashTable.IO as H
import qualified Data.IntSet as NS
import Data.Maybe (isJust)
#ifndef IMPLICIT_MONOID
import Data.Monoid (mappend)
#endif
import Control.Lens
import qualified Network.HPACK as HP
import qualified Network.HTTP2 as NH2
import System.Log.Logger
import SecondTransfer.MainLoop.CoherentWorker
import SecondTransfer.MainLoop.Tokens
import SecondTransfer.Sessions.Config
import SecondTransfer.Sessions.Internal (sessionExceptionHandler, SessionsContext, sessionsConfig)
import SecondTransfer.Utils (unfoldChannelAndSource)
import SecondTransfer.Exception
import qualified SecondTransfer.Utils.HTTPHeaders as He
import SecondTransfer.MainLoop.Logging (logWithExclusivity)
-- | A frame leaving the session: the encoding information (flags, stream id,
--   padding) paired with the payload still to be serialized.
type OutputFrame = (NH2.EncodeInfo, NH2.FramePayload)
-- | A frame entering the session, already parsed.
type InputFrame = NH2.Frame
-- | Maximum number of bytes placed in a single outgoing header or data
--   fragment; both output threads pass it to 'bytestringChunk'.
--   NOTE(review): 16384 presumably mirrors the default HTTP/2 maximum frame
--   size - confirm before changing.
useChunkLength :: Int
useChunkLength = 16384
-- | Token put into an 'MVar' by the headers output thread once the header
--   frames of a stream have been written; the data sink of that stream takes
--   it before emitting any data.
data HeadersSent = HeadersSent
-- | Item travelling from a worker to the data output thread: the stream it
--   belongs to and either a piece of data or 'Nothing' to mark the end of the
--   stream.
type DataOutputToConveyor = (GlobalStreamId, Maybe B.ByteString)
-- | Read-only environment handed to every worker thread.
data WorkerThreadEnvironment = WorkerThreadEnvironment {
-- Stream served by the worker. The session-wide template leaves this field
-- undefined; the input thread sets it for each stream before forking.
_streamId :: GlobalStreamId
-- Channel on which the worker posts response headers, together with the MVar
-- that is filled once those headers have been written out.
, _headersOutput :: Chan (GlobalStreamId, MVar HeadersSent, Headers)
-- Channel on which the worker posts response data ('Nothing' ends the stream).
,_dataOutput :: Chan DataOutputToConveyor
-- Set of stream ids that were reset by the peer; shared with the session.
,_streamsCancelled_WTE :: MVar NS.IntSet
}
-- Template Haskell: derives lenses (streamId, headersOutput, ...) for the
-- fields above.
makeLenses ''WorkerThreadEnvironment
-- | The two handles through which the framer talks to a session.
type Session = (SessionInput, SessionOutput)
-- | Write end of a session: a channel of commands consumed by the session
--   input thread.
newtype SessionInput = SessionInput ( Chan SessionInputCommand )
-- | Queue a frame that is not the first one of the connection for processing
--   by the session.
sendMiddleFrameToSession :: SessionInput -> InputFrame -> IO ()
sendMiddleFrameToSession (SessionInput commands) =
    writeChan commands . MiddleFrame_SIC
-- | Queue the very first frame of the connection for processing by the
--   session.
sendFirstFrameToSession :: SessionInput -> InputFrame -> IO ()
sendFirstFrameToSession (SessionInput commands) =
    writeChan commands . FirstFrame_SIC
-- | Queue an arbitrary command for the session input thread.
sendCommandToSession :: SessionInput -> SessionInputCommand -> IO ()
sendCommandToSession (SessionInput commands) =
    writeChan commands
-- | Read end of a session: a channel carrying either a command for the framer
--   ('Left') or a frame to be serialized and sent ('Right').
newtype SessionOutput = SessionOutput ( Chan (Either SessionOutputCommand OutputFrame) )
-- | Block until the session produces its next output item (a frame or a
--   command) and return it.
getFrameFromSession :: SessionOutput -> IO (Either SessionOutputCommand OutputFrame)
getFrameFromSession session_output =
    case session_output of
        SessionOutput produced -> readChan produced
-- | Mutable hash table implementation used throughout the session.
type HashTable k v = H.CuckooHashTable k v
-- | Per-stream accumulator of the header block fragments received so far.
type Stream2HeaderBlockFragment = HashTable GlobalStreamId Bu.Builder
-- | Monad in which worker threads run: IO plus the worker environment.
type WorkerMonad = ReaderT WorkerThreadEnvironment IO
-- | Commands understood by the session input thread.
data SessionInputCommand =
-- The first frame of the connection. The input thread only accepts a
-- settings frame on stream 0 here and closes the connection otherwise.
FirstFrame_SIC InputFrame
-- Any later frame.
|MiddleFrame_SIC InputFrame
-- Posted by a worker wrapper when a worker fails; the input thread answers
-- by closing the connection with an internal error.
|InternalAbort_SIC
-- Asks the session to stop: every worker thread is sent a
-- StreamCancelledException and the input thread ends.
|CancelSession_SIC
deriving Show
-- | Commands the session sends to whoever drains its output channel.
data SessionOutputCommand =
-- Emitted by the session when it closes the connection after a fatal error.
CancelSession_SOC
deriving Show
-- | Action that creates a new session given the shared sessions context.
type SessionMaker = SessionsContext -> IO Session
-- | A session factory parameterized by the worker that answers requests.
type CoherentSession = CoherentWorker -> SessionMaker
-- | Plumbing for the request body of one stream: the channel the input thread
--   writes received data into ('Nothing' closes it) and the source handed to
--   the worker.
data PostInputMechanism = PostInputMechanism (Chan (Maybe B.ByteString), InputDataStream)
-- | Mutable, per-session settings.
data SessionSettings = SessionSettings {
-- Initialized to True when the session is created.
-- NOTE(review): nothing in this part of the file reads or updates it.
_pushEnabled :: Bool
}
-- Template Haskell: derives the pushEnabled lens.
makeLenses ''SessionSettings
-- | Everything the threads of one session share.
data SessionData = SessionData {
-- Context shared by all sessions (configuration, error callbacks).
_sessionsContext :: SessionsContext
-- Commands waiting to be processed by the input thread.
,_sessionInput :: Chan SessionInputCommand
-- Output channel wrapped in an MVar; writers take the MVar while writing, so
-- it doubles as a lock on the channel.
,_sessionOutput :: MVar (Chan (Either SessionOutputCommand OutputFrame))
-- HPACK dynamic table used when encoding response headers.
,_toEncodeHeaders :: MVar HP.DynamicTable
-- HPACK dynamic table used when decoding request headers.
,_toDecodeHeaders :: MVar HP.DynamicTable
-- 'Just' the stream whose header block is still incomplete, 'Nothing'
-- when no header block is being received.
,_receivingHeaders :: MVar (Maybe Int)
-- Highest stream id accepted so far; reported in the GOAWAY frame.
,_lastGoodStream :: MVar Int
-- Header block fragments accumulated per stream.
,_stream2HeaderBlockFragment :: Stream2HeaderBlockFragment
-- Template environment for workers; its stream id is filled in per stream.
,_forWorkerThread :: WorkerThreadEnvironment
-- The request handler invoked for every stream.
,_coherentWorker :: CoherentWorker
-- Ids of the streams reset by the peer.
,_streamsCancelled :: MVar NS.IntSet
-- Request-body plumbing per stream.
,_stream2PostInputMechanism :: HashTable Int PostInputMechanism
-- Thread serving each stream, so that it can be interrupted.
,_stream2WorkerThread :: HashTable Int ThreadId
-- Numeric id of this session, as given to http2Session.
,_sessionIdAtSession :: Int
-- Mutable settings of this session.
,_sessionSettings :: MVar SessionSettings
}
-- Template Haskell: derives lenses for all the fields above.
makeLenses ''SessionData
-- | Create a new HTTP/2 session.
--
--   Allocates the channels, HPACK tables and bookkeeping tables of the
--   session, forks its three service threads (input, headers output and data
--   output) and returns the handles used to feed frames in and to read frames
--   out.
--
--   Each service thread runs under a guard that forwards
--   'HTTP2SessionException's to 'sessionExceptionHandler', tagged with the
--   component that raised them.
http2Session :: CoherentWorker -> Int -> SessionsContext -> IO Session
http2Session coherent_worker session_id sessions_context = do
    session_input             <- newChan
    session_output            <- newChan
    session_output_mvar       <- newMVar session_output

    -- Header block fragments received so far, per stream
    stream_request_headers    <- H.new :: IO Stream2HeaderBlockFragment

    -- HPACK state, one table per direction
    decode_headers_table      <- HP.newDynamicTableForDecoding 4096
    decode_headers_table_mvar <- newMVar decode_headers_table
    encode_headers_table      <- HP.newDynamicTableForEncoding 4096
    encode_headers_table_mvar <- newMVar encode_headers_table

    -- Conveyors from the workers to the output threads
    headers_output            <- newChan :: IO (Chan (GlobalStreamId, MVar HeadersSent, Headers))
    data_output               <- newChan :: IO (Chan DataOutputToConveyor)

    stream2postinputmechanism <- H.new
    stream2workerthread       <- H.new

    -- No stream has been accepted yet. The input thread only accepts a new
    -- stream whose id is odd and strictly greater than this value, so it has
    -- to start below 1 or the first client stream (id 1) would be rejected.
    -- Zero is also the value a GOAWAY frame carries when no stream was
    -- processed.
    last_good_stream_mvar     <- newMVar 0

    receiving_headers         <- newMVar Nothing
    session_settings          <- newMVar $ SessionSettings { _pushEnabled = True }
    cancelled_streams_mvar    <- newMVar $ NS.empty :: IO (MVar NS.IntSet)

    let for_worker_thread = WorkerThreadEnvironment {
            -- Filled in by the input thread for each stream
            _streamId = error "NotInitialized"
            ,_headersOutput = headers_output
            ,_dataOutput = data_output
            ,_streamsCancelled_WTE = cancelled_streams_mvar
            }

    let session_data = SessionData {
            _sessionsContext = sessions_context
            ,_sessionInput = session_input
            ,_sessionOutput = session_output_mvar
            ,_toDecodeHeaders = decode_headers_table_mvar
            ,_toEncodeHeaders = encode_headers_table_mvar
            ,_stream2HeaderBlockFragment = stream_request_headers
            ,_forWorkerThread = for_worker_thread
            ,_coherentWorker = coherent_worker
            ,_streamsCancelled = cancelled_streams_mvar
            ,_stream2PostInputMechanism = stream2postinputmechanism
            ,_stream2WorkerThread = stream2workerthread
            ,_sessionIdAtSession = session_id
            ,_receivingHeaders = receiving_headers
            ,_sessionSettings = session_settings
            ,_lastGoodStream = last_good_stream_mvar
            }

    let
        exc_handler :: SessionComponent -> HTTP2SessionException -> IO ()
        exc_handler component e = sessionExceptionHandler component session_id sessions_context e

        exc_guard :: SessionComponent -> IO () -> IO ()
        exc_guard component action = E.catch
            action
            (\e -> do
                INSTRUMENTATION( errorM "HTTP2.Session" "Exception processed" )
                exc_handler component e
            )

    _ <- forkIO $ exc_guard SessionInputThread_HTTP2SessionComponent
            $ runReaderT sessionInputThread session_data

    _ <- forkIO $ exc_guard SessionHeadersOutputThread_HTTP2SessionComponent
            $ runReaderT (headersOutputThread headers_output session_output_mvar) session_data

    _ <- forkIO $ exc_guard SessionDataOutputThread_HTTP2SessionComponent
            $ dataOutputThread data_output session_output_mvar

    return ( (SessionInput session_input),
             (SessionOutput session_output) )
-- | Body of the thread that consumes the commands and frames posted to the
--   session. It processes one command and then calls itself again (through
--   @continue@) unless the command ends the session.
--
--   Summary of what each input does:
--
--   * First frame: must be a settings frame on stream 0, otherwise the
--     connection is closed with a protocol error.
--
--   * Headers and continuation frames: fragments are accumulated per stream;
--     when the block is complete the headers are decoded, validated and a
--     worker thread is forked for the stream.
--
--   * Reset frames: the stream is recorded as cancelled and its worker is
--     interrupted.
--
--   * Data frames: the bytes are forwarded to the worker of the stream and
--     window updates are sent for the stream and for the connection.
--
--   * Ping and settings frames are acknowledged.
--
--   Whenever the connection is closed because of an error,
--   'closeConnectionBecauseIsInvalid' throws, so this thread ends.
sessionInputThread :: ReaderT SessionData IO ()
sessionInputThread = do
    INSTRUMENTATION( debugM "HTTP2.Session" "Entering sessionInputThread" )

    session_input             <- view sessionInput
    decode_headers_table_mvar <- view toDecodeHeaders
    stream_request_headers    <- view stream2HeaderBlockFragment
    cancelled_streams_mvar    <- view streamsCancelled
    coherent_worker           <- view coherentWorker
    for_worker_thread_uns     <- view forWorkerThread
    stream2workerthread       <- view stream2WorkerThread
    receiving_headers_mvar    <- view receivingHeaders
    last_good_stream_mvar     <- view lastGoodStream

    input <- liftIO $ readChan session_input

    case input of

        -- First frame on stream 0 whose flags field is exactly 1: nothing to
        -- answer. NOTE(review): flags == 1 is presumably the settings
        -- acknowledgement - confirm.
        FirstFrame_SIC (NH2.Frame
            (NH2.FrameHeader _ 1 null_stream_id ) _ ) | NH2.toStreamIdentifier 0 == null_stream_id -> do
            continue

        -- First frame carrying settings on stream 0: acknowledge it.
        FirstFrame_SIC
            (NH2.Frame
                (NH2.FrameHeader _ 0 null_stream_id )
                (NH2.SettingsFrame settings_list)
            ) | NH2.toStreamIdentifier 0 == null_stream_id -> do
            handleSettingsFrame settings_list
            continue

        -- Anything else as first frame is invalid.
        FirstFrame_SIC _ -> do
            closeConnectionBecauseIsInvalid NH2.ProtocolError
            return ()

        -- Interrupt every worker and finish this thread (no "continue").
        CancelSession_SIC -> do
            liftIO $ do
                H.mapM_
                    (\ (_, thread_id) -> do
                        throwTo thread_id StreamCancelledException
                        return ()
                    )
                    stream2workerthread
            return ()

        InternalAbort_SIC -> do
            closeConnectionBecauseIsInvalid NH2.InternalError
            return ()

        MiddleFrame_SIC frame | Just (stream_id, bytes) <- isAboutHeaders frame -> do
            opens_stream <- appendHeaderFragmentBlock stream_id bytes

            if opens_stream
                then do
                    -- The MVars below are only inspected here; they are
                    -- never left empty, because closing the connection needs
                    -- to read the last good stream and would block forever
                    -- on an empty MVar.
                    maybe_rcv_headers_of <- liftIO $ readMVar receiving_headers_mvar
                    case maybe_rcv_headers_of of
                        Just _ -> do
                            -- A header block is already in progress and the
                            -- peer is trying to start another one.
                            closeConnectionBecauseIsInvalid NH2.ProtocolError

                        Nothing -> do
                            -- From now on we are receiving headers for this
                            -- stream.
                            liftIO $ modifyMVar_
                                receiving_headers_mvar
                                (\ _ -> return (Just stream_id) )
                            -- Client streams must be odd and increasing.
                            last_good_stream <- liftIO $ readMVar last_good_stream_mvar
                            if (odd stream_id ) && (stream_id > last_good_stream)
                                then do
                                    liftIO $ modifyMVar_
                                        last_good_stream_mvar
                                        (\ _ -> return stream_id )
                                else do
                                    INSTRUMENTATION( errorM "HTTP2.Session" "Protocol error: bad stream id")
                                    closeConnectionBecauseIsInvalid NH2.ProtocolError
                else do
                    -- More fragments for a block already started: it has to
                    -- be the block we are currently receiving.
                    maybe_rcv_headers_of <- liftIO $ readMVar receiving_headers_mvar
                    case maybe_rcv_headers_of of
                        Just a_stream_id | a_stream_id == stream_id -> do
                            return ()
                        Just _ -> do
                            -- Fragments of two header blocks interleaved
                            closeConnectionBecauseIsInvalid NH2.ProtocolError
                        Nothing -> error "InternalError, this should be set"

            if frameEndsHeaders frame
                then do
                    -- The header block is complete
                    liftIO $ modifyMVar_
                        receiving_headers_mvar
                        (\ _ -> return Nothing )
                    let for_worker_thread = set streamId stream_id for_worker_thread_uns
                    headers_bytes <- getHeaderBytes stream_id
                    dyn_table <- liftIO $ takeMVar decode_headers_table_mvar
                    (new_table, header_list ) <- liftIO $ HP.decodeHeader dyn_table headers_bytes
                    liftIO $ do
                        -- The fragments are not needed any longer
                        H.delete stream_request_headers stream_id
                        putMVar decode_headers_table_mvar new_table

                    let
                        headers_editor = He.fromList header_list

                    maybe_good_headers_editor <- validateIncomingHeaders headers_editor

                    good_headers <- case maybe_good_headers_editor of
                        Just yes_they_are_good -> return yes_they_are_good
                        Nothing -> closeConnectionBecauseIsInvalid NH2.ProtocolError

                    headers_extra_good <- addExtraHeaders good_headers
                    let
                        header_list_after = He.toList headers_extra_good

                    -- When the stream does not end with this frame, request
                    -- data follows and the worker gets a source to read it.
                    -- NOTE(review): the end-of-stream flag is read from the
                    -- frame that completes the block, which may be a
                    -- continuation frame - verify this is what is wanted.
                    post_data_source <- if not (frameEndsStream frame)
                        then do
                            mechanism <- createMechanismForStream stream_id
                            let source = postDataSourceFromMechanism mechanism
                            return $ Just source
                        else do
                            return Nothing

                    liftIO $ do
                        -- A failing worker asks this thread to abort the
                        -- whole session.
                        thread_id <- forkIO $ E.catch
                            (runReaderT
                                (workerThread (header_list_after, post_data_source) coherent_worker)
                                for_worker_thread
                            )
                            (
                                ( \ _ -> do
                                    writeChan session_input InternalAbort_SIC
                                )
                                :: HTTP500PrecursorException -> IO ()
                            )

                        H.insert stream2workerthread stream_id thread_id

                    return ()
                else
                    return ()

            continue

        MiddleFrame_SIC frame@(NH2.Frame _ (NH2.RSTStreamFrame _error_code_id)) -> do
            let stream_id = streamIdFromFrame frame
            maybe_thread_id <- liftIO $ do
                INSTRUMENTATION( infoM "HTTP2.Session" $ "Stream reset: " ++ (show _error_code_id) )
                cancelled_streams <- takeMVar cancelled_streams_mvar
                INSTRUMENTATION( infoM "HTTP2.Session" $ "Cancelled stream was: " ++ (show stream_id) )
                putMVar cancelled_streams_mvar $ NS.insert stream_id cancelled_streams
                H.lookup stream2workerthread stream_id
            case maybe_thread_id of
                Nothing ->
                    -- The peer resets a stream that never got a worker.
                    -- Answer with a connection error instead of crashing
                    -- this thread and leaving the connection hanging.
                    closeConnectionBecauseIsInvalid NH2.ProtocolError

                Just thread_id -> liftIO $ do
                    INSTRUMENTATION( infoM "HTTP2.Session" $ "Stream successfully interrupted" )
                    throwTo thread_id StreamCancelledException

            continue

        MiddleFrame_SIC frame@(NH2.Frame (NH2.FrameHeader _ _ nh2_stream_id) (NH2.DataFrame somebytes))
          -> unlessReceivingHeaders $ do
            let stream_id = NH2.fromStreamIdentifier nh2_stream_id
            streamWorkerSendData stream_id somebytes
            -- Give the consumed window back, both for the stream ...
            sendOutFrame
                (NH2.EncodeInfo
                    NH2.defaultFlags
                    nh2_stream_id
                    Nothing
                )
                (NH2.WindowUpdateFrame
                    (fromIntegral (B.length somebytes))
                )
            -- ... and for the connection
            sendOutFrame
                (NH2.EncodeInfo
                    NH2.defaultFlags
                    (NH2.toStreamIdentifier 0)
                    Nothing
                )
                (NH2.WindowUpdateFrame
                    (fromIntegral (B.length somebytes))
                )

            if frameEndsStream frame
                then do
                    closePostDataSource stream_id
                else
                    return ()

            continue

        -- Acknowledgement of a ping: nothing to do
        MiddleFrame_SIC (NH2.Frame (NH2.FrameHeader _ flags _) (NH2.PingFrame _)) | NH2.testAck flags-> do
            continue

        -- Ping request: echo the payload back with the ack flag set
        MiddleFrame_SIC (NH2.Frame (NH2.FrameHeader _ _ _) (NH2.PingFrame somebytes)) -> do
            INSTRUMENTATION( debugM "HTTP2.Session" "Ping processed" )
            sendOutFrame
                (NH2.EncodeInfo
                    (NH2.setAck NH2.defaultFlags)
                    (NH2.toStreamIdentifier 0)
                    Nothing
                )
                (NH2.PingFrame somebytes)
            continue

        -- Acknowledgement of settings: nothing to do
        MiddleFrame_SIC (NH2.Frame frame_header (NH2.SettingsFrame _)) | isSettingsAck frame_header -> do
            continue

        MiddleFrame_SIC (NH2.Frame _ (NH2.SettingsFrame settings_list)) -> do
            INSTRUMENTATION( debugM "HTTP2.Session" $ "Received settings: " ++ (show settings_list) )
            handleSettingsFrame settings_list
            continue

        -- Frames not handled above are logged and ignored, unless a header
        -- block is in progress, in which case they are a protocol error.
        MiddleFrame_SIC somethingelse -> unlessReceivingHeaders $ do
            INSTRUMENTATION( errorM "HTTP2.Session" $ "Received problematic frame: " )
            INSTRUMENTATION( errorM "HTTP2.Session" $ ".. " ++ (show somethingelse) )
            continue

  where

    continue = sessionInputThread

    -- Acknowledges a settings frame with an empty settings frame that has
    -- the ack flag set. The received settings themselves are not applied.
    handleSettingsFrame :: NH2.SettingsList -> ReaderT SessionData IO ()
    handleSettingsFrame _settings_list =
        sendOutFrame
            (NH2.EncodeInfo
                (NH2.setAck NH2.defaultFlags)
                (NH2.toStreamIdentifier 0)
                Nothing )
            (NH2.SettingsFrame [])
-- | Queue one frame on the session output channel.
--
--   The MVar wrapping the channel is held only while the frame is written
--   and is released even if the write is interrupted by an exception, so
--   the other writers of the session cannot be left blocked on it.
sendOutFrame :: NH2.EncodeInfo -> NH2.FramePayload -> ReaderT SessionData IO ()
sendOutFrame encode_info payload = do
    session_output_mvar <- view sessionOutput
    liftIO $ withMVar session_output_mvar $ \ session_output ->
        writeChan session_output $ Right (encode_info, payload)
-- | Add the informational headers enabled in the sessions configuration.
--   At the moment that is only @second-transfer-eh--used-protocol@, set to
--   @HTTP/2@ when the configuration asks for it; otherwise the headers are
--   returned untouched.
addExtraHeaders :: He.HeaderEditor -> ReaderT SessionData IO He.HeaderEditor
addExtraHeaders headers_editor = do
    wants_protocol <- view
        (sessionsContext . sessionsConfig . sessionsEnrichedHeaders . addUsedProtocol)
    return $
        if wants_protocol
            then set
                    (He.headerLens "second-transfer-eh--used-protocol")
                    (Just "HTTP/2")
                    headers_editor
            else headers_editor
-- | Check the headers of an incoming request.
--
--   The @host@ header is first folded into @:authority@. The request is
--   accepted only when @:authority@, @:method@, @:scheme@ and @:path@ are all
--   present and the header names, as received, are lowercase.
--
--   Returns the normalized headers, or 'Nothing' when the request is
--   malformed; the caller closes the connection in that case.
validateIncomingHeaders :: He.HeaderEditor -> ReaderT SessionData IO (Maybe He.HeaderEditor)
validateIncomingHeaders headers_editor = do
    let
        h1 = He.replaceHostByAuthority headers_editor
        -- HTTP/2 forbids uppercase header names; a request carrying them
        -- must be treated as malformed.
        headers_are_lowercase = He.headersAreLowercaseAtHeaderEditor headers_editor
        maybe_authority = h1 ^. (He.headerLens ":authority")
        maybe_method = h1 ^. (He.headerLens ":method")
        maybe_scheme = h1 ^. (He.headerLens ":scheme")
        maybe_path = h1 ^. (He.headerLens ":path")

    if
        headers_are_lowercase &&
        (isJust maybe_authority) &&
        (isJust maybe_method) &&
        (isJust maybe_scheme) &&
        (isJust maybe_path )
        then
            return (Just h1)
        else
            return Nothing
-- | Abort the session after a fatal error.
--
--   In order: queues a GOAWAY frame carrying the last accepted stream id and
--   the given error code, interrupts every worker thread with
--   'StreamCancelledException', tells the consumer of the output channel to
--   cancel the session and finally throws 'HTTP2ProtocolException'. It never
--   returns normally, which is why the result type is free.
closeConnectionBecauseIsInvalid :: NH2.ErrorCodeId -> ReaderT SessionData IO a
closeConnectionBecauseIsInvalid error_code = do
    last_good_stream_mvar <- view lastGoodStream
    -- Only a snapshot is needed; reading (instead of taking) leaves the MVar
    -- full for anybody else interested in it.
    last_good_stream <- liftIO $ readMVar last_good_stream_mvar
    session_output_mvar <- view sessionOutput
    stream2workerthread <- view stream2WorkerThread

    sendOutFrame
        (NH2.EncodeInfo
            NH2.defaultFlags
            (NH2.toStreamIdentifier 0)
            Nothing
        )
        (NH2.GoAwayFrame
            (NH2.toStreamIdentifier last_good_stream)
            error_code
            ""
        )

    liftIO $ do
        -- Stop all the workers of the session
        H.mapM_
            ( \(_stream_id, thread_id) ->
                throwTo thread_id StreamCancelledException
            )
            stream2workerthread

        -- The output lock is released even if the write is interrupted
        withMVar session_output_mvar $ \ session_output ->
            writeChan session_output $ Left CancelSession_SOC

        -- throwIO raises the exception at this point of the IO sequence
        E.throwIO HTTP2ProtocolException
-- | True when the frame carries the end-of-stream flag.
frameEndsStream :: InputFrame -> Bool
frameEndsStream frame = NH2.testEndStream flags
  where
    NH2.Frame (NH2.FrameHeader _ flags _) _ = frame
-- | Run the given computation only when no header block is in progress.
--   If one is, the connection is closed with a protocol error instead.
unlessReceivingHeaders :: ReaderT SessionData IO a -> ReaderT SessionData IO a
unlessReceivingHeaders comp = do
    receiving_headers_mvar <- view receivingHeaders
    block_in_progress <- liftIO $ readMVar receiving_headers_mvar
    case block_in_progress of
        Nothing -> comp
        Just _  -> closeConnectionBecauseIsInvalid NH2.ProtocolError
-- | Create the channel and source pair through which the request body of a
--   stream reaches its worker, register it under the stream id and return
--   it.
createMechanismForStream :: GlobalStreamId -> ReaderT SessionData IO PostInputMechanism
createMechanismForStream stream_id = do
    mechanisms <- view stream2PostInputMechanism
    liftIO $ do
        (chan, source) <- unfoldChannelAndSource
        let mechanism = PostInputMechanism (chan, source)
        H.insert mechanisms stream_id mechanism
        return mechanism
-- | Signal the end of the request body of a stream by writing 'Nothing' to
--   its channel. It is an internal error to call this for a stream that has
--   no registered mechanism.
closePostDataSource :: GlobalStreamId -> ReaderT SessionData IO ()
closePostDataSource stream_id = do
    mechanisms <- view stream2PostInputMechanism
    liftIO $ H.lookup mechanisms stream_id >>= \ found ->
        case found of
            Nothing ->
                error "Internal error/closePostDataSource"
            Just (PostInputMechanism (chan, _)) ->
                writeChan chan Nothing
-- | Forward a piece of request body to the worker of the given stream.
--
--   When the stream has no registered body channel (for example, data sent
--   on a stream that was never opened, or whose request had no body) the
--   peer is misbehaving: the connection is closed with a protocol error
--   rather than crashing the input thread.
--   NOTE(review): the protocol would allow a narrower, stream-level error
--   here; a connection error is used because it is the only error path this
--   module has.
streamWorkerSendData :: Int -> B.ByteString -> ReaderT SessionData IO ()
streamWorkerSendData stream_id bytes = do
    s2pim <- view stream2PostInputMechanism
    pim_maybe <- liftIO $ H.lookup s2pim stream_id
    case pim_maybe of
        Just pim ->
            sendBytesToPim pim bytes
        Nothing ->
            closeConnectionBecauseIsInvalid NH2.ProtocolError
-- | Write a piece of request body to the channel of a mechanism.
sendBytesToPim :: PostInputMechanism -> B.ByteString -> ReaderT SessionData IO ()
sendBytesToPim (PostInputMechanism (body_chan, _)) =
    liftIO . writeChan body_chan . Just
-- | The source from which a worker reads the request body.
postDataSourceFromMechanism :: PostInputMechanism -> InputDataStream
postDataSourceFromMechanism mechanism =
    case mechanism of
        PostInputMechanism (_, body_source) -> body_source
-- | True when the ack flag is set in the given frame header.
isSettingsAck :: NH2.FrameHeader -> Bool
isSettingsAck frame_header =
    case frame_header of
        NH2.FrameHeader _ flags _ -> NH2.testAck flags
-- | Tell whether the given stream is in the set of cancelled streams at the
--   moment of the call.
isStreamCancelled :: GlobalStreamId -> WorkerMonad Bool
isStreamCancelled stream_id =
    view streamsCancelled_WTE
        >>= fmap (NS.member stream_id) . liftIO . readMVar
-- | Canned response used when the worker fails before producing one: status
--   500, no pushed streams and a short plain-text body with no footers.
sendPrimitive500Error :: IO PrincipalStream
sendPrimitive500Error =
    let
        failure_headers = [ (":status", "500") ]
        failure_body    = yield "Internal server error\n" >> return []
    in
        return (failure_headers, [], failure_body)
-- | Body of the thread serving one stream.
--
--   Runs the coherent worker on the request (falling back to the canned 500
--   response if it throws 'HTTP500PrecursorException'), hands the response
--   headers to the headers output thread and then, unless the stream has been
--   cancelled in the meantime, streams the response body to the data output
--   thread. Pushed streams and footers are currently discarded.
workerThread :: Request -> CoherentWorker -> WorkerMonad ()
workerThread req coherent_worker = do
    headers_output <- view headersOutput
    stream_id      <- view streamId

    (headers, _, data_and_conclussion) <- liftIO $ E.catch run_worker on_precursor

    -- The data sink waits on this MVar until the headers are on their way
    headers_sent <- liftIO newEmptyMVar
    liftIO $ writeChan headers_output (stream_id, headers_sent, headers)

    is_stream_cancelled <- isStreamCancelled stream_id
    if is_stream_cancelled
        then
            return ()
        else do
            (_maybe_footers, _) <- runConduit $
                (transPipe liftIO data_and_conclussion)
                `fuseBothMaybe`
                (sendDataOfStream stream_id headers_sent)
            return ()
  where
    -- The result tuple is forced before leaving the exception handler
    run_worker = do
        (h, x, d) <- coherent_worker req
        return $! (h, x, d)

    on_precursor :: HTTP500PrecursorException -> IO (Headers, PushedStreams, DataAndConclusion)
    on_precursor _ = sendPrimitive500Error
-- | Sink that forwards the response body of a stream to the data output
--   thread. It first waits until the headers of the stream have been sent,
--   then forwards every chunk it receives and, when upstream is exhausted,
--   posts 'Nothing' to mark the end of the stream.
sendDataOfStream :: GlobalStreamId -> MVar HeadersSent -> Sink B.ByteString (ReaderT WorkerThreadEnvironment IO) ()
sendDataOfStream stream_id headers_sent = do
    data_output <- view dataOutput
    liftIO $ takeMVar headers_sent
    pump data_output
  where
    post out item = liftIO $ writeChan out (stream_id, item)

    pump out =
        await >>= maybe
            (post out Nothing)
            (\ chunk -> post out (Just chunk) >> pump out)
-- | Append a header block fragment to whatever has been accumulated for the
--   stream. Returns 'True' when this is the first fragment stored for the
--   stream, 'False' when it was appended to an existing block.
appendHeaderFragmentBlock :: GlobalStreamId -> B.ByteString -> ReaderT SessionData IO Bool
appendHeaderFragmentBlock global_stream_id bytes = do
    fragments <- view stream2HeaderBlockFragment
    liftIO $ do
        previous <- H.lookup fragments global_stream_id
        let
            incoming = Bu.byteString bytes
            (combined, is_first) = case previous of
                Nothing     -> (incoming, True)
                Just so_far -> (so_far `mappend` incoming, False)
        H.insert fragments global_stream_id combined
        return is_first
-- | The header block accumulated for a stream, as one strict bytestring.
--   The stream must have at least one fragment stored.
getHeaderBytes :: GlobalStreamId -> ReaderT SessionData IO B.ByteString
getHeaderBytes global_stream_id = do
    fragments <- view stream2HeaderBlockFragment
    Just builder <- liftIO $ H.lookup fragments global_stream_id
    return . Bl.toStrict . Bu.toLazyByteString $ builder
-- | For headers and continuation frames, the stream id and the header block
--   fragment they carry; 'Nothing' for any other frame.
isAboutHeaders :: InputFrame -> Maybe (GlobalStreamId, B.ByteString)
isAboutHeaders (NH2.Frame (NH2.FrameHeader _ _ stream_id) payload) =
    case payload of
        NH2.HeadersFrame _ block_fragment    -> fragmentOf block_fragment
        NH2.ContinuationFrame block_fragment -> fragmentOf block_fragment
        _                                    -> Nothing
  where
    fragmentOf block_fragment =
        Just (NH2.fromStreamIdentifier stream_id, block_fragment)
-- | True when the frame carries the end-of-headers flag.
frameEndsHeaders :: InputFrame -> Bool
frameEndsHeaders frame = NH2.testEndHeader flags
  where
    NH2.Frame (NH2.FrameHeader _ flags _) _ = frame
-- | The numeric id of the stream a frame belongs to.
streamIdFromFrame :: InputFrame -> GlobalStreamId
streamIdFromFrame frame = NH2.fromStreamIdentifier nh2_stream_id
  where
    NH2.Frame (NH2.FrameHeader _ _ nh2_stream_id) _ = frame
-- | Body of the thread that sends response headers.
--
--   For every item posted by a worker it encodes the headers with the
--   session encoding table, splits the result in fragments of at most
--   'useChunkLength' bytes and, holding the output lock for the whole block,
--   writes a headers frame followed by as many continuation frames as
--   needed. Only the last frame carries the end-of-headers flag. After that
--   it signals the worker that its headers are out.
headersOutputThread :: Chan (GlobalStreamId, MVar HeadersSent, Headers)
                    -> MVar (Chan (Either SessionOutputCommand OutputFrame))
                    -> ReaderT SessionData IO ()
headersOutputThread input_chan session_output_mvar = forever $ do
    (stream_id, headers_ready_mvar, headers) <- liftIO $ readChan input_chan

    -- Encode, keeping the dynamic table up to date
    table_mvar <- view toEncodeHeaders
    current_table <- liftIO $ takeMVar table_mvar
    (next_table, encoded ) <- liftIO $ HP.encodeHeader HP.defaultEncodeStrategy current_table headers
    liftIO $ putMVar table_mvar next_table

    fragments <- return $! bytestringChunk useChunkLength encoded

    -- The frames of one header block must not be interleaved with anything
    -- else, hence the lock around all of them.
    liftIO $ E.bracket
        (takeMVar session_output_mvar)
        (putMVar session_output_mvar )
        (\ out -> do
            emitFragments out stream_id True fragments
            putMVar headers_ready_mvar HeadersSent
        )
  where
    emitFragments ::
        Chan (Either SessionOutputCommand OutputFrame)
        -> GlobalStreamId
        -> Bool
        -> [B.ByteString]
        -> IO ()
    emitFragments out stream_id is_first fragments =
        case fragments of
            [final_fragment] ->
                emit (NH2.setEndHeader NH2.defaultFlags) final_fragment
            (fragment:rest) -> do
                emit NH2.defaultFlags fragment
                emitFragments out stream_id False rest
      where
        -- The first fragment travels in a headers frame, the others in
        -- continuation frames.
        emit flags fragment =
            writeChan out $ Right ( NH2.EncodeInfo {
                 NH2.encodeFlags = flags
                ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id
                ,NH2.encodePadding = Nothing },
                (if is_first then NH2.HeadersFrame Nothing fragment else NH2.ContinuationFrame fragment)
                )
-- | Split a bytestring in pieces of at most @len@ bytes.
--
--   The result is never empty: an empty input gives a single empty piece, so
--   callers always have something to put in a frame. An input whose length
--   is an exact multiple of @len@ produces no trailing empty piece. A
--   non-positive @len@ cannot make progress, so the input is returned whole
--   in that case.
bytestringChunk :: Int -> B.ByteString -> [B.ByteString]
bytestringChunk len s
    | len <= 0 || B.length s <= len = [ s ]
    | otherwise                     = h : bytestringChunk len xs
  where
    (h, xs) = B.splitAt len s
-- | Body of the thread that sends response data.
--
--   Every piece of data posted by a worker is split in fragments of at most
--   'useChunkLength' bytes and each fragment is written as a data frame,
--   taking the output lock once per frame. A 'Nothing' from the worker is
--   turned into an empty data frame with the end-of-stream flag set.
dataOutputThread :: Chan DataOutputToConveyor
                 -> MVar (Chan (Either SessionOutputCommand OutputFrame))
                 -> IO ()
dataOutputThread input_chan session_output_mvar = forever $ do
    (stream_id, maybe_contents) <- readChan input_chan
    case maybe_contents of
        Nothing ->
            emitDataFrame (NH2.setEndStream NH2.defaultFlags) stream_id ""
        Just contents ->
            mapM_
                (emitDataFrame NH2.defaultFlags stream_id)
                (bytestringChunk useChunkLength $! contents)
  where
    -- Writes one data frame while holding the session output lock
    emitDataFrame flags stream_id payload =
        E.bracket
            (takeMVar session_output_mvar)
            (putMVar session_output_mvar)
            (\ session_output -> writeChan session_output $ Right ( NH2.EncodeInfo {
                 NH2.encodeFlags = flags
                ,NH2.encodeStreamId = NH2.toStreamIdentifier stream_id
                ,NH2.encodePadding = Nothing },
                NH2.DataFrame payload )
            )