module SecondTransfer.Http2.Session(
http2Session
,getFrameFromSession
,sendFirstFrameToSession
,sendMiddleFrameToSession
,sendCommandToSession
,CoherentSession
,SessionInput(..)
,SessionInputCommand(..)
,SessionOutput(..)
,SessionOutputCommand(..)
,SessionCoordinates(..)
,SessionComponent(..)
,SessionsCallbacks(..)
,SessionsConfig(..)
,ErrorCallback
,OutputFrame
,InputFrame
) where
#include "Logging.cpphs"
import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Concurrent.Chan
import Control.Exception (throwTo)
import qualified Control.Exception as E
import Control.Monad (forever,unless, when, mapM_, forM, forM_)
import Control.Monad.IO.Class (liftIO)
import Control.DeepSeq ( ($!!), deepseq )
import Control.Monad.Trans.Reader
import Control.Concurrent.MVar
import qualified Data.ByteString as B
import Data.ByteString.Char8 (pack,unpack)
import qualified Data.ByteString.Builder as Bu
import qualified Data.ByteString.Lazy as Bl
import Data.Conduit
import qualified Data.HashTable.IO as H
import qualified Data.IntSet as NS
import Data.Maybe (isJust)
#ifndef IMPLICIT_MONOID
import Data.Monoid (mappend)
#endif
import Control.Lens
import qualified Network.HPACK as HP
import qualified Network.HTTP2 as NH2
import System.Log.Logger
import System.Clock (getTime, TimeSpec, Clock(..))
import SecondTransfer.MainLoop.CoherentWorker
import SecondTransfer.MainLoop.Tokens
import SecondTransfer.Sessions.Config
import SecondTransfer.Sessions.Internal (sessionExceptionHandler,
SessionsContext,
sessionsConfig)
import SecondTransfer.Utils (unfoldChannelAndSource)
import SecondTransfer.Exception
import qualified SecondTransfer.Utils.HTTPHeaders as He
import SecondTransfer.MainLoop.Logging (logWithExclusivity, logit)
type OutputFrame = (NH2.EncodeInfo, NH2.FramePayload, Effect)
type InputFrame = NH2.Frame
data HeadersSent = HeadersSent
type DataOutputToConveyor = (GlobalStreamId, Maybe B.ByteString, Effect)
data HeaderOutputMessage =
NormalResponse_HM (GlobalStreamId, MVar HeadersSent, Headers, Effect)
|PushPromise_HM (GlobalStreamId, GlobalStreamId, Headers, Effect)
data SessionSettings = SessionSettings {
_pushEnabled :: Bool
}
makeLenses ''SessionSettings
data WorkerThreadEnvironment = WorkerThreadEnvironment {
_streamId :: GlobalStreamId
, _headersOutput :: Chan HeaderOutputMessage
,_dataOutput :: MVar DataOutputToConveyor
,_streamsCancelled_WTE :: MVar NS.IntSet
,_sessionSettings_WTE :: MVar SessionSettings
,_nextPushStream_WTE :: MVar Int
}
makeLenses ''WorkerThreadEnvironment
type Session = (SessionInput, SessionOutput)
newtype SessionInput = SessionInput ( Chan SessionInputCommand )
sendMiddleFrameToSession :: SessionInput -> InputFrame -> IO ()
sendMiddleFrameToSession (SessionInput chan) frame = writeChan chan $ MiddleFrame_SIC frame
sendFirstFrameToSession :: SessionInput -> InputFrame -> IO ()
sendFirstFrameToSession (SessionInput chan) frame = writeChan chan $ FirstFrame_SIC frame
sendCommandToSession :: SessionInput -> SessionInputCommand -> IO ()
sendCommandToSession (SessionInput chan) command = writeChan chan command
type SessionOutputPacket = Either SessionOutputCommand OutputFrame
type SessionOutputChannelAbstraction = Chan SessionOutputPacket
newtype SessionOutput = SessionOutput SessionOutputChannelAbstraction
getFrameFromSession :: SessionOutput -> IO SessionOutputPacket
getFrameFromSession (SessionOutput chan) = readChan chan
type HashTable k v = H.CuckooHashTable k v
type Stream2HeaderBlockFragment = HashTable GlobalStreamId Bu.Builder
type WorkerMonad = ReaderT WorkerThreadEnvironment IO
data SessionInputCommand =
FirstFrame_SIC InputFrame
|MiddleFrame_SIC InputFrame
|InternalAbort_SIC
|CancelSession_SIC
deriving Show
data SessionOutputCommand =
CancelSession_SOC
|FinishStream_SOC GlobalStreamId
deriving Show
type SessionMaker = SessionsContext -> IO Session
type CoherentSession = AwareWorker -> SessionMaker
data PostInputMechanism = PostInputMechanism (MVar (Maybe B.ByteString), InputDataStream)
data SessionData = SessionData {
_sessionsContext :: SessionsContext
,_sessionInput :: Chan SessionInputCommand
,_sessionOutput :: MVar SessionOutputChannelAbstraction
,_toEncodeHeaders :: MVar HP.DynamicTable
,_toDecodeHeaders :: MVar HP.DynamicTable
,_receivingHeaders :: MVar (Maybe Int)
,_lastGoodStream :: MVar Int
,_stream2HeaderBlockFragment :: Stream2HeaderBlockFragment
,_forWorkerThread :: WorkerThreadEnvironment
,_awareWorker :: AwareWorker
,_streamsCancelled :: MVar NS.IntSet
,_stream2PostInputMechanism :: HashTable Int PostInputMechanism
,_stream2WorkerThread :: HashTable Int ThreadId
,_sessionIdAtSession :: Int
,_sessionSettings :: MVar SessionSettings
,_nextPushStream :: MVar Int
}
makeLenses ''SessionData
http2Session :: AwareWorker -> Int -> SessionsContext -> IO Session
http2Session aware_worker session_id sessions_context = do
session_input <- newChan
session_output <- newChan
session_output_mvar <- newMVar session_output
stream_request_headers <- H.new :: IO Stream2HeaderBlockFragment
decode_headers_table <- HP.newDynamicTableForDecoding 4096
decode_headers_table_mvar <- newMVar decode_headers_table
encode_headers_table <- HP.newDynamicTableForEncoding 4096
encode_headers_table_mvar <- newMVar encode_headers_table
headers_output <- newChan :: IO (Chan HeaderOutputMessage)
data_output <- newEmptyMVar :: IO (MVar DataOutputToConveyor)
stream2postinputmechanism <- H.new
stream2workerthread <- H.new
last_good_stream_mvar <- newMVar (1)
receiving_headers <- newMVar Nothing
session_settings <- newMVar $ SessionSettings { _pushEnabled = True }
next_push_stream <- newMVar 8
cancelled_streams_mvar <- newMVar $ NS.empty :: IO (MVar NS.IntSet)
let for_worker_thread = WorkerThreadEnvironment {
_streamId = error "NotInitialized"
,_headersOutput = headers_output
,_dataOutput = data_output
,_streamsCancelled_WTE = cancelled_streams_mvar
,_sessionSettings_WTE = session_settings
,_nextPushStream_WTE = next_push_stream
}
let session_data = SessionData {
_sessionsContext = sessions_context
,_sessionInput = session_input
,_sessionOutput = session_output_mvar
,_toDecodeHeaders = decode_headers_table_mvar
,_toEncodeHeaders = encode_headers_table_mvar
,_stream2HeaderBlockFragment = stream_request_headers
,_forWorkerThread = for_worker_thread
,_awareWorker = aware_worker
,_streamsCancelled = cancelled_streams_mvar
,_stream2PostInputMechanism = stream2postinputmechanism
,_stream2WorkerThread = stream2workerthread
,_sessionIdAtSession = session_id
,_receivingHeaders = receiving_headers
,_sessionSettings = session_settings
,_lastGoodStream = last_good_stream_mvar
,_nextPushStream = next_push_stream
}
let
exc_handler :: SessionComponent -> HTTP2SessionException -> IO ()
exc_handler component e = sessionExceptionHandler component session_id sessions_context e
exc_guard :: SessionComponent -> IO () -> IO ()
exc_guard component action = E.catch
action
(\e -> do
exc_handler component e
)
use_chunk_length = sessions_context ^. sessionsConfig . dataFrameSize
forkIO $ exc_guard SessionInputThread_HTTP2SessionComponent
$ runReaderT sessionInputThread session_data
forkIO $ exc_guard SessionHeadersOutputThread_HTTP2SessionComponent
$ runReaderT (headersOutputThread headers_output session_output_mvar) session_data
forkIO $ exc_guard SessionDataOutputThread_HTTP2SessionComponent
$ dataOutputThread use_chunk_length data_output session_output_mvar
return ( (SessionInput session_input),
(SessionOutput session_output) )
sessionInputThread :: ReaderT SessionData IO ()
sessionInputThread = do
INSTRUMENTATION( debugM "HTTP2.Session" "Entering sessionInputThread" )
session_input <- view sessionInput
decode_headers_table_mvar <- view toDecodeHeaders
stream_request_headers <- view stream2HeaderBlockFragment
cancelled_streams_mvar <- view streamsCancelled
coherent_worker <- view awareWorker
for_worker_thread_uns <- view forWorkerThread
stream2workerthread <- view stream2WorkerThread
receiving_headers_mvar <- view receivingHeaders
last_good_stream_mvar <- view lastGoodStream
current_session_id <- view sessionIdAtSession
input <- liftIO $ readChan session_input
case input of
FirstFrame_SIC (NH2.Frame
(NH2.FrameHeader _ 1 null_stream_id ) _ )| 0 == null_stream_id -> do
continue
FirstFrame_SIC
(NH2.Frame
(NH2.FrameHeader _ 0 null_stream_id )
(NH2.SettingsFrame settings_list)
) | 0 == null_stream_id -> do
handleSettingsFrame settings_list
continue
FirstFrame_SIC _ -> do
closeConnectionBecauseIsInvalid NH2.ProtocolError
return ()
CancelSession_SIC -> do
liftIO $ do
H.mapM_
(\ (_, thread_id) -> do
throwTo thread_id StreamCancelledException
return ()
)
stream2workerthread
return ()
InternalAbort_SIC -> do
closeConnectionBecauseIsInvalid NH2.InternalError
return ()
MiddleFrame_SIC frame | Just (stream_id, bytes) <- isAboutHeaders frame -> do
opens_stream <- appendHeaderFragmentBlock stream_id bytes
if opens_stream
then do
maybe_rcv_headers_of <- liftIO $ takeMVar receiving_headers_mvar
case maybe_rcv_headers_of of
Just _ -> do
closeConnectionBecauseIsInvalid NH2.ProtocolError
Nothing -> do
liftIO $ putMVar receiving_headers_mvar (Just stream_id)
last_good_stream <- liftIO $ takeMVar last_good_stream_mvar
if (odd stream_id ) && (stream_id > last_good_stream)
then do
liftIO $ putMVar last_good_stream_mvar (stream_id)
else do
closeConnectionBecauseIsInvalid NH2.ProtocolError
else do
maybe_rcv_headers_of <- liftIO $ takeMVar receiving_headers_mvar
case maybe_rcv_headers_of of
Just a_stream_id | a_stream_id == stream_id -> do
liftIO $ putMVar receiving_headers_mvar maybe_rcv_headers_of
Nothing -> error "InternalError, this should be set"
if frameEndsHeaders frame then
do
liftIO $ modifyMVar_
receiving_headers_mvar
(\ _ -> return Nothing )
headers_arrived_time <- liftIO $ getTime Monotonic
let for_worker_thread = set streamId stream_id for_worker_thread_uns
headers_bytes <- getHeaderBytes stream_id
dyn_table <- liftIO $ takeMVar decode_headers_table_mvar
(new_table, header_list ) <- liftIO $ HP.decodeHeader dyn_table headers_bytes
#if LOGIT_SWITCH_TIMINGS
let
(Just path) = He.fetchHeader header_list ":path"
liftIO $ logit $ (pack . show $ stream_id ) `mappend` " -> " `mappend` path
#endif
liftIO $ do
H.delete stream_request_headers stream_id
putMVar decode_headers_table_mvar new_table
let
headers_editor = He.fromList header_list
maybe_good_headers_editor <- validateIncomingHeaders headers_editor
good_headers <- case maybe_good_headers_editor of
Just yes_they_are_good -> return yes_they_are_good
Nothing -> closeConnectionBecauseIsInvalid NH2.ProtocolError
headers_extra_good <- addExtraHeaders good_headers
let
header_list_after = He.toList headers_extra_good
post_data_source <- if not (frameEndsStream frame)
then do
mechanism <- createMechanismForStream stream_id
let source = postDataSourceFromMechanism mechanism
return $ Just source
else do
return Nothing
let
perception = Perception {
_startedTime_Pr = headers_arrived_time,
_streamId_Pr = stream_id,
_sessionId_Pr = current_session_id
}
request = Request {
_headers_RQ = header_list_after,
_inputData_RQ = post_data_source,
_perception_RQ = perception
}
liftIO $ do
thread_id <- forkIO $ E.catch
(runReaderT
(workerThread
request
coherent_worker)
for_worker_thread
)
(
( \ _ -> do
writeChan session_input InternalAbort_SIC
)
:: HTTP500PrecursorException -> IO ()
)
H.insert stream2workerthread stream_id thread_id
return ()
else
return ()
continue
MiddleFrame_SIC frame@(NH2.Frame _ (NH2.RSTStreamFrame _error_code_id)) -> do
let stream_id = streamIdFromFrame frame
liftIO $ do
cancelled_streams <- takeMVar cancelled_streams_mvar
putMVar cancelled_streams_mvar $ NS.insert stream_id cancelled_streams
maybe_thread_id <- H.lookup stream2workerthread stream_id
case maybe_thread_id of
Nothing ->
error "InterruptingUnexistentStream"
Just thread_id -> do
throwTo thread_id StreamCancelledException
continue
MiddleFrame_SIC frame@(NH2.Frame (NH2.FrameHeader _ _ nh2_stream_id) (NH2.DataFrame somebytes))
-> unlessReceivingHeaders $ do
let stream_id = nh2_stream_id
streamWorkerSendData stream_id somebytes
sendOutFrame
(NH2.EncodeInfo
NH2.defaultFlags
nh2_stream_id
Nothing
)
(NH2.WindowUpdateFrame
(fromIntegral (B.length somebytes))
)
sendOutFrame
(NH2.EncodeInfo
NH2.defaultFlags
0
Nothing
)
(NH2.WindowUpdateFrame
(fromIntegral (B.length somebytes))
)
if frameEndsStream frame
then do
closePostDataSource stream_id
else
return ()
continue
MiddleFrame_SIC (NH2.Frame (NH2.FrameHeader _ flags _) (NH2.PingFrame _)) | NH2.testAck flags-> do
continue
MiddleFrame_SIC (NH2.Frame (NH2.FrameHeader _ _ _) (NH2.PingFrame somebytes)) -> do
sendOutFrame
(NH2.EncodeInfo
(NH2.setAck NH2.defaultFlags)
0
Nothing
)
(NH2.PingFrame somebytes)
continue
MiddleFrame_SIC (NH2.Frame frame_header (NH2.SettingsFrame _)) | isSettingsAck frame_header -> do
continue
MiddleFrame_SIC (NH2.Frame _ (NH2.SettingsFrame settings_list)) -> do
handleSettingsFrame settings_list
continue
MiddleFrame_SIC somethingelse -> unlessReceivingHeaders $ do
continue
where
continue = sessionInputThread
handleSettingsFrame :: NH2.SettingsList -> ReaderT SessionData IO ()
handleSettingsFrame _settings_list = do
let
enable_push = lookup NH2.SettingsEnablePush _settings_list
session_settings_mvar <- view sessionSettings
case enable_push of
Just 1 -> liftIO $ modifyMVar_ session_settings_mvar
$ \ old_settings -> return ( pushEnabled .~ True $ old_settings)
Just 0 -> liftIO $ modifyMVar_ session_settings_mvar
$ \ old_settings -> return ( pushEnabled .~ False $ old_settings)
Just _ -> closeConnectionBecauseIsInvalid NH2.ProtocolError
Nothing -> return ()
sendOutFrame
(NH2.EncodeInfo
(NH2.setAck NH2.defaultFlags)
0
Nothing
)
(NH2.SettingsFrame [])
sendOutFrame :: NH2.EncodeInfo -> NH2.FramePayload -> ReaderT SessionData IO ()
sendOutFrame encode_info payload = do
session_output_mvar <- view sessionOutput
session_output <- liftIO $ takeMVar session_output_mvar
liftIO $ writeChan session_output $ Right (
encode_info,
payload,
error "sendOutFrameNotFor")
liftIO $ putMVar session_output_mvar session_output
addExtraHeaders :: He.HeaderEditor -> ReaderT SessionData IO He.HeaderEditor
addExtraHeaders headers_editor = do
let
enriched_lens = (sessionsContext . sessionsConfig .sessionsEnrichedHeaders )
protocol_lens = He.headerLens "second-transfer-eh--used-protocol"
add_used_protocol <- view (enriched_lens . addUsedProtocol )
let
he1 = if add_used_protocol
then set protocol_lens (Just "HTTP/2") headers_editor
else headers_editor
if add_used_protocol
then return he1
else return headers_editor
validateIncomingHeaders :: He.HeaderEditor -> ReaderT SessionData IO (Maybe He.HeaderEditor)
validateIncomingHeaders headers_editor = do
let
h1 = He.replaceHostByAuthority headers_editor
headers_are_lowercase = He.headersAreLowercaseAtHeaderEditor headers_editor
maybe_authority = h1 ^. (He.headerLens ":authority")
maybe_method = h1 ^. (He.headerLens ":method")
maybe_scheme = h1 ^. (He.headerLens ":scheme")
maybe_path = h1 ^. (He.headerLens ":path")
if
(isJust maybe_authority) &&
(isJust maybe_method) &&
(isJust maybe_scheme) &&
(isJust maybe_path )
then
return (Just h1)
else
return Nothing
closeConnectionBecauseIsInvalid :: NH2.ErrorCodeId -> ReaderT SessionData IO a
closeConnectionBecauseIsInvalid error_code = do
last_good_stream_mvar <- view lastGoodStream
last_good_stream <- liftIO $ takeMVar last_good_stream_mvar
session_output_mvar <- view sessionOutput
stream2workerthread <- view stream2WorkerThread
sendOutFrame
(NH2.EncodeInfo
NH2.defaultFlags
0
Nothing
)
(NH2.GoAwayFrame
last_good_stream
error_code
""
)
liftIO $ do
H.mapM_
( \(_stream_id, thread_id) ->
throwTo thread_id StreamCancelledException
)
stream2workerthread
session_output <- takeMVar session_output_mvar
writeChan session_output $ Left CancelSession_SOC
putMVar session_output_mvar session_output
E.throw HTTP2ProtocolException
frameEndsStream :: InputFrame -> Bool
frameEndsStream (NH2.Frame (NH2.FrameHeader _ flags _) _) = NH2.testEndStream flags
unlessReceivingHeaders :: ReaderT SessionData IO a -> ReaderT SessionData IO a
unlessReceivingHeaders comp = do
receiving_headers_mvar <- view receivingHeaders
maybe_recv_headers <- liftIO $ readMVar receiving_headers_mvar
if isJust maybe_recv_headers
then
closeConnectionBecauseIsInvalid NH2.ProtocolError
else
comp
createMechanismForStream :: GlobalStreamId -> ReaderT SessionData IO PostInputMechanism
createMechanismForStream stream_id = do
(chan, source) <- liftIO $ unfoldChannelAndSource
stream2postinputmechanism <- view stream2PostInputMechanism
let pim = PostInputMechanism (chan, source)
liftIO $ H.insert stream2postinputmechanism stream_id pim
return pim
closePostDataSource :: GlobalStreamId -> ReaderT SessionData IO ()
closePostDataSource stream_id = do
stream2postinputmechanism <- view stream2PostInputMechanism
pim_maybe <- liftIO $ H.lookup stream2postinputmechanism stream_id
case pim_maybe of
Just (PostInputMechanism (chan, _)) ->
liftIO $ putMVar chan Nothing
Nothing ->
error "Internal error/closePostDataSource"
streamWorkerSendData :: Int -> B.ByteString -> ReaderT SessionData IO ()
streamWorkerSendData stream_id bytes = do
s2pim <- view stream2PostInputMechanism
pim_maybe <- liftIO $ H.lookup s2pim stream_id
case pim_maybe of
Just pim ->
sendBytesToPim pim bytes
Nothing ->
error "Internal error"
sendBytesToPim :: PostInputMechanism -> B.ByteString -> ReaderT SessionData IO ()
sendBytesToPim (PostInputMechanism (chan, _)) bytes =
liftIO $ putMVar chan (Just bytes)
postDataSourceFromMechanism :: PostInputMechanism -> InputDataStream
postDataSourceFromMechanism (PostInputMechanism (_, source)) = source
isSettingsAck :: NH2.FrameHeader -> Bool
isSettingsAck (NH2.FrameHeader _ flags _) =
NH2.testAck flags
isStreamCancelled :: GlobalStreamId -> WorkerMonad Bool
isStreamCancelled stream_id = do
cancelled_streams_mvar <- view streamsCancelled_WTE
cancelled_streams <- liftIO $ readMVar cancelled_streams_mvar
return $ NS.member stream_id cancelled_streams
sendPrimitive500Error :: IO TupledPrincipalStream
sendPrimitive500Error =
return (
[
(":status", "500")
],
[],
do
yield "Internal server error\n"
return []
)
workerThread :: Request -> AwareWorker -> WorkerMonad ()
workerThread req aware_worker =
do
headers_output <- view headersOutput
stream_id <- view streamId
session_settings_mvar <- view sessionSettings_WTE
session_settings <- liftIO $ readMVar session_settings_mvar
next_push_stream_mvar <- view nextPushStream_WTE
#if LOGIT_SWITCH_TIMINGS
liftIO . logit $ "worker-thread " `mappend` (pack . show $ stream_id)
#endif
principal_stream <-
liftIO $ E.catch
(
aware_worker req
)
(
const $ tupledPrincipalStreamToPrincipalStream <$> sendPrimitive500Error
:: HTTP500PrecursorException -> IO PrincipalStream
)
let
headers = principal_stream ^. headers_PS
data_and_conclusion = principal_stream ^. dataAndConclusion_PS
effects = principal_stream ^. effect_PS
pushed_streams = principal_stream ^. pushedStreams_PS
can_push = session_settings ^. pushEnabled
data_promises <- if can_push then do
forM pushed_streams $ \ pushed_stream_comp -> do
pushed_stream <- liftIO pushed_stream_comp
let
request_headers = pushed_stream ^. requestHeaders_Psh
response_headers = pushed_stream ^. responseHeaders_Psh
pushed_data_and_conclusion = pushed_stream ^. dataAndConclusion_Psh
child_stream_id <- liftIO $ modifyMVar next_push_stream_mvar
$ (\ x -> return (x+2,x) )
liftIO . writeChan headers_output . PushPromise_HM $
(stream_id, child_stream_id, request_headers, effects)
return (child_stream_id, response_headers, pushed_data_and_conclusion, effects)
else
return []
headers_sent <- liftIO newEmptyMVar
liftIO $ writeChan headers_output $ NormalResponse_HM (stream_id, headers_sent, headers, effects)
is_stream_cancelled <- isStreamCancelled stream_id
unless is_stream_cancelled $ do
(_maybe_footers, _) <- runConduit $
transPipe liftIO data_and_conclusion
`fuseBothMaybe`
sendDataOfStream stream_id headers_sent effects
forM_ data_promises
$ \ (child_stream_id, response_headers, pushed_data_and_conclusion, effects) -> do
environment <- ask
let
action = pusherThread
child_stream_id
response_headers
pushed_data_and_conclusion
effects
liftIO . forkIO $ runReaderT action environment
return ()
pusherThread :: GlobalStreamId -> Headers -> DataAndConclusion -> Effect -> WorkerMonad ()
pusherThread child_stream_id response_headers pushed_data_and_conclusion effects =
do
headers_output <- view headersOutput
session_settings_mvar <- view sessionSettings_WTE
session_settings <- liftIO $ readMVar session_settings_mvar
headers_sent <- liftIO newEmptyMVar
liftIO . writeChan headers_output
$ NormalResponse_HM (child_stream_id, headers_sent, response_headers, effects)
is_stream_cancelled <- isStreamCancelled child_stream_id
unless is_stream_cancelled $ do
(_maybe_footers, _) <- runConduit $
transPipe liftIO pushed_data_and_conclusion
`fuseBothMaybe`
sendDataOfStream child_stream_id headers_sent effects
return ()
sendDataOfStream :: GlobalStreamId -> MVar HeadersSent -> Effect -> Sink B.ByteString (ReaderT WorkerThreadEnvironment IO) ()
sendDataOfStream stream_id headers_sent effect = do
data_output <- view dataOutput
liftIO $ takeMVar headers_sent
consumer data_output
where
consumer data_output = do
maybe_bytes <- await
case maybe_bytes of
Nothing ->
liftIO $ putMVar data_output (stream_id, Nothing, effect)
Just bytes -> do
liftIO $ do
putMVar data_output (stream_id, Just bytes, effect)
consumer data_output
appendHeaderFragmentBlock :: GlobalStreamId -> B.ByteString -> ReaderT SessionData IO Bool
appendHeaderFragmentBlock global_stream_id bytes = do
ht <- view stream2HeaderBlockFragment
maybe_old_block <- liftIO $ H.lookup ht global_stream_id
(new_block, new_stream) <- case maybe_old_block of
Nothing -> do
return $ (Bu.byteString bytes, True)
Just something ->
return $ (something `mappend` (Bu.byteString bytes), False)
liftIO $ H.insert ht global_stream_id new_block
return new_stream
getHeaderBytes :: GlobalStreamId -> ReaderT SessionData IO B.ByteString
getHeaderBytes global_stream_id = do
ht <- view stream2HeaderBlockFragment
Just bytes <- liftIO $ H.lookup ht global_stream_id
return $ Bl.toStrict $ Bu.toLazyByteString bytes
isAboutHeaders :: InputFrame -> Maybe (GlobalStreamId, B.ByteString)
isAboutHeaders (NH2.Frame (NH2.FrameHeader _ _ stream_id) ( NH2.HeadersFrame _ block_fragment ) )
= Just (stream_id, block_fragment)
isAboutHeaders (NH2.Frame (NH2.FrameHeader _ _ stream_id) ( NH2.ContinuationFrame block_fragment) )
= Just (stream_id, block_fragment)
isAboutHeaders _
= Nothing
getHeadersPriority :: InputFrame -> Maybe NH2.Priority
getHeadersPriority (NH2.Frame _ ( NH2.HeadersFrame prio _ ) ) = prio
getHeadersPriority _ = Nothing
frameEndsHeaders :: InputFrame -> Bool
frameEndsHeaders (NH2.Frame (NH2.FrameHeader _ flags _) _) = NH2.testEndHeader flags
streamIdFromFrame :: InputFrame -> GlobalStreamId
streamIdFromFrame (NH2.Frame (NH2.FrameHeader _ _ stream_id) _) = stream_id
headersOutputThread :: Chan HeaderOutputMessage
-> MVar SessionOutputChannelAbstraction
-> ReaderT SessionData IO ()
headersOutputThread input_chan session_output_mvar = forever $ do
use_chunk_length <- view $ sessionsContext . sessionsConfig . dataFrameSize
header_output_request <- liftIO $ readChan input_chan
case header_output_request of
NormalResponse_HM (stream_id, headers_ready_mvar, headers, effect) -> do
encode_dyn_table_mvar <- view toEncodeHeaders
encode_dyn_table <- liftIO $ takeMVar encode_dyn_table_mvar
(new_dyn_table, data_to_send ) <- liftIO $ HP.encodeHeader HP.defaultEncodeStrategy encode_dyn_table headers
liftIO $ putMVar encode_dyn_table_mvar new_dyn_table
let
bs_chunks = bytestringChunk use_chunk_length data_to_send
liftIO $ bs_chunks `deepseq` E.bracket
(takeMVar session_output_mvar)
(putMVar session_output_mvar )
(\ session_output -> do
writeIndividualHeaderFrames session_output stream_id bs_chunks True effect
putMVar headers_ready_mvar HeadersSent
)
PushPromise_HM (parent_stream_id, child_stream_id, promise_headers, effect) -> do
encode_dyn_table_mvar <- view toEncodeHeaders
encode_dyn_table <- liftIO $ takeMVar encode_dyn_table_mvar
(new_dyn_table, data_to_send ) <- liftIO $ HP.encodeHeader HP.defaultEncodeStrategy encode_dyn_table promise_headers
liftIO $ putMVar encode_dyn_table_mvar new_dyn_table
bs_chunks <- return $! bytestringChunk use_chunk_length data_to_send
liftIO $ E.bracket
(takeMVar session_output_mvar)
(putMVar session_output_mvar )
(\ session_output -> do
writePushPromiseFrames session_output parent_stream_id child_stream_id bs_chunks True effect
)
where
writeIndividualHeaderFrames ::
SessionOutputChannelAbstraction
-> GlobalStreamId
-> [B.ByteString]
-> Bool
-> Effect
-> IO ()
writeIndividualHeaderFrames session_output stream_id (last_fragment:[]) is_first effect =
writeChan session_output $ Right ( NH2.EncodeInfo {
NH2.encodeFlags = NH2.setEndHeader NH2.defaultFlags
,NH2.encodeStreamId = stream_id
,NH2.encodePadding = Nothing },
(if is_first then NH2.HeadersFrame Nothing last_fragment else NH2.ContinuationFrame last_fragment),
effect
)
writeIndividualHeaderFrames session_output stream_id (fragment:xs) is_first effect = do
writeChan session_output $ Right ( NH2.EncodeInfo {
NH2.encodeFlags = NH2.defaultFlags
,NH2.encodeStreamId = stream_id
,NH2.encodePadding = Nothing },
(if is_first then NH2.HeadersFrame Nothing fragment else NH2.ContinuationFrame fragment),
effect
)
writeIndividualHeaderFrames session_output stream_id xs False effect
writePushPromiseFrames ::
SessionOutputChannelAbstraction
-> GlobalStreamId
-> GlobalStreamId
-> [B.ByteString]
-> Bool
-> Effect
-> IO ()
writePushPromiseFrames session_output parent_stream_id child_stream_id (last_fragment:[]) is_first effect =
writeChan session_output $ Right ( NH2.EncodeInfo {
NH2.encodeFlags = NH2.setEndHeader NH2.defaultFlags
,NH2.encodeStreamId = parent_stream_id
,NH2.encodePadding = Nothing },
(if is_first
then NH2.PushPromiseFrame child_stream_id last_fragment
else NH2.ContinuationFrame last_fragment
),
effect
)
writePushPromiseFrames session_output parent_stream_id child_stream_id (fragment:xs) is_first effect = do
writeChan session_output $ Right ( NH2.EncodeInfo {
NH2.encodeFlags = NH2.defaultFlags
,NH2.encodeStreamId = parent_stream_id
,NH2.encodePadding = Nothing },
(if is_first
then NH2.PushPromiseFrame child_stream_id fragment
else NH2.ContinuationFrame fragment
),
effect
)
writePushPromiseFrames session_output parent_stream_id child_stream_id xs False effect
bytestringChunk :: Int -> B.ByteString -> [B.ByteString]
bytestringChunk len s | (B.length s) < len = [ s ]
bytestringChunk len s = h:(bytestringChunk len xs)
where
(h, xs) = B.splitAt len s
dataOutputThread :: Int
-> MVar DataOutputToConveyor
-> MVar SessionOutputChannelAbstraction
-> IO ()
dataOutputThread use_chunk_length input_chan session_output_mvar = forever $ do
(stream_id, maybe_contents, effect) <- takeMVar input_chan
case maybe_contents of
Nothing -> do
liftIO $ do
withLockedSessionOutput
(\ session_output -> do
writeChan session_output $ Right ( NH2.EncodeInfo {
NH2.encodeFlags = NH2.setEndStream NH2.defaultFlags
,NH2.encodeStreamId = stream_id
,NH2.encodePadding = Nothing },
NH2.DataFrame "",
effect
)
writeChan session_output $ Left . FinishStream_SOC $ stream_id
)
Just contents -> do
let bs_chunks = bytestringChunk use_chunk_length contents
bs_chunks `deepseq` writeContinuations bs_chunks stream_id effect
return ()
where
withLockedSessionOutput = E.bracket
(takeMVar session_output_mvar)
(putMVar session_output_mvar)
writeContinuations :: [B.ByteString] -> GlobalStreamId -> Effect -> IO ()
writeContinuations fragments stream_id effect =
mapM_ (\ fragment ->
withLockedSessionOutput
(\ session_output -> writeChan session_output $ Right (
NH2.EncodeInfo {
NH2.encodeFlags = NH2.defaultFlags
,NH2.encodeStreamId = stream_id
,NH2.encodePadding = Nothing
},
NH2.DataFrame fragment,
effect )
)
)
fragments