module SecondTransfer.Http2.Framer (
BadPrefaceException,
wrapSession,
http2FrameLength,
closeAction
) where
import Control.Concurrent hiding (yield)
import qualified Control.Concurrent as CC(yield)
import qualified Control.Concurrent.MSem as MS
import Control.Concurrent.STM.TMVar
import qualified Control.Concurrent.STM as STM
import Control.Exception
import qualified Control.Exception as E
import Control.Lens (view, (^.) )
import qualified Control.Lens as L
import Control.Monad (unless, when, replicateM_)
import Control.Monad.IO.Class (liftIO)
import qualified Control.Monad.Catch as C
import Control.Monad.Trans.Class (lift)
import Control.DeepSeq (($!!))
import Control.Monad.Trans.Reader
import Data.Binary (decode)
import qualified Data.ByteString as B
import Data.ByteString.Char8 (pack)
import qualified Data.ByteString.Lazy as LB
import qualified Data.ByteString.Builder as Bu
import Data.Conduit
import Data.Foldable (find)
import qualified Data.PQueue.Min as PQ
import Data.Maybe (fromMaybe)
import qualified Network.HTTP2 as NH2
import System.Log.Logger
import qualified Data.HashTable.IO as H
import System.Clock (Clock(..),getTime)
import SecondTransfer.Sessions.Internal (
sessionExceptionHandler,
nextSessionId,
sessionsConfig,
SessionsContext)
import SecondTransfer.Sessions.Config
import SecondTransfer.Http2.Session
import SecondTransfer.MainLoop.CoherentWorker (AwareWorker, fragmentDeliveryCallback_Ef, priorityEffect_Ef)
import qualified SecondTransfer.MainLoop.Framer as F
import SecondTransfer.MainLoop.PushPullType
import SecondTransfer.Utils (Word24, word24ToInt)
import SecondTransfer.Exception
import SecondTransfer.MainLoop.Logging (logWithExclusivity, logit)
import Debug.Trace (trace)
#include "Logging.cpphs"
-- | Length, in bytes, of the HTTP/2 connection preface exposed by the
--   http2 package. 'inputGatherer' pulls exactly this many bytes before
--   it starts decoding frames.
http2PrefixLength :: Int
http2PrefixLength = B.length NH2.connectionPreface
-- | Mutable hash table flavour used for the per-stream bookkeeping tables.
type HashTable k v = H.CuckooHashTable k v
-- | Key used to identify a stream in the bookkeeping tables of this module.
type GlobalStreamId = Int
-- | Commands delivered to the thread that applies flow control to the
--   output of one stream.
data FlowControlCommand
    = AddBytes_FCM Int   -- ^ Change the available capacity by this many bytes.
    | Finish_FCM         -- ^ Declared here; no code in this module sends it.
-- | Maps each stream to the MVar through which flow-control commands are
--   handed to the output thread of that stream.
type Stream2AvailSpace = HashTable GlobalStreamId (MVar FlowControlCommand)
-- | Token kept in an MVar; 'sendBytes' takes it around a push.
data CanOutput = CanOutput
-- | Token kept in an MVar; it is taken while a header sequence (or any other
--   frame that must not be interleaved with one) is being written.
data NoHeadersInChannel = NoHeadersInChannel
-- | Initial value of the semaphore that 'withPrioritySend' waits on before
--   inserting a packet and that 'sendReordering' signals after removing one,
--   which bounds how many packets sit in the priority queue.
maxPacketsInQueue :: Int
maxPacketsInQueue = 32
-- | A chunk of already encoded output together with its ordering key.
--   The key is built by 'withPrioritySend' as
--   @(priority, stream id, packet ordinal)@.
newtype PrioPacket = PrioPacket ((Int, Int, Int), LB.ByteString)
    deriving Show

-- | Equality looks only at the ordering key; the payload is ignored.
instance Eq PrioPacket where
    PrioPacket (left_key, _) == PrioPacket (right_key, _) =
        left_key == right_key

-- | Ordering looks only at the ordering key, consistently with 'Eq'.
instance Ord PrioPacket where
    PrioPacket (left_key, _) `compare` PrioPacket (right_key, _) =
        left_key `compare` right_key
-- | Shared state of the priority-ordered sender.
data PrioritySendState = PrioritySendState
    { _semToSend :: MS.MSem Int
      -- ^ Waited on before a packet is queued, signalled after one is removed.
    , _prioQ     :: TMVar (PQ.MinQueue PrioPacket)
      -- ^ Packets waiting to be written, smallest key first.
    }
-- | Everything the framer threads of one session share; it is the
--   environment of the 'FramerSession' monad.
data FramerSessionData = FramerSessionData
    { _stream2flow         :: MVar Stream2AvailSpace
      -- ^ Per-stream MVars used to deliver flow-control commands.
    , _stream2outputBytes  :: MVar ( HashTable GlobalStreamId (MVar LB.ByteString, MVar Int) )
      -- ^ Per-stream pair: MVar carrying encoded data frames, and a frame
      --   ordinal counter used when reporting deliveries.
    , _defaultStreamWindow :: MVar Int
      -- ^ Capacity handed to the output thread of a newly started stream.
    , _canOutput           :: MVar CanOutput
      -- ^ Token taken by 'sendBytes' around a push.
    , _outputIsForbidden   :: MVar Bool
      -- ^ Set to True by the exception handlers installed in this module.
    , _noHeadersInChannel  :: MVar NoHeadersInChannel
      -- ^ Token taken while a header sequence is on the wire.
    , _pushAction          :: PushAction
      -- ^ Writes bytes to the peer.
    , _closeAction         :: CloseAction
      -- ^ Run when one of the framer threads finishes.
    , _sessionIdAtFramer   :: Int
      -- ^ Identifier allocated for this session.
    , _sessionsContext     :: SessionsContext
      -- ^ Context shared by all sessions.
    , _lastStream          :: MVar Int
      -- ^ Largest stream id recorded by 'updateLastStream'.
    , _prioritySendState   :: PrioritySendState
      -- ^ Queue and semaphore of the priority-ordered sender.
    }

L.makeLenses ''FramerSessionData

-- | Monad in which the framer code runs.
type FramerSession = ReaderT FramerSessionData IO
-- | Builds an 'Attendant' that speaks HTTP/2 framing on top of a session
--   created with 'http2Session'.
--
--   For each connection it allocates a session id, creates the shared
--   'FramerSessionData' and forks three threads: one running
--   'inputGatherer', one running 'outputGatherer' and one running
--   'sendReordering'. Each thread runs the close action of the connection
--   when it ends, whether normally or through an exception.
wrapSession :: AwareWorker -> SessionsContext -> Attendant
wrapSession aware_worker sessions_context attendant_callbacks = do
    let
        session_id_mvar = view nextSessionId sessions_context
        push_action     = attendant_callbacks ^. pushAction_AtC
        pull_action     = attendant_callbacks ^. pullAction_AtC
        close_action    = attendant_callbacks ^. closeAction_AtC

    -- Hand out the current counter value and leave the next one behind.
    new_session_id <- modifyMVarMasked
        session_id_mvar $
        \ session_id -> return (session_id + 1, session_id)

    (session_input, session_output) <- http2Session
        aware_worker
        new_session_id
        sessions_context

    s2f <- H.new
    stream2flow_mvar <- newMVar s2f
    s2o <- H.new
    stream2output_bytes_mvar <- newMVar s2o
    -- The HTTP/2 default for SETTINGS_INITIAL_WINDOW_SIZE is 2^16-1 octets;
    -- starting one byte higher would let a stream overrun the peer window.
    default_stream_size_mvar <- newMVar 65535
    can_output <- newMVar CanOutput
    no_headers_in_channel <- newMVar NoHeadersInChannel
    last_stream_id <- newMVar 0
    output_is_forbidden <- newMVar False
    prio_mvar <- STM.atomically $ newTMVar PQ.empty
    sem_to_send <- MS.new maxPacketsInQueue

    let framer_session_data = FramerSessionData {
            _stream2flow          = stream2flow_mvar
            ,_stream2outputBytes  = stream2output_bytes_mvar
            ,_defaultStreamWindow = default_stream_size_mvar
            ,_canOutput           = can_output
            ,_noHeadersInChannel  = no_headers_in_channel
            ,_pushAction          = push_action
            ,_closeAction         = close_action
            ,_sessionIdAtFramer   = new_session_id
            ,_sessionsContext     = sessions_context
            ,_lastStream          = last_stream_id
            ,_outputIsForbidden   = output_is_forbidden
            ,_prioritySendState   = PrioritySendState {
                _semToSend = sem_to_send,
                _prioQ     = prio_mvar
                }
            }

    let
        -- Runs a computation with both handlers installed and always
        -- finishes by running the close action.
        close_on_error session_id session_context comp =
            E.finally (
                E.catch
                    (
                        E.catch
                            comp
                            (exc_handler session_id session_context)
                    )
                    (io_exc_handler session_id session_context)
                )
                close_action

        -- Framer exceptions forbid further output and are reported.
        exc_handler :: Int -> SessionsContext -> FramerException -> IO ()
        exc_handler x y e = do
            modifyMVar_ output_is_forbidden (\ _ -> return True)
            INSTRUMENTATION( errorM "HTTP2.Framer" "Exception went up" )
            sessionExceptionHandler Framer_HTTP2SessionComponent x y e

        -- I/O problems only forbid further output.
        io_exc_handler :: Int -> SessionsContext -> IOProblem -> IO ()
        io_exc_handler _x _y _e =
            modifyMVar_ output_is_forbidden (\ _ -> return True)

    _ <- forkIO
        $ close_on_error new_session_id sessions_context
        $ runReaderT (inputGatherer pull_action session_input ) framer_session_data

    _ <- forkIO
        $ close_on_error new_session_id sessions_context
        $ runReaderT (outputGatherer session_output ) framer_session_data

    _ <- forkIO
        $ close_on_error new_session_id sessions_context
        $ runReaderT sendReordering framer_session_data

    return ()
-- | Length callback for HTTP/2 frames. Once at least three bytes are
--   available, the leading 24-bit value is decoded and 9 is added to it;
--   with fewer bytes the answer is 'Nothing'.
http2FrameLength :: F.LengthCallback
http2FrameLength bs =
    if B.length bs < 3
      then Nothing
      else Just (declared_length + 9)
  where
    declared_length = word24ToInt (decode (LB.fromStrict bs) :: Word24)
-- | Forwards a window increase to the output thread of a stream.
--
--   Stream 0 is accepted without any bookkeeping. For other streams the
--   result is 'False' only when the stream has no flow-control entry and
--   its id is larger than the last stream id recorded so far.
addCapacity ::
    GlobalStreamId ->
    Int ->
    FramerSession Bool
addCapacity stream_id delta_cap
    | stream_id == 0 = return True
    | otherwise = do
        flow_table_mvar <- view stream2flow
        maybe_command_chan <- liftIO $
            withMVar flow_table_mvar (`H.lookup` stream_id)
        newest_stream <- view lastStream >>= liftIO . readMVar
        case maybe_command_chan of
            Just command_chan -> liftIO $ do
                putMVar command_chan (AddBytes_FCM delta_cap)
                return True
            -- No entry: tolerated for ids that are not newer than the
            -- last recorded stream.
            Nothing ->
                return (stream_id <= newest_stream)
-- | Removes the entries of a stream from both bookkeeping tables: first
--   the flow-control table, then the output-bytes table. Missing entries
--   are ignored.
finishFlowControlForStream :: GlobalStreamId -> FramerSession ()
finishFlowControlForStream stream_id = do
    view stream2flow        >>= liftIO . dropEntry
    view stream2outputBytes >>= liftIO . dropEntry
  where
    -- Deletes the entry of the stream if the table has one.
    dropEntry :: MVar (HashTable GlobalStreamId v) -> IO ()
    dropEntry table_mvar =
        withMVar table_mvar $ \ table ->
            H.lookup table stream_id >>=
                maybe (return ()) (const $ H.delete table stream_id)
-- | Source of decoded frames. Each round pulls a 9 byte header, then the
--   payload length announced by it, and yields the decoded frame. When the
--   payload cannot be decoded, a single 'Nothing' is yielded and the
--   source ends.
readNextFrame :: Monad m =>
    (Int -> m B.ByteString)
    -> Source m (Maybe NH2.Frame)
readNextFrame pull_action = loop
  where
    loop = do
        header_bytes <- lift (pull_action 9)
        let (type_id, header@(NH2.FrameHeader body_length _ _)) =
                NH2.decodeFrameHeader header_bytes
        body_bytes <- lift (pull_action body_length)
        case NH2.decodeFramePayload type_id header body_bytes of
            Left _ ->
                yield Nothing
            Right payload -> do
                yield (Just (NH2.Frame header payload))
                loop
-- | Reads the peer side of the connection.
--
--   The connection preface is checked first; a wrong preface sends a
--   GOAWAY frame and throws 'BadPrefaceException'. After that, frames are
--   decoded one by one:
--
--   * WINDOW_UPDATE frames are turned into extra capacity through
--     'addCapacity'; a rejected update aborts the session.
--
--   * SETTINGS frames carrying an initial window size adjust every
--     known stream by the difference with the previous default, store
--     the new default, and are then forwarded to the session.
--
--   * Any other frame updates the last seen stream id and is forwarded
--     to the session.
--
--   A frame that could not be decoded aborts the session.
inputGatherer :: PullAction -> SessionInput -> FramerSession ()
inputGatherer pull_action session_input = do
    prefix <- liftIO $ pull_action http2PrefixLength
    if prefix /= NH2.connectionPreface
      then do
        sendGoAwayFrame NH2.ProtocolError
        liftIO $
            throwIO BadPrefaceException
      else
        INSTRUMENTATION( debugM "HTTP2.Framer" "Prologue validated" )
    let
        source :: Source FramerSession (Maybe NH2.Frame)
        source = transPipe liftIO $ readNextFrame pull_action
    source $$ consume True
  where
    -- The first frame handed to the session uses a dedicated entry point.
    sendToSession :: Bool -> InputFrame -> IO ()
    sendToSession starting frame =
        if starting
          then
            sendFirstFrameToSession session_input frame
          else
            sendMiddleFrameToSession session_input frame

    -- Tells the peer and the session that the connection is over.
    abortSession :: Sink a FramerSession ()
    abortSession =
        lift $ do
            sendGoAwayFrame NH2.ProtocolError
            liftIO $ sendCommandToSession session_input CancelSession_SIC
            releaseFramer

    consume_continue = consume False

    consume :: Bool -> Sink (Maybe NH2.Frame) FramerSession ()
    consume starting = do
        maybe_maybe_frame <- await
        case maybe_maybe_frame of

            Just Nothing ->
                abortSession

            Just (Just right_frame) -> do
                case right_frame of

                    (NH2.Frame (NH2.FrameHeader _ _ stream_id) (NH2.WindowUpdateFrame credit) ) -> do
                        succeeded <- lift $ addCapacity stream_id (fromIntegral credit)
                        unless succeeded abortSession

                    frame@(NH2.Frame _ (NH2.SettingsFrame settings_list) ) -> do
                        case find (\(i,_) -> i == NH2.SettingsInitialWindowSize) settings_list of

                            Just (_, new_default_stream_size) -> do
                                default_window_mvar <- view defaultStreamWindow
                                stream_to_flow <- view stream2flow
                                -- modifyMVar_ puts the old value back if
                                -- anything below throws, so the default
                                -- window MVar is never left empty.
                                liftIO . modifyMVar_ default_window_mvar $ \ old_default_stream_size -> do
                                    -- Existing streams move by the change
                                    -- between the new and old defaults.
                                    let general_delta = new_default_stream_size - old_default_stream_size
                                    withMVar stream_to_flow $ \ stream_to_flow' ->
                                        H.mapM_ (\ (k,v) ->
                                                     when (k /= 0) $ putMVar v (AddBytes_FCM $! general_delta)
                                                )
                                                stream_to_flow'
                                    return $! new_default_stream_size

                            Nothing ->
                                return ()

                        liftIO $ sendToSession starting $! frame

                    a_frame@(NH2.Frame (NH2.FrameHeader _ _ stream_id) _ ) -> do
                        lift . updateLastStream $ stream_id
                        liftIO $ sendToSession starting a_frame

                consume_continue

            Nothing ->
                return ()
-- | Takes frames and commands coming out of the session and puts them on
--   the wire.
--
--   An empty SETTINGS frame is pushed first. After that, each item from
--   the session is handled as follows:
--
--   * A cancel command ends the loop.
--
--   * A finish-stream command is skipped.
--
--   * DATA frames are encoded and handed to the output thread of their
--     stream, which is started on demand; delivery callbacks, when
--     configured, are invoked with the ordinal of the frame.
--
--   * HEADERS, PUSH_PROMISE and CONTINUATION frames go through
--     'handleHeadersOfStream'.
--
--   * Every other frame is pushed while holding the no-headers token.
outputGatherer :: SessionOutput -> FramerSession ()
outputGatherer session_output = do
    delivery_report_callback <- view
        (sessionsContext . sessionsConfig . sessionsCallbacks . dataDeliveryCallback_SC)
    session_id <- view sessionIdAtFramer

    let
        encodeLazily info payload =
            LB.fromStrict (NH2.encodeFrame info payload)

        serveHeaders info payload = do
            handleHeadersOfStream info payload
            serveNext

        serveNext :: FramerSession ()
        serveNext = do
            next_item <- liftIO $ getFrameFromSession session_output
            case next_item of

                Left CancelSession_SOC -> do
                    INSTRUMENTATION( debugM "HTTP2.Framer" "CancelSession_SOC processed")
                    releaseFramer

                Left (FinishStream_SOC _finished_stream ) ->
                    serveNext

                Right ( info@(NH2.EncodeInfo _ stream_id _), payload@(NH2.DataFrame _), effect ) -> do
                    -- The stream id doubles as priority when none is given.
                    startStreamOutputQueueIfNotExists stream_id
                        (fromMaybe stream_id (effect ^. priorityEffect_Ef))

                    output_table_mvar <- view stream2outputBytes
                    maybe_entry <- liftIO $
                        withMVar output_table_mvar (`H.lookup` stream_id)
                    (stream_bytes_chan, frame_ordinal_mvar) <-
                        maybe
                            (error "It is the end of the world at Framer.hs")
                            return
                            maybe_entry

                    let
                        session_report  = delivery_report_callback
                        fragment_report = effect ^. fragmentDeliveryCallback_Ef

                    -- The ordinal only advances when somebody is listening.
                    case (session_report, fragment_report) of
                        (Nothing, Nothing) ->
                            return ()
                        _ -> liftIO $ do
                            when_delivered <- getTime Monotonic
                            ordinal <- modifyMVar frame_ordinal_mvar $ \ o -> return (o + 1, o)
                            maybe
                                (return ())
                                (\ report -> report session_id stream_id ordinal when_delivered)
                                session_report
                            maybe
                                (return ())
                                (\ report -> report ordinal when_delivered)
                                fragment_report

                    liftIO $ putMVar stream_bytes_chan $! encodeLazily info payload
                    serveNext

                Right (info, payload@(NH2.PushPromiseFrame _ _), _effect ) ->
                    serveHeaders info payload

                Right (info, payload@(NH2.HeadersFrame _ _), _effect ) ->
                    serveHeaders info payload

                Right (info, payload@(NH2.ContinuationFrame _), _effect ) ->
                    serveHeaders info payload

                Right (info, payload, _effect) -> do
                    headers_lock <- view noHeadersInChannel
                    liftIO $ takeMVar headers_lock
                    pushFrame info payload
                    liftIO $ putMVar headers_lock NoHeadersInChannel
                    serveNext

    pushFrame
        (NH2.EncodeInfo NH2.defaultFlags 0 Nothing)
        (NH2.SettingsFrame [])
    serveNext
-- | Raises the recorded last stream id to the given one when that is larger.
updateLastStream :: GlobalStreamId -> FramerSession ()
updateLastStream stream_id =
    view lastStream >>= \ last_stream_mvar ->
        liftIO . modifyMVar_ last_stream_mvar $ return . (`max` stream_id)
-- | Starts the output machinery of a stream unless the stream already has
--   a flow-control entry or is stream 0.
startStreamOutputQueueIfNotExists :: GlobalStreamId -> Int -> FramerSession ()
startStreamOutputQueueIfNotExists stream_id priority = do
    existing <- view stream2flow >>= \ flow_table_mvar ->
        liftIO $ withMVar flow_table_mvar (`H.lookup` stream_id)
    case existing of
        Just _ ->
            return ()
        Nothing ->
            unless (stream_id == 0) $ do
                _ <- startStreamOutputQueue stream_id priority
                return ()
-- | Registers a stream in both bookkeeping tables and forks the thread
--   that runs 'flowControlOutput' for it, starting with the current
--   default window as capacity.
--
--   The forked thread reports I/O problems through the session exception
--   handler after forbidding further output, and always runs the close
--   action when it ends.
--
--   Returns the MVar that receives encoded data frames and the MVar that
--   receives flow-control commands.
startStreamOutputQueue :: Int -> Int -> FramerSession (MVar LB.ByteString, MVar FlowControlCommand)
startStreamOutputQueue stream_id priority = do
    (bytes_chan, ordinal_mvar, command_chan) <- liftIO $ do
        fresh_bytes_chan   <- newEmptyMVar
        fresh_ordinal      <- newMVar 0
        fresh_command_chan <- newEmptyMVar
        return (fresh_bytes_chan, fresh_ordinal, fresh_command_chan)

    output_table_mvar <- view stream2outputBytes
    liftIO . withMVar output_table_mvar $ \ output_table ->
        H.insert output_table stream_id (bytes_chan, ordinal_mvar)

    flow_table_mvar <- view stream2flow
    liftIO . withMVar flow_table_mvar $ \ flow_table ->
        H.insert flow_table stream_id command_chan

    initial_window <- view defaultStreamWindow >>= liftIO . readMVar
    framer_state <- ask

    let
        release_connection = framer_state ^. closeAction
        session_context    = framer_state ^. sessionsContext
        session_id         = framer_state ^. sessionIdAtFramer

        on_io_problem :: IOProblem -> IO ()
        on_io_problem problem = do
            modifyMVar_ (framer_state ^. outputIsForbidden) (const $ return True)
            sessionExceptionHandler Framer_HTTP2SessionComponent session_id session_context problem

        stream_output =
            runReaderT
                (flowControlOutput stream_id priority initial_window 0 "" command_chan bytes_chan)
                framer_state

    _ <- liftIO . forkIO $
        (stream_output `E.catch` on_io_problem) `E.finally` release_connection

    return (bytes_chan, command_chan)
-- | Pushes a frame that belongs to a header sequence, taking and returning
--   the no-headers token so that the sequence is not interleaved with
--   other frames:
--
--   * opening frame that does not end the headers: take the token, push;
--
--   * opening frame that also ends the headers: take, push, return;
--
--   * non-opening frame that ends the headers: push, return;
--
--   * anything else: just push.
handleHeadersOfStream :: NH2.EncodeInfo -> NH2.FramePayload -> FramerSession ()
handleHeadersOfStream p1@(NH2.EncodeInfo {}) frame_payload =
    case (opens_stream, ends_headers) of
        (True, False) -> do
            _ <- takeToken
            emit
        (True, True) -> do
            _ <- takeToken
            emit
            returnToken
        (False, True) -> do
            emit
            returnToken
            return ()
        (False, False) ->
            emit
  where
    opens_stream = frameIsHeadersAndOpensStream frame_payload
    ends_headers = frameEndsHeaders p1 frame_payload

    emit = pushFrame p1 frame_payload

    takeToken =
        view noHeadersInChannel >>= liftIO . takeMVar

    returnToken =
        view noHeadersInChannel >>= \ token_mvar ->
            liftIO $ putMVar token_mvar NoHeadersInChannel
-- | True for HEADERS and PUSH_PROMISE payloads, False for everything else.
frameIsHeadersAndOpensStream :: NH2.FramePayload -> Bool
frameIsHeadersAndOpensStream payload =
    case payload of
        NH2.HeadersFrame {}     -> True
        NH2.PushPromiseFrame {} -> True
        _                       -> False
-- | For HEADERS, CONTINUATION and PUSH_PROMISE payloads, reports whether
--   the end-of-headers flag is set in the encode info; False for any other
--   payload.
frameEndsHeaders :: NH2.EncodeInfo -> NH2.FramePayload -> Bool
frameEndsHeaders encode_info payload =
    case payload of
        NH2.HeadersFrame {}      -> end_headers_flagged
        NH2.ContinuationFrame {} -> end_headers_flagged
        NH2.PushPromiseFrame {}  -> end_headers_flagged
        _                        -> False
  where
    NH2.EncodeInfo flags _ _ = encode_info
    end_headers_flagged = NH2.testEndHeader flags
-- | Encodes a frame and sends the resulting bytes with 'sendBytes'.
pushFrame :: NH2.EncodeInfo
             -> NH2.FramePayload -> FramerSession ()
pushFrame p1 p2 =
    sendBytes . LB.fromStrict $ NH2.encodeFrame p1 p2
-- | Pushes a GOAWAY frame on stream 0 carrying the last recorded stream id,
--   the given error code and an empty debug payload.
sendGoAwayFrame :: NH2.ErrorCodeId -> FramerSession ()
sendGoAwayFrame error_code =
    view lastStream >>= liftIO . readMVar >>= \ last_stream_id ->
        pushFrame
            connection_level_info
            (NH2.GoAwayFrame last_stream_id error_code "")
  where
    connection_level_info = NH2.EncodeInfo NH2.defaultFlags 0 Nothing
-- | Writes bytes to the peer with the push action of the session, holding
--   the 'CanOutput' token for the whole duration of the push so that
--   concurrent callers cannot interleave their output. The token is put
--   back even when the push throws.
sendBytes :: LB.ByteString -> FramerSession ()
sendBytes bs = do
    push_action <- view pushAction
    can_output <- view canOutput
    liftIO $
        bs `seq`
            -- Argument order of bracket is acquire, release, use: the
            -- token must be returned by the release action and the push
            -- must be the guarded action, not the other way around.
            C.bracket
                (takeMVar can_output)
                (putMVar can_output)
                (const $ push_action bs)
-- | Output loop of one stream, applying flow control.
--
--   Arguments, in order: stream id, priority, remaining capacity in bytes,
--   ordinal of the next packet, the encoded frame still waiting to be sent
--   (empty when there is none), the MVar delivering flow-control commands
--   and the MVar delivering encoded frames.
--
--   With nothing pending, the loop waits for the next encoded frame. A
--   pending frame is queued for sending as soon as its payload size (the
--   encoded length minus the 9 byte frame header) fits in the capacity,
--   and the capacity shrinks by that size. Otherwise the loop waits for a
--   command: 'AddBytes_FCM' adjusts the capacity, 'Finish_FCM' ends the
--   loop.
flowControlOutput :: Int -> Int -> Int -> Int -> LB.ByteString -> MVar FlowControlCommand -> MVar LB.ByteString -> FramerSession ()
flowControlOutput stream_id priority capacity ordinal leftovers commands_chan bytes_chan
    | LB.null leftovers = do
        bytes_to_send <- liftIO $ takeMVar bytes_chan
        flowControlOutput stream_id priority capacity ordinal bytes_to_send commands_chan bytes_chan

    | amount <= capacity = do
        withPrioritySend priority stream_id ordinal leftovers
        flowControlOutput stream_id priority (capacity - amount) (ordinal + 1) LB.empty commands_chan bytes_chan

    | otherwise = do
        -- Not enough room: block until the peer grants more window.
        command <- liftIO $ takeMVar commands_chan
        case command of
            AddBytes_FCM delta_cap ->
                flowControlOutput stream_id priority (capacity + delta_cap) ordinal leftovers commands_chan bytes_chan
            Finish_FCM ->
                return ()
  where
    -- Only the payload counts against the window, not the frame header.
    amount = fromIntegral (LB.length leftovers - 9)
-- | Hook invoked when the framer is done with a session; it currently
--   performs no action.
releaseFramer :: FramerSession ()
releaseFramer = return ()
-- | Queues a packet for the priority-ordered sender. The call first waits
--   on the semaphore of the send state and then inserts the packet, keyed
--   by @(priority, stream id, packet ordinal)@, into the shared queue.
withPrioritySend :: Int -> Int -> Int -> LB.ByteString -> FramerSession ()
withPrioritySend priority stream_id packet_ordinal datum = do
    send_state <- view prioritySendState
    let
        free_slots = _semToSend send_state
        queue_var  = _prioQ send_state
        packet     = PrioPacket ( (priority, stream_id, packet_ordinal), datum)
    liftIO $
        MS.wait free_slots >>
        STM.atomically (takeTMVar queue_var >>= putTMVar queue_var . PQ.insert packet)
-- | Endless loop of the priority-ordered sender. Each round blocks until
--   the queue is not empty, removes the packet with the smallest key,
--   signals the semaphore, writes the packet while holding the no-headers
--   token, and then yields the processor ten times before starting over.
sendReordering :: FramerSession ()
sendReordering = do
    send_state <- view prioritySendState
    headers_lock <- view noHeadersInChannel
    let
        free_slots = _semToSend send_state
        queue_var  = _prioQ send_state

        -- Retries the transaction while the queue has nothing to offer.
        popMostUrgent = do
            queue <- takeTMVar queue_var
            case PQ.minView queue of
                Nothing ->
                    STM.retry
                Just (packet, remaining) -> do
                    putTMVar queue_var remaining
                    return packet

        loop :: FramerSession ()
        loop = do
            PrioPacket (_, datum) <- liftIO $ STM.atomically popMostUrgent
            liftIO $ MS.signal free_slots
            C.bracket
                (liftIO $ takeMVar headers_lock)
                (\ _ -> liftIO $ putMVar headers_lock NoHeadersInChannel)
                (\ _ -> sendBytes datum )
            liftIO $ replicateM_ 10 CC.yield
            loop
    loop