module Network.HTTP2.Client (
newHttp2Client
, withHttp2Stream
, headers
, sendData
, Http2Client(..)
, PushPromiseHandler
, StreamDefinition(..)
, StreamStarter
, TooMuchConcurrency(..)
, StreamThread
, Http2Stream(..)
, IncomingFlowControl(..)
, OutgoingFlowControl(..)
, FlagSetter
, wrapFrameClient
, _gtfo
, module Network.HTTP2.Client.FrameConnection
, module Network.Socket
, module Network.TLS
) where
import Control.Exception (bracket, throw)
import Control.Concurrent.Async (race)
import Control.Concurrent.MVar (newMVar, takeMVar, putMVar)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.Chan (Chan, newChan, dupChan, readChan, writeChan)
import Control.Monad (forever, void, when, forM_)
import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString
import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef)
import Data.Maybe (fromMaybe)
import Data.Monoid ((<>))
import GHC.Exception (Exception)
import Network.HPACK as HPACK
import Network.HTTP2 as HTTP2
import Network.Socket (HostName, PortNumber)
import Network.TLS (ClientParams)
import Network.HTTP2.Client.FrameConnection
-- | Actions tracking the flow-control window for data received from the
-- server, either for the whole connection or for a single stream.
data IncomingFlowControl = IncomingFlowControl {
_addCredit :: WindowSize -> IO ()
-- ^ Accumulate credit; it is only handed to the peer when '_updateWindow'
-- is run.
, _consumeCredit :: WindowSize -> IO Int
-- ^ Record that the given amount was consumed and return the credit
-- remaining afterwards.
, _updateWindow :: IO Bool
-- ^ Transfer accumulated credit to the peer with a WINDOW_UPDATE frame.
-- Returns 'True' when a frame was sent.
}
-- | Actions tracking the flow-control window for data sent to the server.
data OutgoingFlowControl = OutgoingFlowControl {
_receiveCredit :: WindowSize -> IO ()
-- ^ Add the given amount to the available credit.
, _withdrawCredit :: WindowSize -> IO WindowSize
-- ^ Ask for up to the given amount of credit and return the amount
-- actually granted. Asking for zero returns zero immediately; otherwise the
-- action waits until some credit is available.
}
-- | Description of the work to perform on one stream.
data StreamDefinition a = StreamDefinition {
_initStream :: IO StreamThread
-- ^ Run once when the stream is initialized, before the handler is
-- returned to the caller.
, _handleStream :: IncomingFlowControl -> OutgoingFlowControl -> IO a
-- ^ Main handler, given the stream-level incoming and outgoing flow
-- controls; its result is the result of the stream.
}
-- | Starts a stream from a function that builds a 'StreamDefinition' out of
-- the stream's 'Http2Stream' actions. Yields 'Left' 'TooMuchConcurrency'
-- instead of running the stream when no concurrency slot is available.
type StreamStarter a =
(Http2Stream -> StreamDefinition a) -> IO (Either TooMuchConcurrency a)
-- | Returned when a stream cannot be started because the concurrent-stream
-- limit is reached. The wrapped 'Int' is the "room needed" value computed by
-- the stream starter.
newtype TooMuchConcurrency = TooMuchConcurrency { _getStreamRoomNeeded :: Int }
deriving Show
-- | Record of the connection-level actions offered by a client.
data Http2Client = Http2Client {
_ping :: ByteString -> IO (IO (FrameHeader, FramePayload))
-- ^ Send a PING frame carrying the given payload; the returned action
-- waits for the PING frame echoing that payload.
, _settings :: SettingsList -> IO (IO (FrameHeader, FramePayload))
-- ^ Send a SETTINGS frame; the returned action waits for the SETTINGS
-- acknowledgement and then records the new client settings.
, _goaway :: ErrorCodeId -> ByteString -> IO ()
-- ^ Send a GOAWAY frame reporting the highest stream id received so far.
, _startStream :: forall a. StreamStarter a
-- ^ Start a new client-initiated stream.
, _incomingFlowControl :: IncomingFlowControl
-- ^ Connection-level flow control for received data.
, _outgoingFlowControl :: OutgoingFlowControl
-- ^ Connection-level flow control for sent data.
, _paylodSplitter :: IO PayloadSplitter
-- ^ Read the current settings and return a splitter chunking payloads by
-- the server's maximum frame size.
}
-- | Pair of HTTP/2 settings tracked for a connection.
data ConnectionSettings = ConnectionSettings {
_clientSettings :: !Settings
-- ^ Settings announced by this client (updated once acknowledged).
, _serverSettings :: !Settings
-- ^ Settings received from the server.
}
-- | Connection settings in which both the client and the server side start
-- from 'defaultSettings'.
defaultConnectionSettings :: ConnectionSettings
defaultConnectionSettings = ConnectionSettings
    { _clientSettings = defaultSettings
    , _serverSettings = defaultSettings
    }
-- | Alias for the '_goaway' action of a client.
_gtfo :: Http2Client -> ErrorCodeId -> ByteString -> IO ()
_gtfo client = _goaway client
-- | Marker value produced by header-sending actions ('_headers') and
-- required as the result of '_initStream'.
data StreamThread = CST
-- | Record of the actions available on a single stream.
data Http2Stream = Http2Stream {
_headers :: HPACK.HeaderList
-> (FrameFlags -> FrameFlags)
-> IO StreamThread
-- ^ Encode the header list and send it on this stream, applying the flag
-- modifier to the last frame of the header block.
, _prio :: Priority -> IO ()
-- ^ Send a PRIORITY frame.
, _rst :: ErrorCodeId -> IO ()
-- ^ Send a RST_STREAM frame.
, _waitHeaders :: IO (FrameHeader, StreamId, Either ErrorCode HeaderList)
-- ^ Wait for the next decoded header list for this stream; a 'Left' value
-- carries the error code of a stream reset.
, _waitData :: IO (FrameHeader, Either ErrorCode ByteString)
-- ^ Wait for the next DATA or RST_STREAM frame on this stream. Received
-- data is accounted for in the stream's incoming flow control.
, _sendDataChunk :: (FrameFlags -> FrameFlags) -> ByteString -> IO ()
-- ^ Send a single DATA frame with the given flag modifier.
, _waitPushPromise :: PushPromiseHandler -> IO ()
-- ^ Wait for a push promise whose parent is this stream and run the
-- handler on the promised stream.
}
-- | Handler for a pushed stream. In this module it is called with the id of
-- the stream that was waiting for the promise (the parent), the actions of
-- the promised stream, the headers carried by the push promise, and the
-- promised stream's flow controls.
type PushPromiseHandler =
StreamId -> Http2Stream -> HeaderList -> IncomingFlowControl -> OutgoingFlowControl -> IO ()
-- | HPACK encoding actions sharing one dynamic table.
data HpackEncoderContext = HpackEncoderContext {
_encodeHeaders :: HeaderList -> IO HeaderBlockFragment
-- ^ Encode a header list into a header block.
, _applySettings :: Int -> IO ()
-- ^ Set the size limit of the encoder's dynamic table.
}
-- | Open a frame connection to the given host and port and wrap it into an
-- 'Http2Client'.
--
-- The two 'Int' arguments are, in order, the HPACK encoder buffer size and
-- the HPACK decoder buffer size; the 'SettingsList' is sent as the initial
-- SETTINGS frame.
newHttp2Client :: HostName
               -> PortNumber
               -> Int
               -> Int
               -> ClientParams
               -> SettingsList
               -> IO Http2Client
newHttp2Client host port encoderBufSize decoderBufSize tlsParams initSettings =
    newHttp2FrameConnection host port tlsParams >>= \frameConn ->
        wrapFrameClient frameConn encoderBufSize decoderBufSize initSettings
-- | Start a stream on the given client; see '_startStream'.
--
-- The definition is eta-expanded on purpose: '_startStream' is a record
-- selector for a rank-N field (@forall a. StreamStarter a@), and compilers
-- using simplified subsumption (GHC 9.0 and later) reject the point-free
-- form @withHttp2Stream = _startStream@.
withHttp2Stream :: Http2Client -> StreamStarter a
withHttp2Stream client = _startStream client
-- | Function modifying the flags of an outgoing frame.
type FlagSetter = FrameFlags -> FrameFlags
-- | Send a header list on a stream; see '_headers'.
headers :: Http2Stream -> HeaderList -> FlagSetter -> IO StreamThread
headers stream = _headers stream
-- | Build an 'Http2Client' on top of an existing frame connection.
--
-- Arguments, in order: the frame connection, the HPACK encoder buffer size,
-- the HPACK decoder buffer size, and the settings sent in the initial
-- SETTINGS frame.
--
-- Several threads are forked: one reading frames from the connection, one
-- answering control frames on stream 0, one decoding header blocks, one
-- crediting the connection-level incoming flow control, and one waiting for
-- the acknowledgement of the initial settings.
wrapFrameClient
  :: Http2FrameConnection
  -> Int
  -> Int
  -> SettingsList
  -> IO Http2Client
wrapFrameClient conn encoderBufSize decoderBufSize initSettings = do
    -- HPACK encoder sharing a single dynamic table.
    hpackEncoder <- do
        let strategy = (HPACK.defaultEncodeStrategy { HPACK.useHuffman = True })
        dt <- HPACK.newDynamicTableForEncoding HPACK.defaultDynamicTableSize
        let _encodeHeaders = HPACK.encodeHeader strategy encoderBufSize dt
        let _applySettings n = HPACK.setLimitForEncoding n dt
        return HpackEncoderContext{..}

    -- Client stream ids are the odd numbers 1, 3, 5, ...; the MVar holds the
    -- counter and acts as a lock while a stream is being initialized.
    clientStreamIdMutex <- newMVar 0
    let withClientStreamId h = bracket (takeMVar clientStreamIdMutex)
                                       (putMVar clientStreamIdMutex . succ)
                                       (\k -> h (2 * k + 1))

    let controlStream = makeFrameClientStream conn 0
    let ackPing = sendPingFrame controlStream HTTP2.setAck
    let ackSettings = sendSettingsFrame controlStream HTTP2.setAck []

    -- Every received frame is broadcast on 'serverFrames'; each consumer
    -- reads from its own duplicate of that channel.
    maxReceivedStreamId <- newIORef 0
    serverFrames <- newChan
    _ <- forkIO $ incomingFramesLoop conn serverFrames maxReceivedStreamId

    settings <- newIORef defaultConnectionSettings
    controlFrames <- dupChan serverFrames
    _ <- forkIO $ incomingControlFramesLoop controlFrames settings hpackEncoder ackPing ackSettings

    serverStreamFrames <- dupChan serverFrames
    serverHeaders <- newChan
    hpackDecoder <- newDynamicTableForDecoding HPACK.defaultDynamicTableSize decoderBufSize

    creditFrames <- dupChan serverFrames
    _outgoingFlowControl <- newOutgoingFlowControl settings 0 creditFrames
    _incomingFlowControl <- newIncomingFlowControl settings controlStream

    serverPushPromises <- newChan
    _ <- forkIO $ incomingHPACKFramesLoop serverStreamFrames
                                          serverHeaders
                                          serverPushPromises
                                          hpackDecoder

    dataFrames <- dupChan serverFrames
    _ <- forkIO $ creditDataFramesLoop _incomingFlowControl dataFrames

    conccurentStreams <- newIORef 0
    let _startStream getWork = do
            maxConcurrency <- fromMaybe 100 . maxConcurrentStreams . _serverSettings
                <$> readIORef settings
            -- Reserve a slot, or report how many slots are missing.
            roomNeeded <- atomicModifyIORef' conccurentStreams
                (\n -> if n < maxConcurrency
                       then (n + 1, 0)
                       else (n, 1 + n - maxConcurrency))
            if roomNeeded > 0
                then return $ Left $ TooMuchConcurrency roomNeeded
                else do
                    let releaseSlot =
                            atomicModifyIORef' conccurentStreams (\n -> (n - 1, ()))
                    let runStream = do
                            cont <- withClientStreamId $ \sid ->
                                initializeStream conn
                                                 settings
                                                 serverFrames
                                                 serverHeaders
                                                 serverPushPromises
                                                 hpackEncoder
                                                 sid
                                                 getWork
                            cont
                    -- The slot is released even when the stream work throws,
                    -- so that failed streams do not exhaust the concurrency
                    -- budget.
                    v <- bracket (return ()) (const releaseSlot) (const runStream)
                    return $ Right v

    let _ping dat = do
            -- Duplicate the channel before sending so the reply cannot be missed.
            pingFrames <- dupChan serverFrames
            sendPingFrame controlStream id dat
            return $ waitFrame (isPingReply dat) pingFrames

    let _settings settslist = do
            settingsFrames <- dupChan serverFrames
            sendSettingsFrame controlStream id settslist
            return $ do
                ret <- waitFrame isSettingsReply settingsFrames
                -- Client settings only take effect once acknowledged.
                atomicModifyIORef' settings
                    (\(ConnectionSettings cli srv) ->
                        (ConnectionSettings (HTTP2.updateSettings cli settslist) srv, ()))
                return ret

    let _goaway err errStr = do
            sId <- readIORef maxReceivedStreamId
            sendGTFOFrame controlStream sId err errStr

    let _paylodSplitter = settingsPayloadSplitter <$> readIORef settings

    -- Send the initial settings and wait for their acknowledgement in the
    -- background.
    _ <- forkIO . void =<< _settings initSettings

    return $ Http2Client{..}
-- | Prepare a stream with the given id and return the action running its
-- handler.
--
-- The stream gets its own duplicates of the frame, header and push-promise
-- channels plus stream-level flow controls. The '_initStream' action of the
-- definition produced by @getWork@ is run before returning; the returned
-- action runs '_handleStream'.
initializeStream
  :: Exception e
  => Http2FrameConnection
  -> IORef ConnectionSettings
  -> FramesChan e
  -> HeadersChan
  -> PushPromisesChan e
  -> HpackEncoderContext
  -> StreamId
  -> (Http2Stream -> StreamDefinition a)
  -> IO (IO a)
initializeStream conn settings serverFrames serverHeaders serverPushPromises hpackEncoder sid getWork = do
    let frameStream = makeFrameClientStream conn sid

    -- Private views of the broadcast channels.
    streamFrames <- dupChan serverFrames
    creditFrames <- dupChan serverFrames
    streamHeaders <- dupChan serverHeaders
    streamPromises <- dupChan serverPushPromises

    incomingStreamFlowControl <- newIncomingFlowControl settings frameStream
    outgoingStreamFlowControl <- newOutgoingFlowControl settings sid creditFrames

    let _headers headersList flags =
            readIORef settings >>= \current ->
                sendHeaders frameStream
                            hpackEncoder
                            headersList
                            (settingsPayloadSplitter current)
                            flags

    let _waitHeaders = waitHeadersWithStreamId sid streamHeaders

    let _waitData = do
            (fh, fp) <- waitFrameWithTypeIdForStreamId sid [FrameRSTStream, FrameData] streamFrames
            case fp of
                RSTStreamFrame err ->
                    return (fh, Left $ HTTP2.fromErrorCodeId err)
                DataFrame dat -> do
                    -- Account for the received payload in the stream window.
                    let received = HTTP2.payloadLength fh
                    _ <- _consumeCredit incomingStreamFlowControl received
                    _addCredit incomingStreamFlowControl received
                    return (fh, Right dat)
                _ -> error "waitFrameWithTypeIdForStreamId returned an unknown frame"

    let _sendDataChunk = sendDataFrame frameStream
    let _rst = sendResetFrame frameStream
    let _prio = sendPriorityFrame frameStream

    let _waitPushPromise ppHandler = do
            (_, ppFrames, ppHeaders, ppSid, ppReadHeaders) <-
                waitPushPromiseWithParentStreamId sid streamPromises
            let mkStreamActions stream =
                    StreamDefinition (return CST) (ppHandler sid stream ppReadHeaders)
            -- The promised stream is initialized like any other stream, on
            -- the channels captured when the promise was received.
            ppCont <- initializeStream conn
                                       settings
                                       ppFrames
                                       ppHeaders
                                       serverPushPromises
                                       hpackEncoder
                                       ppSid
                                       mkStreamActions
            ppCont

    let streamActions = getWork $ Http2Stream{..}
    _ <- _initStream streamActions
    return $ _handleStream streamActions incomingStreamFlowControl outgoingStreamFlowControl
-- | Forever read frames from the connection, remember the highest stream id
-- seen so far, and broadcast each frame on the given channel.
incomingFramesLoop
  :: Http2FrameConnection
  -> Chan (FrameHeader, Either HTTP2Error FramePayload)
  -> IORef StreamId
  -> IO ()
incomingFramesLoop conn frames maxReceivedStreamId = forever pump
  where
    pump = do
        received <- next conn
        let receivedSid = streamId (fst received)
        atomicModifyIORef' maxReceivedStreamId (\seen -> (max seen receivedSid, ()))
        writeChan frames received
-- | Forever handle frames received on stream 0.
--
-- * SETTINGS without the ack flag: record the server settings, apply a new
--   header table size to the HPACK encoder if one is present, then
--   acknowledge.
-- * PING without the ack flag: echo the payload back.
-- * Acknowledgements and WINDOW_UPDATE frames are left to other threads.
-- * Anything else is printed on standard output.
incomingControlFramesLoop
  :: Exception e
  => Chan (FrameHeader, Either e FramePayload)
  -> IORef ConnectionSettings
  -> HpackEncoderContext
  -> (ByteString -> IO ())
  -> IO ()
  -> IO ()
incomingControlFramesLoop frames settings hpackEncoder ackPing ackSettings =
    forever (waitFrameWithStreamId 0 frames >>= dispatch)
  where
    dispatch controlFrame@(fh, payload) =
        let isAck = testAck (flags fh)
        in case payload of
            SettingsFrame settsList
                | isAck ->
                    ignore "TODO: settings ack should be taken into account only after reception, we should return a waitSettingsAck in the _settings function"
                | otherwise -> do
                    atomicModifyIORef' settings
                        (\(ConnectionSettings cli srv) ->
                            (ConnectionSettings cli (HTTP2.updateSettings srv settsList), ()))
                    forM_ (lookup SettingsHeaderTableSize settsList)
                          (_applySettings hpackEncoder)
                    ackSettings
            PingFrame pingMsg
                | isAck ->
                    ignore "PingFrame replies waited for in the requestor thread"
                | otherwise ->
                    ackPing pingMsg
            WindowUpdateFrame _ ->
                ignore "connection-wide WindowUpdateFrame waited for in OutgoingFlowControl threads"
            _ ->
                putStrLn ("UNHANDLED frame: " <> show controlFrame)

    -- Documents why a frame is deliberately not handled here.
    ignore :: String -> IO ()
    ignore _ = return ()
-- | Forever wait for DATA frames (on any stream) and, for each of them,
-- consume then re-add its payload length on the given flow control.
creditDataFramesLoop
  :: Exception e
  => IncomingFlowControl
  -> Chan (FrameHeader, Either e FramePayload)
  -> IO ()
creditDataFramesLoop flowControl frames = forever creditNext
  where
    creditNext = do
        (fh, _) <- waitFrameWithTypeId [FrameData] frames
        let received = HTTP2.payloadLength fh
        _ <- _consumeCredit flowControl received
        _addCredit flowControl received
-- | What the header-decoding loop does with a completed header block.
data HPACKLoopDecision =
-- Forward the decoded headers for the given stream id.
ForwardHeader !StreamId
-- Announce a push promise: parent stream id, then promised stream id.
| OpenPushPromise !StreamId !StreamId
-- | Channel of received frames; the payload is 'Left' when it carries an
-- error instead of a frame payload.
type FramesChan e = Chan (FrameHeader, Either e FramePayload)
-- | Channel of decoded header lists (or stream-reset error codes) together
-- with the stream id they belong to.
type HeadersChan = Chan (FrameHeader, StreamId, Either ErrorCode HeaderList)
-- | Push-promise notification: parent stream id, frame and header channels
-- duplicated when the promise was received, promised stream id, and the
-- headers carried by the promise.
type PushPromisesChanContent e = (StreamId, FramesChan e, HeadersChan, StreamId, HeaderList)
-- | Channel of push-promise notifications.
type PushPromisesChan e = Chan (PushPromisesChanContent e)
-- | Forever assemble and decode header blocks.
--
-- A block starts with a HEADERS or PUSH_PROMISE frame and is extended with
-- CONTINUATION frames until a frame carrying the END_HEADERS flag is seen.
-- Decoded headers are written to the headers channel, push promises to the
-- push-promise channel. A RST_STREAM frame, either as the first frame or
-- while continuations are awaited, is reported on the headers channel as a
-- 'Left' error code.
incomingHPACKFramesLoop
  :: Exception e
  => FramesChan e
  -> HeadersChan
  -> PushPromisesChan e
  -> DynamicTable
  -> IO ()
incomingHPACKFramesLoop frames hdrs pushPromises hpackDecoder = forever $ do
    (firstFh, firstFp) <-
        waitFrameWithTypeId [FrameRSTStream, FramePushPromise, FrameHeaders] frames
    let sid = HTTP2.streamId firstFh

    (decision, initial) <- case firstFp of
        PushPromiseFrame ppSid hbf ->
            return (OpenPushPromise sid ppSid, Right hbf)
        HeadersFrame _ hbf ->
            return (ForwardHeader sid, Right hbf)
        RSTStreamFrame err ->
            return (ForwardHeader sid, Left err)
        _ -> error "wrong TypeId"

    let assemble curFh (Left err) =
            writeChan hdrs (curFh, sid, Left (HTTP2.fromErrorCodeId err))
        assemble curFh (Right buffer)
            | HTTP2.testEndHeader (HTTP2.flags curFh) = do
                -- Block complete: decode it and dispatch the result.
                newHdrs <- decodeHeader hpackDecoder buffer
                case decision of
                    ForwardHeader sId ->
                        writeChan hdrs (curFh, sId, Right newHdrs)
                    OpenPushPromise parentSid newSid -> do
                        ppChan <- dupChan frames
                        ppHeaders <- dupChan hdrs
                        writeChan pushPromises (parentSid, ppChan, ppHeaders, newSid, newHdrs)
            | otherwise = do
                -- Block incomplete: wait for a continuation (or a reset).
                (nextFh, nextFp) <-
                    waitFrameWithTypeId [FrameRSTStream, FrameContinuation] frames
                case nextFp of
                    ContinuationFrame chbf ->
                        assemble nextFh (Right (ByteString.append buffer chbf))
                    RSTStreamFrame err ->
                        assemble nextFh (Left err)
                    _ -> error "waitFrameWithTypeIdForStreamId returned an unknown frame"

    assemble firstFh initial
-- | Create the receive-side flow control for a stream (or for the whole
-- connection when given the control stream).
--
-- Two counters are kept: the credit added and not yet announced to the
-- peer, and the amount consumed. The window left is
-- @initial window size + added - consumed@.
newIncomingFlowControl
  :: IORef ConnectionSettings
  -> Http2FrameClientStream
  -> IO IncomingFlowControl
newIncomingFlowControl settings stream = do
    creditAdded <- newIORef 0
    creditConsumed <- newIORef 0
    let _addCredit n = atomicModifyIORef' creditAdded (\c -> (c + n, ()))
    let _consumeCredit n = do
            conso <- atomicModifyIORef' creditConsumed (\c -> (c + n, c + n))
            base <- initialWindowSize . _clientSettings <$> readIORef settings
            extra <- readIORef creditAdded
            -- Remaining credit: what was granted minus what was consumed.
            return $ base + extra - conso
    let _updateWindow = do
            base <- initialWindowSize . _clientSettings <$> readIORef settings
            added <- readIORef creditAdded
            consumed <- readIORef creditConsumed
            -- Never announce more than what keeps the window within the
            -- protocol maximum.
            let transferred = min added (HTTP2.maxWindowSize - base + consumed)
            let shouldUpdate = transferred > 0
            -- Move the transferred amount out of both counters.
            _addCredit (negate transferred)
            _ <- _consumeCredit (negate transferred)
            when shouldUpdate (sendWindowUpdateFrame stream transferred)
            return shouldUpdate
    return $ IncomingFlowControl _addCredit _consumeCredit _updateWindow
-- | Create the send-side flow control for the stream with the given id
-- (0 for the connection), fed by WINDOW_UPDATE frames read from the given
-- channel.
--
-- The credit counter is relative to the server's initial window size: the
-- window available at any time is @initial window size + credit@.
newOutgoingFlowControl ::
     Exception e
  => IORef ConnectionSettings
  -> StreamId
  -> Chan (FrameHeader, Either e FramePayload)
  -> IO OutgoingFlowControl
newOutgoingFlowControl settings sid frames = do
    credit <- newIORef 0
    let receive n = atomicModifyIORef' credit (\c -> (c + n, ()))
    let withdraw 0 = return 0
        withdraw n = do
            base <- initialWindowSize . _serverSettings <$> readIORef settings
            got <- atomicModifyIORef' credit (\c ->
                    if base + c >= n
                    -- Enough window: grant the full request.
                    then (c - n, n)
                    -- Not enough: grant whatever is left and empty the window.
                    else (negate base, base + c))
            if got > 0
                then return got
                else do
                    -- Nothing available: wait for either a change of the
                    -- initial window size or a WINDOW_UPDATE, then retry.
                    amount <- race (waitSettingsChange base) waitSomeCredit
                    receive (either (const 0) id amount)
                    withdraw n
    return $ OutgoingFlowControl receive withdraw
  where
    -- Poll the settings once per second until the server's initial window
    -- size differs from the given value.
    waitSettingsChange prev = do
        new <- initialWindowSize . _serverSettings <$> readIORef settings
        if new == prev
            then threadDelay 1000000 >> waitSettingsChange prev
            else return ()
    -- Wait for a WINDOW_UPDATE on this stream and return its increment.
    waitSomeCredit = do
        (_, fp) <- waitFrameWithTypeIdForStreamId sid [FrameWindowUpdate] frames
        case fp of
            WindowUpdateFrame amt -> return amt
            _ -> error "waitFrameWithTypeIdForStreamId returned an unknown frame"
-- | Encode a header list and send it as one HEADERS frame followed by as
-- many CONTINUATION frames as the splitter requires.
--
-- The flag modifier, combined with END_HEADERS, is applied to the last
-- frame of the block; the other frames are sent with unmodified flags.
--
-- When the encoded block is empty the splitter yields no fragment; a single
-- HEADERS frame with an empty fragment is then sent so that the flags are
-- still delivered (mirroring what 'sendData' does for empty payloads).
sendHeaders
  :: Http2FrameClientStream
  -> HpackEncoderContext
  -> HeaderList
  -> PayloadSplitter
  -> (FrameFlags -> FrameFlags)
  -> IO StreamThread
sendHeaders s enc hdrs blockSplitter flagmod = do
    encoded <- _encodeHeaders enc hdrs
    let headerBlockFragments = case blockSplitter encoded of
            []        -> [ByteString.empty]
            fragments -> fragments
    let framers = (HeadersFrame Nothing) : repeat ContinuationFrame
    let frames = zipWith ($) framers headerBlockFragments
    -- Pair modifiers with frames starting from the end so that the special
    -- modifier lands on the last frame.
    let modifiersReversed = (HTTP2.setEndHeader . flagmod) : repeat id
    let arrangedFrames = reverse $ zip modifiersReversed (reverse frames)
    sendBackToBack s arrangedFrames
    return CST
-- | Function cutting a payload into the chunks sent in successive frames.
type PayloadSplitter = ByteString -> [ByteString]
-- | Splitter cutting payloads into chunks no longer than the maximum frame
-- size found in the server settings.
settingsPayloadSplitter :: ConnectionSettings -> PayloadSplitter
settingsPayloadSplitter connSettings = fixedSizeChunks chunkSize
  where
    ConnectionSettings _ serverSide = connSettings
    chunkSize = maxFrameSize serverSide
-- | Cut a byte string into consecutive chunks of the given length; the last
-- chunk may be shorter. An empty input yields no chunk.
--
-- Calls 'error' when the length is not strictly positive: a zero or
-- negative length can never make progress through the input.
fixedSizeChunks :: Int -> ByteString -> [ByteString]
fixedSizeChunks len bstr
    | len == 0 = error "cannot chunk by zero-length blocks"
    | len < 0 = error "cannot chunk by negative-length blocks"
    | ByteString.null bstr = []
    | otherwise =
        let (chunk, rest) = ByteString.splitAt len bstr
        in chunk : fixedSizeChunks len rest
-- | Send a payload on a stream as a sequence of DATA frames sized by the
-- client's payload splitter.
--
-- The flag modifier is applied to the last frame only. An empty payload is
-- sent as a single empty DATA frame carrying the flag modifier.
sendData :: Http2Client -> Http2Stream -> FlagSetter -> ByteString -> IO ()
sendData conn stream flagmod dat = do
    splitter <- _paylodSplitter conn
    case reverse (splitter dat) of
        [] ->
            _sendDataChunk stream flagmod ""
        (finalChunk : earlierReversed) -> do
            forM_ (reverse earlierReversed) (_sendDataChunk stream id)
            _sendDataChunk stream flagmod finalChunk
-- | Send one DATA frame with the given flag modifier and payload.
sendDataFrame
  :: Http2FrameClientStream
  -> (FrameFlags -> FrameFlags) -> ByteString -> IO ()
sendDataFrame s flagmod dat = sendOne s flagmod payload
  where
    payload = DataFrame dat
-- | Send a RST_STREAM frame carrying the given error code.
sendResetFrame :: Http2FrameClientStream -> ErrorCodeId -> IO ()
sendResetFrame s err = sendOne s id payload
  where
    payload = RSTStreamFrame err
-- | Send a GOAWAY frame with the given last stream id, error code and
-- debug data.
sendGTFOFrame
  :: Http2FrameClientStream
  -> StreamId -> ErrorCodeId -> ByteString -> IO ()
sendGTFOFrame s lastStreamId err errStr = sendOne s id payload
  where
    payload = GoAwayFrame lastStreamId err errStr
-- | Abort with an error quoting the HTTP/2 specification.
--
-- The draft name is appended in parentheses so that it stays separated
-- from the quoted sentence.
rfcError :: String -> a
rfcError msg = error (msg ++ " (draft-ietf-httpbis-http2-17)")
-- | Send a PING frame with the given flag modifier and payload.
--
-- Fails through 'rfcError' when the stream is not stream 0 or when the
-- payload is not exactly 8 bytes long.
sendPingFrame
  :: Http2FrameClientStream
  -> (FrameFlags -> FrameFlags)
  -> ByteString
  -> IO ()
sendPingFrame s flags dat =
    if _getStreamId s /= 0
        then rfcError "PING frames are not associated with any individual stream."
        else if ByteString.length dat /= 8
            then rfcError "PING frames MUST contain 8 octets"
            else sendOne s flags (PingFrame dat)
-- | Send a WINDOW_UPDATE frame granting the given amount of credit.
sendWindowUpdateFrame
  :: Http2FrameClientStream -> WindowSize -> IO ()
sendWindowUpdateFrame s amount =
    sendOne s id (WindowUpdateFrame amount) >> return ()
-- | Send a SETTINGS frame with the given flag modifier and settings list.
--
-- Fails through 'rfcError' when the stream is not stream 0.
sendSettingsFrame
  :: Http2FrameClientStream
  -> (FrameFlags -> FrameFlags) -> SettingsList -> IO ()
sendSettingsFrame s flags setts =
    if _getStreamId s /= 0
        then rfcError "The stream identifier for a SETTINGS frame MUST be zero (0x0)."
        else sendOne s flags (SettingsFrame setts) >> return ()
-- | Send a PRIORITY frame carrying the given priority.
sendPriorityFrame :: Http2FrameClientStream -> Priority -> IO ()
sendPriorityFrame s p =
    sendOne s id (PriorityFrame p) >> return ()
-- | Wait for the next frame whose header carries the given stream id.
waitFrameWithStreamId
  :: Exception e =>
     StreamId -> Chan (FrameHeader, Either e FramePayload) -> IO (FrameHeader, FramePayload)
waitFrameWithStreamId sid = waitFrame onStream
  where
    onStream header _ = streamId header == sid
-- | Wait for the next frame whose payload type is one of the given types.
waitFrameWithTypeId
  :: (Exception e)
  => [FrameTypeId]
  -> Chan (FrameHeader, Either e FramePayload) -> IO (FrameHeader, FramePayload)
waitFrameWithTypeId tids = waitFrame ofWantedType
  where
    ofWantedType _ payload =
        elem (HTTP2.framePayloadToFrameTypeId payload) tids
-- | Wait for the next frame that is on the given stream and whose payload
-- type is one of the given types.
waitFrameWithTypeIdForStreamId
  :: (Exception e)
  => StreamId
  -> [FrameTypeId]
  -> Chan (FrameHeader, Either e FramePayload)
  -> IO (FrameHeader, FramePayload)
waitFrameWithTypeIdForStreamId sid tids = waitFrame wanted
  where
    -- The stream id is checked first so that the payload is only inspected
    -- for frames on the right stream.
    wanted header payload =
        streamId header == sid
            && elem (HTTP2.framePayloadToFrameTypeId payload) tids
-- | Read frames from the channel until one satisfies the predicate and
-- return it; frames that do not match are dropped.
--
-- An erroneous payload is turned into a lazily-thrown exception: it is
-- raised only when the payload is actually evaluated, whether by the
-- predicate or later by the caller.
waitFrame
  :: Exception e
  => (FrameHeader -> FramePayload -> Bool)
  -> Chan (FrameHeader, Either e FramePayload)
  -> IO (FrameHeader, FramePayload)
waitFrame test chan = readChan chan >>= examine
  where
    examine (fHead, fPayload)
        | test fHead dat = return (fHead, dat)
        | otherwise = waitFrame test chan
      where
        dat = either throw id fPayload
-- | Whether a frame is a PING frame carrying the same payload as the one
-- that was sent.
isPingReply :: ByteString -> FrameHeader -> FramePayload -> Bool
isPingReply datSent _ payload = case payload of
    PingFrame datRcv -> datSent == datRcv
    _ -> False
-- | Whether a frame is a SETTINGS frame with the ack flag set.
isSettingsReply :: FrameHeader -> FramePayload -> Bool
isSettingsReply fh payload = case payload of
    SettingsFrame _ -> HTTP2.testAck (flags fh)
    _ -> False
-- | Wait for the next headers entry recorded for the given stream id.
waitHeadersWithStreamId
  :: StreamId
  -> Chan (FrameHeader, StreamId, t)
  -> IO (FrameHeader, StreamId, t)
waitHeadersWithStreamId sid = waitHeaders forStream
  where
    forStream _ entrySid _ = entrySid == sid
-- | Read entries from a headers channel until one satisfies the predicate
-- and return it; entries that do not match are dropped.
waitHeaders
  :: (FrameHeader -> StreamId -> t -> Bool)
  -> Chan (FrameHeader, StreamId, t)
  -> IO (FrameHeader, StreamId, t)
waitHeaders test chan = readChan chan >>= examine
  where
    examine entry@(fH, sId, hdrs)
        | test fH sId hdrs = return entry
        | otherwise = waitHeaders test chan
-- | Read push-promise notifications until one whose parent stream id is the
-- given id is found and return it; other notifications are dropped.
waitPushPromiseWithParentStreamId
  :: StreamId
  -> PushPromisesChan e
  -> IO (PushPromisesChanContent e)
waitPushPromiseWithParentStreamId sid chan = do
    candidate@(parentSid, _, _, _, _) <- readChan chan
    if parentSid == sid
        then return candidate
        else waitPushPromiseWithParentStreamId sid chan