module SecondTransfer.Http2.Framer (
BadPrefaceException,
wrapSession,
http2FrameLength,
closeAction
) where
import Control.Concurrent
import qualified Control.Concurrent.MSem as MS
import Control.Concurrent.STM.TMVar
import qualified Control.Concurrent.STM as STM
import Control.Exception
import qualified Control.Exception as E
import Control.Lens (view)
import qualified Control.Lens as L
import Control.Monad (unless, when)
import Control.Monad.IO.Class (liftIO)
import qualified Control.Monad.Catch as C
import Control.Monad.Trans.Class (lift)
import Control.DeepSeq (($!!))
import Control.Monad.Trans.Reader
import Data.Binary (decode)
import qualified Data.ByteString as B
import Data.ByteString.Char8 (pack)
import qualified Data.ByteString.Lazy as LB
import qualified Data.ByteString.Builder as Bu
import Data.Conduit
import Data.Foldable (find)
import qualified Data.PQueue.Min as PQ
import qualified Network.HTTP2 as NH2
import System.Log.Logger
import qualified Data.HashTable.IO as H
import SecondTransfer.Sessions.Internal (sessionExceptionHandler, nextSessionId, SessionsContext)
import SecondTransfer.Http2.Session
import SecondTransfer.MainLoop.CoherentWorker (CoherentWorker)
import qualified SecondTransfer.MainLoop.Framer as F
import SecondTransfer.MainLoop.PushPullType (Attendant, CloseAction,
PullAction, PushAction)
import SecondTransfer.Utils (Word24, word24ToInt)
import SecondTransfer.Exception
import SecondTransfer.MainLoop.Logging (logWithExclusivity, logit)
#include "Logging.cpphs"
-- | Number of bytes in the HTTP/2 connection preface, measured directly on
--   @NH2.connectionPreface@. The input gatherer reads exactly this many bytes
--   before it starts decoding frames.
http2PrefixLength :: Int
http2PrefixLength = B.length NH2.connectionPreface
-- | Mutable hash table flavour used for the per-stream tables of this module.
type HashTable k v = H.CuckooHashTable k v
-- | Stream identifier as a plain @Int@. Stream 0 gets special treatment in
--   this module: no output queue is ever created for it and capacity updates
--   addressed to it are accepted without further action.
type GlobalStreamId = Int
-- | Messages delivered to the flow-control thread of a single stream.
data FlowControlCommand
    = AddBytes_FCM Int
      -- ^ Adjust the send capacity of the stream by this many bytes. The
      --   amount can be negative when a SETTINGS frame shrinks the window.
    | Finish_FCM
      -- ^ Declared here, but never constructed by the code visible in this file.
-- | Table from stream id to the slot through which flow-control commands are
--   handed to the thread that throttles the output of that stream.
type Stream2AvailSpace = HashTable GlobalStreamId (MVar FlowControlCommand)
-- | Token without payload. It lives in the @_canOutput@ MVar and is taken by
--   @sendBytes@.
data CanOutput = CanOutput
-- | Token without payload kept in the @_noHeadersInChannel@ MVar. It is taken
--   when a HEADERS frame is about to be written and put back once the frame
--   carrying the end-of-headers flag has been written; every other writer
--   takes it around its own write.
data NoHeadersInChannel = NoHeadersInChannel
-- | Initial value of the semaphore that @withPrioritySend@ waits on before
--   enqueueing and that @sendReordering@ signals after dequeueing, i.e. the
--   number of packets that may sit in the priority queue at the same time.
maxPacketsInQueue :: Int
maxPacketsInQueue = 1024
-- | An encoded frame waiting in the send queue. The first component is the
--   sorting key, built by @withPrioritySend@ as (stream id, ordinal of the
--   packet inside its stream); the second one is the bytes to write.
newtype PrioPacket = PrioPacket ((Int, Int), LB.ByteString)
    deriving Show

-- | Two packets are equal when their keys are equal; payloads are ignored.
instance Eq PrioPacket where
    PrioPacket (key_a, _) == PrioPacket (key_b, _) = key_a == key_b

-- | Packets are ordered by key alone.
instance Ord PrioPacket where
    PrioPacket (key_a, _) `compare` PrioPacket (key_b, _) = key_a `compare` key_b
-- | State shared between the threads that enqueue packets for sending and the
--   thread that writes them out.
data PrioritySendState = PrioritySendState
    { _semToSend :: MS.MSem Int
      -- ^ Waited on before each enqueue, signalled after each dequeue
    , _prioQ     :: TMVar (PQ.MinQueue PrioPacket)
      -- ^ Packets not yet written, smallest key first
    }
-- | Environment shared by all the framer threads that serve one connection.
--   Lenses for every field are generated right below the declaration.
data FramerSessionData = FramerSessionData
    { _stream2flow         :: MVar Stream2AvailSpace
      -- ^ Per-stream slots used to deliver flow-control commands
    , _stream2outputBytes  :: MVar ( HashTable GlobalStreamId (MVar LB.ByteString) )
      -- ^ Per-stream slots where encoded DATA frames are handed over
    , _defaultStreamWindow :: MVar Int
      -- ^ Capacity given to a stream when its output queue is started
    , _canOutput           :: MVar CanOutput
      -- ^ Token taken by @sendBytes@
    , _outputIsForbidden   :: MVar Bool
      -- ^ Set to @True@ by the exception handlers; not read in this file
    , _noHeadersInChannel  :: MVar NoHeadersInChannel
      -- ^ Token that is missing while a header block is being written
    , _pushAction          :: PushAction
      -- ^ Writes bytes to the peer
    , _closeAction         :: CloseAction
      -- ^ Run when a framer thread terminates
    , _sessionId           :: Int
      -- ^ Identifier handed out from the sessions context counter
    , _sessionsContext     :: SessionsContext
    , _lastStream          :: MVar Int
      -- ^ Highest stream id seen so far on incoming frames
    , _prioritySendState   :: PrioritySendState
      -- ^ Queue and semaphore used to reorder outgoing DATA frames
    }

L.makeLenses ''FramerSessionData
-- | Monad in which the framer code runs: @IO@ with read-only access to the
--   shared @FramerSessionData@.
type FramerSession = ReaderT FramerSessionData IO
-- | Builds an @Attendant@ that speaks HTTP/2 framing on top of the given
--   worker.
--
--   For each connection it allocates a fresh session id, starts the session
--   with @http2Session@, creates the shared framer state and forks three
--   threads: one that reads and decodes incoming frames, one that forwards the
--   output of the session, and one that drains the priority queue of DATA
--   frames. The attendant itself returns as soon as the threads are forked.
--
--   Every forked thread runs under @close_on_error@, therefore @close_action@
--   is invoked whenever any one of them finishes, normally or not.
wrapSession :: CoherentWorker -> SessionsContext -> Attendant
wrapSession coherent_worker sessions_context push_action pull_action close_action = do
    let
        session_id_mvar = view nextSessionId sessions_context

    -- Post-increment of the shared counter: this session keeps the old value.
    new_session_id <- modifyMVarMasked
        session_id_mvar $
        \ session_id -> return (session_id + 1, session_id)

    (session_input, session_output) <- http2Session
        coherent_worker
        new_session_id
        sessions_context

    s2f <- H.new
    stream2flow_mvar <- newMVar s2f
    s2o <- H.new
    stream2output_bytes_mvar <- newMVar s2o

    -- RFC 7540, section 6.9.2: the initial flow-control window of a stream is
    -- 65,535 octets. Starting from 65536 would allow one octet too many to be
    -- sent before the first WINDOW_UPDATE arrives.
    default_stream_size_mvar <- newMVar 65535

    can_output <- newMVar CanOutput
    no_headers_in_channel <- newMVar NoHeadersInChannel
    last_stream_id <- newMVar 0
    output_is_forbidden <- newMVar False
    prio_mvar <- STM.atomically $ newTMVar PQ.empty
    sem_to_send <- MS.new maxPacketsInQueue

    let framer_session_data = FramerSessionData {
        _stream2flow          = stream2flow_mvar
        ,_stream2outputBytes  = stream2output_bytes_mvar
        ,_defaultStreamWindow = default_stream_size_mvar
        ,_canOutput           = can_output
        ,_noHeadersInChannel  = no_headers_in_channel
        ,_pushAction          = push_action
        ,_closeAction         = close_action
        ,_sessionId           = new_session_id
        ,_sessionsContext     = sessions_context
        ,_lastStream          = last_stream_id
        ,_outputIsForbidden   = output_is_forbidden
        ,_prioritySendState   = PrioritySendState {
            _semToSend = sem_to_send,
            _prioQ = prio_mvar
            }
        }

    let
        -- Runs comp, routes framer exceptions and I/O problems to their
        -- handlers, and always finishes by closing the connection.
        close_on_error session_id session_context comp =
            E.finally (
                E.catch
                    (
                        E.catch
                            comp
                            (exc_handler session_id session_context)
                    )
                    (io_exc_handler session_id session_context)
                )
                close_action

        -- Framer-level failure: forbid further output and report it.
        exc_handler :: Int -> SessionsContext -> FramerException -> IO ()
        exc_handler x y e = do
            modifyMVar_ output_is_forbidden (\ _ -> return True)
            INSTRUMENTATION( errorM "HTTP2.Framer" "Exception went up" )
            sessionExceptionHandler Framer_HTTP2SessionComponent x y e

        -- I/O failure: only forbid further output, nothing is reported.
        io_exc_handler :: Int -> SessionsContext -> IOProblem -> IO ()
        io_exc_handler _x _y _e =
            modifyMVar_ output_is_forbidden (\ _ -> return True)

    _ <- forkIO
        $ close_on_error new_session_id sessions_context
        $ runReaderT (inputGatherer pull_action session_input ) framer_session_data
    _ <- forkIO
        $ close_on_error new_session_id sessions_context
        $ runReaderT (outputGatherer session_output ) framer_session_data
    _ <- forkIO
        $ close_on_error new_session_id sessions_context
        $ runReaderT sendReordering framer_session_data

    return ()
-- | Length callback for the generic chunk reader. Once at least three bytes
--   are available, the leading bytes are decoded as a @Word24@ and the total
--   size of the frame is reported as that value plus 9, the size of an HTTP/2
--   frame header. With fewer than three bytes the answer is @Nothing@.
http2FrameLength :: F.LengthCallback
http2FrameLength bs =
    if B.length bs < 3
      then Nothing
      else Just (payload_length + 9)
  where
    -- Only evaluated in the branch where enough bytes are present.
    payload_length = word24ToInt (decode (LB.fromStrict bs) :: Word24)
-- | Hands a capacity increment to the flow-control thread of a stream.
--
--   Returns @False@ only when the stream has no entry in the flow table and
--   its id is greater than the highest id seen so far; in every other case the
--   result is @True@. Stream 0 is accepted without touching any table.
--
--   The command is delivered with @putMVar@, so the call blocks while a
--   previously delivered command is still waiting in the slot.
addCapacity ::
        GlobalStreamId ->
        Int ->
        FramerSession Bool
addCapacity 0 _ = return True
addCapacity stream_id delta_cap = do
    table_mvar <- view stream2flow
    maybe_slot <- liftIO . withMVar table_mvar $ \ table -> H.lookup table stream_id
    last_stream <- view lastStream >>= liftIO . readMVar
    case maybe_slot of
        Just command_chan -> liftIO $ do
            putMVar command_chan (AddBytes_FCM delta_cap)
            return True
        -- Unknown stream: tolerated as long as its id is not beyond the last
        -- one seen.
        Nothing -> return (stream_id <= last_stream)
-- | Removes the entries of a stream from both per-stream tables, first the
--   flow-control one and then the output-bytes one. A table that has no entry
--   for the stream is left untouched.
finishFlowControlForStream :: GlobalStreamId -> FramerSession ()
finishFlowControlForStream stream_id = do
    view stream2flow >>= forget
    view stream2outputBytes >>= forget
  where
    -- Look the stream up under the table lock and delete it when present.
    forget :: MVar (HashTable GlobalStreamId v) -> FramerSession ()
    forget table_mvar =
        liftIO . withMVar table_mvar $ \ table ->
            H.lookup table stream_id
                >>= maybe (return ()) (const $ H.delete table stream_id)
-- | Body of the thread that reads from the peer.
--
--   It first reads and checks the connection preface; on a mismatch a GOAWAY
--   with @ProtocolError@ is sent and @BadPrefaceException@ is thrown. After
--   that, the input is cut into frames with @http2FrameLength@ and every frame
--   is decoded and dispatched:
--
--   * WINDOW_UPDATE frames are consumed here and turned into capacity for the
--     corresponding stream; they are not forwarded to the session.
--
--   * SETTINGS frames that carry an initial window size update the default
--     window and propagate the difference to every stream that already has a
--     flow-control slot; the frame is then forwarded to the session.
--
--   * Any other frame makes sure that an output queue exists for its stream,
--     updates the highest stream id seen and is forwarded to the session.
--
--   The function returns when the input is exhausted or after a frame fails
--   to decode.
inputGatherer :: PullAction -> SessionInput -> FramerSession ()
inputGatherer pull_action session_input = do
    (prefix, remaining) <- liftIO $ F.readLength http2PrefixLength pull_action
    if prefix /= NH2.connectionPreface
      then do
        sendGoAwayFrame NH2.ProtocolError
        liftIO $
            throwIO BadPrefaceException
      else
        INSTRUMENTATION( debugM "HTTP2.Framer" "Prologue validated" )
    let
        source :: Source FramerSession B.ByteString
        source = transPipe liftIO $ F.readNextChunk http2FrameLength remaining pull_action
    source $$ consume True
  where
    -- The very first frame goes through a dedicated entry point of the session.
    sendToSession :: Bool -> InputFrame -> IO ()
    sendToSession starting frame =
        if starting
          then
            sendFirstFrameToSession session_input frame
          else
            sendMiddleFrameToSession session_input frame

    -- Tell the peer (GOAWAY) and the session (cancel command) that we give up.
    abortSession :: Sink B.ByteString FramerSession ()
    abortSession =
        lift $ do
            sendGoAwayFrame NH2.ProtocolError
            liftIO $ sendCommandToSession session_input CancelSession_SIC
            releaseFramer

    consume_continue = consume False

    consume :: Bool -> Sink B.ByteString FramerSession ()
    consume starting = do
        maybe_bytes <- await
        case maybe_bytes of

            Just bytes -> do
                let
                    error_or_frame = NH2.decodeFrame some_settings bytes
                    some_settings = NH2.defaultSettings

                case error_or_frame of

                    Left _ -> do
                        INSTRUMENTATION( errorM "HTTP2.Framer" "CouldNotDecodeFrame" )
                        abortSession

                    Right right_frame -> do
                        case right_frame of

                            (NH2.Frame (NH2.FrameHeader _ _ stream_id) (NH2.WindowUpdateFrame credit) ) -> do
                                succeeded <- lift $ addCapacity (NH2.fromStreamIdentifier stream_id) (fromIntegral credit)
                                unless succeeded abortSession

                            frame@(NH2.Frame _ (NH2.SettingsFrame settings_list) ) -> do
                                case find (\(i,_) -> i == NH2.SettingsInitialWindowSize) settings_list of

                                    Just (_, new_default_stream_size) -> do
                                        default_window_mvar <- view defaultStreamWindow
                                        stream_to_flow <- view stream2flow
                                        -- modifyMVar_ puts the old value back if anything
                                        -- below throws, so the MVar is never left empty.
                                        liftIO . modifyMVar_ default_window_mvar $ \ old_default_stream_size -> do
                                            -- Streams already open get the difference between
                                            -- the new and the old default; it may be negative.
                                            let general_delta = new_default_stream_size - old_default_stream_size
                                            withMVar stream_to_flow $ \ stream_to_flow' ->
                                                H.mapM_ (\ (k,v) ->
                                                            when (k /= 0) $ putMVar v (AddBytes_FCM $! general_delta)
                                                        )
                                                        stream_to_flow'
                                            return $! new_default_stream_size

                                    Nothing ->
                                        return ()

                                liftIO $ sendToSession starting $! frame

                            a_frame@(NH2.Frame (NH2.FrameHeader _ _ stream_id) _ ) -> do
                                lift . startStreamOutputQueueIfNotExists $ NH2.fromStreamIdentifier stream_id
                                lift . updateLastStream $ NH2.fromStreamIdentifier stream_id
                                liftIO $ sendToSession starting a_frame

                        -- NOTE(review): the indentation of the original was lost; the
                        -- loop is continued only after a successfully decoded frame,
                        -- so a decode failure ends the sink. Confirm against history.
                        consume_continue

            Nothing ->
                return ()
-- | Body of the thread that forwards what the session produces.
--
--   It starts by writing an empty SETTINGS frame on stream 0 and then loops
--   over the output of the session:
--
--   * a cancel command ends the loop;
--
--   * a finish-stream command is ignored;
--
--   * DATA frames are encoded and handed to the slot of their stream, where
--     the flow-control thread picks them up; a missing slot is a fatal error;
--
--   * HEADERS and CONTINUATION frames go through @handleHeadersOfStream@;
--
--   * any other frame is written directly while holding the headers token.
outputGatherer :: SessionOutput -> FramerSession ()
outputGatherer session_output =
    pushFrame initial_settings_info (NH2.SettingsFrame []) >> relay
  where
    initial_settings_info =
        NH2.EncodeInfo NH2.defaultFlags (NH2.toStreamIdentifier 0) Nothing

    relay :: FramerSession ()
    relay = do
        next_item <- liftIO $ getFrameFromSession session_output
        case next_item of

            Left CancelSession_SOC -> do
                INSTRUMENTATION( debugM "HTTP2.Framer" "CancelSession_SOC processed")
                releaseFramer

            Left (FinishStream_SOC _) ->
                relay

            Right (info@(NH2.EncodeInfo _ wire_stream_id _), payload@(NH2.DataFrame _)) -> do
                slots_mvar <- view stream2outputBytes
                maybe_slot <- liftIO . withMVar slots_mvar $ \ slots ->
                    H.lookup slots (NH2.fromStreamIdentifier wire_stream_id)
                stream_slot <- maybe
                    (error "It is the end of the world at Framer.hs")
                    return
                    maybe_slot
                -- Blocks until the flow-control thread has taken the previous frame.
                liftIO $ putMVar stream_slot $! LB.fromStrict (NH2.encodeFrame info payload)
                relay

            Right (info, payload@(NH2.HeadersFrame _ _)) ->
                handleHeadersOfStream info payload >> relay

            Right (info, payload@(NH2.ContinuationFrame _)) ->
                handleHeadersOfStream info payload >> relay

            Right (info, payload) -> do
                headers_token_mvar <- view noHeadersInChannel
                _ <- liftIO $ takeMVar headers_token_mvar
                pushFrame info payload
                liftIO $ putMVar headers_token_mvar NoHeadersInChannel
                relay
-- | Raises the recorded highest stream id to the given one when that one is
--   larger; otherwise the record stays as it is.
updateLastStream :: GlobalStreamId -> FramerSession ()
updateLastStream stream_id =
    view lastStream >>= \ last_stream_id_mvar ->
        liftIO . modifyMVar_ last_stream_id_mvar $ return . max stream_id
-- | Starts the output machinery of a stream unless the flow-control table
--   already has an entry for it. Nothing is ever started for stream 0.
startStreamOutputQueueIfNotExists :: GlobalStreamId -> FramerSession ()
startStreamOutputQueueIfNotExists stream_id = do
    table_mvar <- view stream2flow
    existing <- liftIO . withMVar table_mvar $ \ table -> H.lookup table stream_id
    let
        is_missing = maybe True (const False) existing
    when (is_missing && stream_id /= 0) $
        startStreamOutputQueue stream_id >> return ()
-- | Creates the two slots of a stream (encoded bytes and flow-control
--   commands), registers them in the shared tables and forks the thread that
--   runs @flowControlOutput@ for the stream, seeded with the current default
--   window. Both slots are returned.
--
--   The forked thread reports I/O problems through @sessionExceptionHandler@
--   after forbidding further output, and runs the close action of the
--   connection when it terminates for whatever reason.
startStreamOutputQueue :: Int -> FramerSession (MVar LB.ByteString, MVar FlowControlCommand)
startStreamOutputQueue stream_id = do
    env <- ask
    bytes_chan <- liftIO newEmptyMVar
    command_chan <- liftIO newEmptyMVar
    liftIO $ do
        withMVar (view stream2outputBytes env) $ \ s2o -> H.insert s2o stream_id bytes_chan
        withMVar (view stream2flow env) $ \ s2c -> H.insert s2c stream_id command_chan
    initial_cap <- liftIO . readMVar $ view defaultStreamWindow env
    let
        close_action = view closeAction env

        on_io_problem :: IOProblem -> IO ()
        on_io_problem e = do
            modifyMVar_ (view outputIsForbidden env) (const $ return True)
            sessionExceptionHandler
                Framer_HTTP2SessionComponent
                (view SecondTransfer.Http2.Framer.sessionId env)
                (view sessionsContext env)
                e

        flow_thread =
            runReaderT
                (flowControlOutput stream_id initial_cap 0 "" command_chan bytes_chan)
                env
    _ <- liftIO . forkIO $
        E.finally (E.catch flow_thread on_io_problem) close_action
    return (bytes_chan, command_chan)
-- | Writes a HEADERS or CONTINUATION frame while keeping header blocks
--   contiguous on the wire.
--
--   The headers token is taken before the write when the frame is a HEADERS
--   frame, and put back after the write when the frame carries the
--   end-of-headers flag. A frame that does neither is simply written.
handleHeadersOfStream :: NH2.EncodeInfo -> NH2.FramePayload -> FramerSession ()
handleHeadersOfStream p1@(NH2.EncodeInfo {}) frame_payload = do
    no_headers <- view noHeadersInChannel
    when opens_block $ do
        _ <- liftIO $ takeMVar no_headers
        return ()
    pushFrame p1 frame_payload
    when closes_block . liftIO $ putMVar no_headers NoHeadersInChannel
  where
    opens_block  = frameIsHeadersAndOpensStream frame_payload
    closes_block = frameEndsHeaders p1 frame_payload
-- | @True@ exactly for HEADERS payloads, @False@ for everything else.
frameIsHeadersAndOpensStream :: NH2.FramePayload -> Bool
frameIsHeadersAndOpensStream payload =
    case payload of
        NH2.HeadersFrame _ _ -> True
        _                    -> False
-- | Tells whether a frame closes a header block: it has to be a HEADERS or
--   CONTINUATION payload whose flags pass @NH2.testEndHeader@. Any other
--   payload yields @False@ regardless of its flags.
frameEndsHeaders :: NH2.EncodeInfo -> NH2.FramePayload -> Bool
frameEndsHeaders info payload =
    case (info, payload) of
        (NH2.EncodeInfo flags _ _, NH2.HeadersFrame _ _)    -> NH2.testEndHeader flags
        (NH2.EncodeInfo flags _ _, NH2.ContinuationFrame _) -> NH2.testEndHeader flags
        _                                                   -> False
-- | Encodes a frame with @NH2.encodeFrame@ and writes the resulting bytes
--   through @sendBytes@.
pushFrame :: NH2.EncodeInfo
             -> NH2.FramePayload -> FramerSession ()
pushFrame p1 = sendBytes . LB.fromStrict . NH2.encodeFrame p1
-- | Writes a GOAWAY frame on stream 0 carrying the given error code, the
--   highest stream id seen so far and an empty debug payload.
sendGoAwayFrame :: NH2.ErrorCodeId -> FramerSession ()
sendGoAwayFrame error_code = do
    last_stream_id <- view lastStream >>= liftIO . readMVar
    let
        connection_info =
            NH2.EncodeInfo NH2.defaultFlags (NH2.toStreamIdentifier 0) Nothing
        goaway_payload =
            NH2.GoAwayFrame (NH2.toStreamIdentifier last_stream_id) error_code ""
    pushFrame connection_info goaway_payload
-- | Writes a chunk of bytes to the peer, one writer at a time.
--
--   The output token is taken before calling the push action and put back
--   afterwards, also when the push action throws. The argument order of
--   @C.bracket@ is acquire, release, use: the push action must therefore be
--   the third argument, otherwise the token would be returned before the
--   write takes place and concurrent writers could interleave their bytes.
--
--   The byte string is forced to weak head normal form before the token is
--   requested.
sendBytes :: LB.ByteString -> FramerSession ()
sendBytes bs = do
    push_action <- view pushAction
    can_output <- view canOutput
    liftIO $
        bs `seq`
            C.bracket
                (takeMVar can_output)
                (putMVar can_output)
                (const $ push_action bs)
-- | Loop of the thread that throttles the DATA frames of one stream.
--
--   Arguments, in order: the stream id, the send capacity left in bytes, the
--   ordinal of the next packet of this stream, an encoded frame that is still
--   waiting to be sent (empty when there is none), the slot where
--   flow-control commands arrive and the slot where encoded frames arrive.
--
--   With nothing pending, the loop blocks until a frame arrives. A pending
--   frame is enqueued for sending as soon as its payload fits in the
--   capacity; the 9 bytes of frame header are not charged against the
--   window. Otherwise the loop blocks until a command arrives: @AddBytes_FCM@
--   adjusts the capacity and the frame is tried again, @Finish_FCM@ ends the
--   loop and drops the pending frame.
flowControlOutput :: Int -> Int -> Int -> LB.ByteString -> MVar FlowControlCommand -> MVar LB.ByteString -> FramerSession ()
flowControlOutput stream_id capacity ordinal leftovers commands_chan bytes_chan =
    if LB.null leftovers
      then do
        bytes_to_send <- liftIO $ takeMVar bytes_chan
        flowControlOutput stream_id capacity ordinal bytes_to_send commands_chan bytes_chan
      else do
        -- Only the payload counts for flow control.
        let amount = fromIntegral (LB.length leftovers - 9)
        if amount <= capacity
          then do
            withPrioritySend stream_id ordinal leftovers
            flowControlOutput stream_id (capacity - amount) (ordinal + 1) LB.empty commands_chan bytes_chan
          else do
            command <- liftIO $ takeMVar commands_chan
            case command of
                AddBytes_FCM delta_cap ->
                    flowControlOutput stream_id (capacity + delta_cap) ordinal leftovers commands_chan bytes_chan
                -- Without this branch the match is partial and the thread
                -- would die with a pattern-match failure.
                Finish_FCM ->
                    return ()
-- | Hook invoked when the framer is done with a session. It currently
--   performs no action at all.
releaseFramer :: FramerSession ()
releaseFramer = return ()
-- | Enqueues an encoded frame for sending, keyed by (stream id, ordinal).
--   The call first waits on the queue semaphore, so it blocks while the queue
--   already holds as many packets as the semaphore allows.
withPrioritySend :: GlobalStreamId -> Int -> LB.ByteString -> FramerSession ()
withPrioritySend stream_id packet_ordinal datum = do
    send_state <- view prioritySendState
    let
        slots      = _semToSend send_state
        queue_var  = _prioQ send_state
        new_record = PrioPacket $!! ((stream_id, packet_ordinal), datum)
    liftIO $ do
        MS.wait slots
        STM.atomically $
            takeTMVar queue_var >>= putTMVar queue_var . PQ.insert new_record
-- | Endless loop of the thread that drains the priority queue. Each round
--   waits until the queue is not empty, removes the packet with the smallest
--   key, signals the queue semaphore and writes the packet while holding the
--   headers token, which is put back even when the write throws.
sendReordering :: FramerSession ()
sendReordering = do
    send_state <- view prioritySendState
    headers_token <- view noHeadersInChannel
    let
        queue_var = _prioQ send_state

        -- Retries the transaction for as long as the queue is empty.
        pop_smallest :: STM.STM PrioPacket
        pop_smallest = do
            queue <- takeTMVar queue_var
            when (PQ.null queue) STM.retry
            let
                (smallest, rest) = PQ.deleteFindMin queue
            putTMVar queue_var rest
            return smallest
    PrioPacket (_, datum) <- liftIO $ STM.atomically pop_smallest
    liftIO . MS.signal $ _semToSend send_state
    C.bracket
        (liftIO $ takeMVar headers_token)
        (const . liftIO $ putMVar headers_token NoHeadersInChannel)
        (const $ sendBytes datum)
    sendReordering