# 1 "hs-src/SecondTransfer/Http2/Framer.cpphs"
# 1 "<command-line>"
# 12 "<command-line>"
# 1 "/usr/include/stdc-predef.h" 1 3 4
# 17 "/usr/include/stdc-predef.h" 3 4
# 12 "<command-line>" 2
# 1 "./dist/build/autogen/cabal_macros.h" 1
# 12 "<command-line>" 2
# 1 "hs-src/SecondTransfer/Http2/Framer.cpphs"
module SecondTransfer.Http2.Framer (
BadPrefaceException,
wrapSession,
http2FrameLength,
closeAction
) where
import Control.Concurrent
import Control.Exception
import qualified Control.Exception as E
import Control.Lens (view)
import qualified Control.Lens as L
import Control.Monad.IO.Class (liftIO)
import qualified Control.Monad.Catch as C
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Reader
import Data.Binary (decode)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import Data.Conduit
import Data.Foldable (find)
import qualified Network.HTTP2 as NH2
import System.Log.Logger
import qualified Data.HashTable.IO as H
import SecondTransfer.Http2.Session
import SecondTransfer.MainLoop.CoherentWorker (CoherentWorker)
import qualified SecondTransfer.MainLoop.Framer as F
import SecondTransfer.MainLoop.PushPullType (Attendant, CloseAction,
PullAction, PushAction, IOProblem)
import SecondTransfer.Utils (Word24, word24ToInt)
import SecondTransfer.Exception
# 1 "macros/Logging.cpphs" 1
# 49 "hs-src/SecondTransfer/Http2/Framer.cpphs" 2
http2PrefixLength :: Int
http2PrefixLength = B.length NH2.connectionPreface
type HashTable k v = H.CuckooHashTable k v
type GlobalStreamId = Int
data FlowControlCommand =
AddBytes_FCM Int
type Stream2AvailSpace = HashTable GlobalStreamId (Chan FlowControlCommand)
data CanOutput = CanOutput
data NoHeadersInChannel = NoHeadersInChannel
data FramerSessionData = FramerSessionData {
_stream2flow :: Stream2AvailSpace
, _stream2outputBytes :: HashTable GlobalStreamId (Chan LB.ByteString)
, _defaultStreamWindow :: MVar Int
, _canOutput :: MVar CanOutput
, _outputIsForbidden :: MVar Bool
, _noHeadersInChannel :: MVar NoHeadersInChannel
, _pushAction :: PushAction
, _closeAction :: CloseAction
, _sessionId :: Int
, _sessionsContext :: SessionsContext
, _lastStream :: MVar Int
}
L.makeLenses ''FramerSessionData
type FramerSession = ReaderT FramerSessionData IO
wrapSession :: CoherentWorker -> SessionsContext -> Attendant
wrapSession coherent_worker sessions_context push_action pull_action close_action = do
let
session_id_mvar = view nextSessionId sessions_context
new_session_id <- modifyMVarMasked
session_id_mvar
(\ session_id -> return (session_id+1, session_id))
(session_input, session_output) <- (http2Session
coherent_worker new_session_id sessions_context)
s2f <- H.new
s2o <- H.new
default_stream_size_mvar <- newMVar 65536
can_output <- newMVar CanOutput
no_headers_in_channel <- newMVar NoHeadersInChannel
last_stream_id <- newMVar 0
output_is_forbidden <- newMVar False
let framer_session_data = FramerSessionData {
_stream2flow = s2f
,_stream2outputBytes = s2o
,_defaultStreamWindow = default_stream_size_mvar
,_canOutput = can_output
,_noHeadersInChannel = no_headers_in_channel
,_pushAction = push_action
,_closeAction = close_action
,_sessionId = new_session_id
,_sessionsContext = sessions_context
,_lastStream = last_stream_id
,_outputIsForbidden = output_is_forbidden
}
let
close_on_error session_id session_context comp =
E.finally (
E.catch
comp
(exc_handler session_id session_context)
)
close_action
exc_handler :: Int -> SessionsContext -> FramerException -> IO ()
exc_handler x y e = do
modifyMVar_ output_is_forbidden (\ _ -> return True)
sessionExceptionHandler Framer_SessionComponent x y e
forkIO
$ close_on_error new_session_id sessions_context
$ runReaderT (inputGatherer pull_action session_input ) framer_session_data
forkIO
$ close_on_error new_session_id sessions_context
$ runReaderT (outputGatherer session_output ) framer_session_data
return ()
http2FrameLength :: F.LengthCallback
http2FrameLength bs | (B.length bs) >= 3 = let
word24 = decode input_as_lbs :: Word24
input_as_lbs = LB.fromStrict bs
in
Just $ (word24ToInt word24) + 9
http2FrameLength _ = Nothing
addCapacity ::
GlobalStreamId ->
Int ->
FramerSession ()
addCapacity stream_id delta_cap = do
if stream_id == 0
then
return ()
else do
table <- view stream2flow
val <- liftIO $ H.lookup table stream_id
case val of
Nothing -> do
liftIO $ putStrLn $ "Tried to update window of unexistent stream (creating): " ++ (show stream_id)
(_, command_chan) <- startStreamOutputQueue stream_id
liftIO $ writeChan command_chan $ AddBytes_FCM delta_cap
Just command_chan -> do
liftIO $ writeChan command_chan $ AddBytes_FCM delta_cap
inputGatherer :: PullAction -> SessionInput -> FramerSession ()
inputGatherer pull_action session_input = do
(return ())
(prefix, remaining) <- liftIO $ F.readLength http2PrefixLength pull_action
if prefix /= NH2.connectionPreface
then do
sendGoAwayFrame NH2.ProtocolError
liftIO $ do
(return ())
throwIO BadPrefaceException
else
(return ())
let
source::Source FramerSession B.ByteString
source = transPipe liftIO $ F.readNextChunk http2FrameLength remaining pull_action
( source $$ consume)
where
consume :: Sink B.ByteString FramerSession ()
consume = do
maybe_bytes <- await
case maybe_bytes of
Just bytes -> do
let
error_or_frame = NH2.decodeFrame some_settings bytes
some_settings = NH2.defaultSettings
case error_or_frame of
Left _ -> do
liftIO $ errorM "HTTP2.Framer" "CouldNotDecodeFrame"
lift $ sendGoAwayFrame NH2.ProtocolError
liftIO $ sendCommandToSession session_input CancelSession_SIC
lift $ releaseFramer
Right right_frame -> do
case right_frame of
(NH2.Frame (NH2.FrameHeader _ _ stream_id) (NH2.WindowUpdateFrame credit) ) -> do
lift $ addCapacity (NH2.fromStreamIdentifier stream_id) (fromIntegral credit)
return ()
frame@(NH2.Frame _ (NH2.SettingsFrame settings_list) ) -> do
case find (\(i,_) -> i == NH2.SettingsInitialWindowSize) settings_list of
Just (_, new_default_stream_size) -> do
old_default_stream_size_mvar <- view defaultStreamWindow
old_default_stream_size <- liftIO $ takeMVar old_default_stream_size_mvar
let general_delta = new_default_stream_size old_default_stream_size
stream_to_flow <- view stream2flow
liftIO $
H.mapM_ (
\ (k,v) -> if k /=0
then writeChan v (AddBytes_FCM general_delta)
else return () )
stream_to_flow
liftIO $ putMVar old_default_stream_size_mvar new_default_stream_size
Nothing ->
return ()
liftIO $ sendFrameToSession session_input frame
a_frame@(NH2.Frame (NH2.FrameHeader _ _ stream_id) _ ) -> do
lift $ updateLastStream $ NH2.fromStreamIdentifier stream_id
liftIO $ sendFrameToSession session_input a_frame
consume
Nothing ->
return ()
outputGatherer :: SessionOutput -> FramerSession ()
outputGatherer session_output = do
pushFrame
(NH2.EncodeInfo NH2.defaultFlags (NH2.toStreamIdentifier 0) Nothing)
(NH2.SettingsFrame [])
loopPart
where
dataForFrame p1 p2 =
LB.fromStrict $ NH2.encodeFrame p1 p2
loopPart :: FramerSession ()
loopPart = do
command_or_frame <- liftIO $ getFrameFromSession session_output
case command_or_frame of
Left CancelSession_SOC -> do
releaseFramer
Right ( p1@(NH2.EncodeInfo _ stream_idii _), p2@(NH2.DataFrame _) ) -> do
let stream_id = NH2.fromStreamIdentifier stream_idii
s2o <- view stream2outputBytes
lookup_result <- liftIO $ H.lookup s2o stream_id
stream_bytes_chan <- case lookup_result of
Nothing -> do
(bc, _) <- startStreamOutputQueue stream_id
return bc
Just bytes_chan -> return bytes_chan
liftIO $ writeChan stream_bytes_chan $ dataForFrame p1 p2
loopPart
Right (p1, p2@(NH2.HeadersFrame _ _) ) -> do
handleHeadersOfStream p1 p2
loopPart
Right (p1, p2@(NH2.ContinuationFrame _) ) -> do
handleHeadersOfStream p1 p2
loopPart
Right (p1, p2) -> do
no_headers <- view noHeadersInChannel
liftIO $ takeMVar no_headers
pushFrame p1 p2
liftIO $ putMVar no_headers NoHeadersInChannel
loopPart
updateLastStream :: GlobalStreamId -> FramerSession ()
updateLastStream stream_id = do
last_stream_id_mvar <- view lastStream
liftIO $ modifyMVar_ last_stream_id_mvar (\ x -> return $ max x stream_id)
startStreamOutputQueue :: Int -> FramerSession (Chan LB.ByteString, Chan FlowControlCommand)
startStreamOutputQueue stream_id = do
bytes_chan <- liftIO newChan
command_chan <- liftIO newChan
s2o <- view stream2outputBytes
liftIO $ H.insert s2o stream_id bytes_chan
s2c <- view stream2flow
liftIO $ H.insert s2c stream_id command_chan
initial_cap_mvar <- view defaultStreamWindow
initial_cap <- liftIO $ readMVar initial_cap_mvar
close_action <- view closeAction
sessions_context <- view sessionsContext
session_id' <- view SecondTransfer.Http2.Framer.sessionId
output_is_forbidden_mvar <- view outputIsForbidden
let
close_on_error session_id session_context comp =
E.finally
(E.catch
comp
(exc_handler session_id session_context)
)
close_action
exc_handler :: Int -> SessionsContext -> IOProblem -> IO ()
exc_handler x y e = do
modifyMVar_ output_is_forbidden_mvar ( \ _ -> return True)
sessionExceptionHandler Framer_SessionComponent x y e
read_state <- ask
liftIO $ forkIO $ close_on_error session_id' sessions_context $ runReaderT
(flowControlOutput stream_id initial_cap "" command_chan bytes_chan)
read_state
return (bytes_chan , command_chan)
handleHeadersOfStream :: NH2.EncodeInfo -> NH2.FramePayload -> FramerSession ()
handleHeadersOfStream p1@(NH2.EncodeInfo _ _ _) frame_payload
| (frameIsHeaderOfStream frame_payload) && (not $ frameEndsHeaders p1 frame_payload) = do
no_headers <- view noHeadersInChannel
liftIO $ takeMVar no_headers
pushFrame p1 frame_payload
| (frameIsHeaderOfStream frame_payload) && (frameEndsHeaders p1 frame_payload) = do
no_headers <- view noHeadersInChannel
liftIO $ takeMVar no_headers
pushFrame p1 frame_payload
liftIO $ putMVar no_headers NoHeadersInChannel
| frameEndsHeaders p1 frame_payload = do
no_headers <- view noHeadersInChannel
liftIO $ putMVar no_headers NoHeadersInChannel
return ()
frameIsHeaderOfStream :: NH2.FramePayload -> Bool
frameIsHeaderOfStream (NH2.HeadersFrame _ _ )
= True
frameIsHeaderOfStream _
= False
frameEndsHeaders :: NH2.EncodeInfo -> NH2.FramePayload -> Bool
frameEndsHeaders (NH2.EncodeInfo flags _ _) (NH2.HeadersFrame _ _) = NH2.testEndHeader flags
frameEndsHeaders (NH2.EncodeInfo flags _ _) (NH2.ContinuationFrame _) = NH2.testEndHeader flags
frameEndsHeaders _ _ = False
pushFrame :: NH2.EncodeInfo
-> NH2.FramePayload -> FramerSession ()
pushFrame p1 p2 = do
let bs = LB.fromStrict $ NH2.encodeFrame p1 p2
sendBytes bs
sendGoAwayFrame :: NH2.ErrorCodeId -> FramerSession ()
sendGoAwayFrame error_code = do
last_stream_id_mvar <- view lastStream
last_stream_id <- liftIO $ readMVar last_stream_id_mvar
pushFrame (NH2.EncodeInfo NH2.defaultFlags (NH2.toStreamIdentifier 0) Nothing)
(NH2.GoAwayFrame (NH2.toStreamIdentifier last_stream_id) error_code "")
sendBytes :: LB.ByteString -> FramerSession ()
sendBytes bs = do
push_action <- view pushAction
can_output <- view canOutput
liftIO $ do
bs `seq`
(C.bracket
(takeMVar can_output)
(\ _ -> push_action bs)
(\ c -> putMVar can_output c)
)
flowControlOutput :: Int -> Int -> LB.ByteString -> (Chan FlowControlCommand) -> (Chan LB.ByteString) -> FramerSession ()
flowControlOutput stream_id capacity leftovers commands_chan bytes_chan = do
if leftovers == ""
then do
bytes_to_send <- liftIO $ readChan bytes_chan
flowControlOutput stream_id capacity bytes_to_send commands_chan bytes_chan
else do
let amount = fromIntegral $ ((LB.length leftovers) 9)
if amount <= capacity
then do
no_headers <- view noHeadersInChannel
C.bracket
(liftIO $ takeMVar no_headers)
(\ _ -> liftIO $ putMVar no_headers NoHeadersInChannel)
(\ _ -> sendBytes leftovers )
flowControlOutput stream_id (capacity amount) "" commands_chan bytes_chan
else do
command <- liftIO $ readChan commands_chan
case command of
AddBytes_FCM delta_cap -> do
flowControlOutput stream_id (capacity + delta_cap) leftovers commands_chan bytes_chan
releaseFramer :: FramerSession ()
releaseFramer = do
return ()