{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.Client.Run where

import Control.Concurrent.STM (check)
import Control.Exception
import Data.ByteString.Builder (Builder)
import Data.IORef
import Network.Control (RxFlow (..), defaultMaxData)
import Network.Socket (SockAddr)
import UnliftIO.Async
import UnliftIO.Concurrent
import UnliftIO.STM

import Imports
import Network.HTTP2.Client.Types
import Network.HTTP2.Frame
import Network.HTTP2.H2

-- | Client configuration
data ClientConfig = ClientConfig
    { scheme :: Scheme
    -- ^ https or http. Sent as the @:scheme@ pseudo-header of every
    --   request; an empty value suppresses the header.
    , authority :: Authority
    -- ^ Server name. Sent as the @:authority@ pseudo-header of every
    --   request; an empty value suppresses the header.
    , cacheLimit :: Int
    -- ^ The maximum number of incoming streams on the net.
    --   Handed to 'newContext' unchanged.
    , connectionWindowSize :: WindowSize
    -- ^ The window size of connection.
    --   Handed to 'newContext' unchanged.
    , settings :: Settings
    -- ^ Settings. Handed to 'newContext' unchanged.
    }
    deriving (Eq, Show)

-- | The default client config.
--
-- The @authority@ field is used to set the HTTP2 @:authority@
-- pseudo-header. In most cases you will want to override it so that it is
-- equal to @host@.
--
-- Further background on @authority@:
-- [RFC 3986](https://datatracker.ietf.org/doc/html/rfc3986#section-3.2) also
-- allows @host:port@, and most servers will accept this too. However, when
-- using TLS, many servers expect the TLS SNI server name and the
-- @:authority@ pseudo-header to be equal, and for TLS SNI the server name
-- should not include the port. Note that HTTP2 explicitly /disallows/ using
-- @userinfo\@@ as part of the authority.
--
-- >>> defaultClientConfig
-- ClientConfig {scheme = "http", authority = "localhost", cacheLimit = 64, connectionWindowSize = 1048576, settings = Settings {headerTableSize = 4096, enablePush = True, maxConcurrentStreams = Just 64, initialWindowSize = 262144, maxFrameSize = 16384, maxHeaderListSize = Nothing}}
defaultClientConfig :: ClientConfig
defaultClientConfig =
    ClientConfig
        { scheme = "http"
        , authority = "localhost"
        , cacheLimit = 64
        , connectionWindowSize = defaultMaxData
        , settings = defaultSettings
        }

-- | Running HTTP/2 client.
--
-- Builds the connection state with 'setup', then runs @client@ under
-- 'runH2'. After @client@ returns, waits for the manager counter to reach
-- zero ('waitCounter0') and enqueues a GOAWAY frame with 'NoError' before
-- yielding the client's result.
run :: ClientConfig -> Config -> Client a -> IO a
run cconf@ClientConfig{..} conf client = do
    (ctx, mgr) <- setup cconf conf
    runH2 conf ctx mgr (runClient ctx mgr)
  where
    -- The peer's 'maxConcurrentStreams' setting, or 'maxBound' when the
    -- peer settings do not carry one.
    serverMaxStreams ctx =
        fromMaybe maxBound . maxConcurrentStreams
            <$> readIORef (peerSettings ctx)
    -- Peer limit minus the 'oddConc' value of the odd stream table.
    possibleClientStream ctx = do
        limit <- serverMaxStreams ctx
        table <- readTVarIO (oddStreamTable ctx)
        return (limit - oddConc table)
    aux ctx = Aux{auxPossibleClientStreams = possibleClientStream ctx}
    -- Send one request, wait for its response and hand it to the callback.
    clientCore ctx mgr req processResponse =
        sendRequest ctx mgr scheme authority req
            >>= getResponse
            >>= processResponse
    runClient ctx mgr = do
        result <- client (clientCore ctx mgr) (aux ctx)
        waitCounter0 mgr
        -- NOTE(review): the MVar is created full, so the 'takeMVar' below
        -- returns at once and does not wait for the sender to emit the
        -- GOAWAY frame. If the sender signals completion with 'putMVar',
        -- 'newEmptyMVar' is probably what was intended -- confirm against
        -- the sender's handling of 'CGoaway' before changing.
        mvar <- newMVar ()
        enqueueControl (controlQ ctx) $
            CGoaway (goawayFrame 0 NoError "graceful closing") mvar
        takeMVar mvar
        return result

-- | Launching a receiver and a sender.
--
-- Builds the connection state with 'setup', packages the low-level
-- operations into a 'ClientIO', and passes that to @action@. The 'IO'
-- computation returned by @action@ is then run under 'runH2', i.e.
-- alongside the frame receiver and frame sender.
runIO :: ClientConfig -> Config -> (ClientIO -> IO (IO a)) -> IO a
runIO cconf@ClientConfig{..} conf@Config{..} action = do
    (ctx@Context{..}, mgr) <- setup cconf conf
    let cio =
            ClientIO
                { cioMySockAddr = confMySockAddr
                , cioPeerSockAddr = confPeerSockAddr
                , -- Send the request and report the stream together
                  -- with its stream number.
                  cioWriteRequest = \req ->
                    (\strm -> (streamNumber strm, strm))
                        <$> sendRequest ctx mgr scheme authority req
                , cioReadResponse = getResponse
                , -- Raw bytes are enqueued on the control queue as a
                  -- single-element frame list.
                  cioWriteBytes = \bs ->
                    enqueueControl controlQ (CFrames Nothing [bs])
                , cioCreateStream = openOddStreamWait ctx
                }
    action cio >>= runH2 conf ctx mgr

-- | Block until the stream's input 'MVar' is filled, then either rethrow
-- the exception stored there or wrap the received object in 'Response'.
getResponse :: Stream -> IO Response
getResponse strm =
    takeMVar (streamInput strm) >>= either throwIO (return . Response)

-- | Create the connection 'Context' for a client, start the timeout
-- manager taken from the 'Config', and enqueue the initial settings
-- exchange ('exchangeSettings').
setup :: ClientConfig -> Config -> IO (Context, Manager)
setup ClientConfig{..} conf@Config{..} = do
    ctx <-
        newContext
            (newClientInfo scheme authority)
            conf
            cacheLimit
            connectionWindowSize
            settings
    mgr <- start confTimeoutManager
    exchangeSettings ctx
    return (ctx, mgr)

-- | Run @runClient@ concurrently with the frame receiver and frame sender.
--
-- The client action is raced against the two background threads. Whatever
-- the outcome, every stream in the odd and even stream tables is closed,
-- receiving the exception if there was one. An exception is rethrown to the
-- caller; otherwise the client's result is returned.
runH2 :: Config -> Context -> Manager -> IO a -> IO a
runH2 conf ctx mgr runClient =
    stopAfter mgr (race runBackgroundThreads runClient) $ \res -> do
        closeAllStreams (oddStreamTable ctx) (evenStreamTable ctx) $
            either Just (const Nothing) res
        case res of
            Left err ->
                throwIO err
            Right (Left ()) ->
                -- Not expected to be reached: this needs both the receiver
                -- and the sender to return normally before the client does.
                -- Fail with a diagnosable message rather than a bare
                -- 'undefined' should that ever happen.
                error
                    "runH2: frame receiver and sender terminated before the client finished"
            Right (Right x) ->
                return x
  where
    runReceiver = frameReceiver ctx conf
    runSender = frameSender ctx conf mgr
    runBackgroundThreads = concurrently_ runReceiver runSender

-- | Send a request and return the stream carrying its response.
--
-- If 'lookupEvenCache' already holds a stream for the request's @:method@
-- and @:path@, that stream is removed from the cache and returned without
-- sending anything. Otherwise a new odd stream is opened, the @:scheme@ and
-- @:authority@ pseudo-headers are prepended (each only when non-empty), and
-- the request is handed to the sender.
--
-- Calls 'error' if the request headers lack @:method@ or @:path@.
sendRequest
    :: Context
    -> Manager
    -> Scheme
    -> Authority
    -> Request
    -> IO Stream
sendRequest ctx@Context{..} mgr scheme auth (Request req) =
    -- Checking push promises
    lookupEvenCache evenStreamTable method path >>= maybe sendNew takeCached
  where
    baseHeaders = outObjHeaders req
    mandatory name msg = fromMaybe (error msg) $ lookup name baseHeaders
    method = mandatory ":method" "sendRequest:method"
    path = mandatory ":path" "sendRequest:path"

    takeCached strm = do
        deleteEvenCache evenStreamTable method path
        return strm

    -- Prepend a pseudo-header unless its value is empty.
    withPseudo name value hdrs
        | value /= "" = (name, value) : hdrs
        | otherwise = hdrs
    -- ":authority" ends up in front of ":scheme".
    req' =
        req
            { outObjHeaders =
                withPseudo ":authority" auth $
                    withPseudo ":scheme" scheme baseHeaders
            }

    -- Arch/Sender was originally implemented for servers, where
    -- responses may be sent out of order. For clients the ordering
    -- must be maintained; 'outputQStreamID' is used for that.
    -- Also, for 'OutBodyStreaming', the TBQ must not be empty when its
    -- 'Output' is enqueued into 'outputQ'. Otherwise the 'Output'
    -- would be re-enqueued because the queue is empty, resulting in
    -- out-of-order sending. 'tbqNonEmpty' is used for that.
    sendNew = do
        -- FLOW CONTROL: SETTINGS_MAX_CONCURRENT_STREAMS: send: respecting peer's limit
        (sid, newstrm) <- openOddStreamWait ctx
        case outObjBody req of
            OutBodyStreaming strmbdy ->
                sendStreaming ctx mgr req' sid newstrm $ \unmask push flush ->
                    unmask $ strmbdy push flush
            OutBodyStreamingUnmask strmbdy ->
                sendStreaming ctx mgr req' sid newstrm strmbdy
            _ -> atomically $ do
                -- Wait for our turn, then pass the turn to the next
                -- odd stream id.
                readTVar outputQStreamID >>= check . (== sid)
                writeTVar outputQStreamID (sid + 2)
                writeTQueue outputQ $
                    Output newstrm req' OObj Nothing (return ())
        return newstrm

-- | Send a request whose body is produced by a streaming action.
--
-- The body action @strmbdy@ runs in a thread forked through the manager and
-- writes its chunks into a bounded queue. The calling thread blocks until
-- (a) it is this stream's turn according to 'outputQStreamID' and (b) the
-- queue has received at least one chunk, and only then enqueues the
-- 'Output' for the sender. This keeps requests in stream-id order.
--
-- The manager counter is incremented when the body thread starts; the
-- matching 'decCounter' is carried inside the 'StreamingFinished' chunk.
sendStreaming
    :: Context
    -> Manager
    -> OutObj
    -> StreamId
    -> Stream
    -> ((forall x. IO x -> IO x) -> (Builder -> IO ()) -> IO () -> IO ())
    -> IO ()
sendStreaming Context{..} mgr req sid newstrm strmbdy = do
    tbq <- newTBQueueIO 10 -- fixme: hard coding: 10
    tbqNonEmpty <- newTVarIO False
    forkManagedUnmask mgr $ \unmask -> do
        let push b = atomically $ do
                writeTBQueue tbq (StreamingBuilder b)
                writeTVar tbqNonEmpty True
            flush = atomically $ writeTBQueue tbq StreamingFlush
            -- 'finished' must mark the queue as non-empty as well. A body
            -- that ends (or throws) without ever calling 'push' would
            -- otherwise leave 'tbqNonEmpty' False forever: the waiter
            -- below would never wake up and 'outputQStreamID' would never
            -- advance, stalling every later request on the connection.
            finished = atomically $ do
                writeTBQueue tbq $ StreamingFinished (decCounter mgr)
                writeTVar tbqNonEmpty True
        incCounter mgr
        strmbdy unmask push flush `finally` finished
    atomically $ do
        sidOK <- readTVar outputQStreamID
        ready <- readTVar tbqNonEmpty
        -- Retry until it is our turn and the queue holds a chunk.
        check (sidOK == sid && ready)
        writeTVar outputQStreamID (sid + 2)
        writeTQueue outputQ $ Output newstrm req OObj (Just tbq) (return ())

-- | Enqueue the client's opening bytes on the control queue: the
-- connection preface followed by the negotiation frames built from our
-- settings and the current connection-level receive window. Also sets
-- 'myFirstSettings' to 'True'.
exchangeSettings :: Context -> IO ()
exchangeSettings Context{..} = do
    rx <- readIORef rxFlow
    let negotiation = makeNegotiationFrames mySettings (rxfWindow rx)
    writeIORef myFirstSettings True
    enqueueControl controlQ $
        CFrames Nothing (connectionPreface : negotiation)

-- | Low-level operations handed to the action passed to 'runIO'.
data ClientIO = ClientIO
    { cioMySockAddr :: SockAddr
    -- ^ Our socket address, taken from the 'Config'.
    , cioPeerSockAddr :: SockAddr
    -- ^ The peer's socket address, taken from the 'Config'.
    , cioWriteRequest :: Request -> IO (StreamId, Stream)
    -- ^ Send a request; yields the stream and its stream number.
    , cioReadResponse :: Stream -> IO Response
    -- ^ Wait for the response on a stream ('getResponse').
    , cioWriteBytes :: ByteString -> IO ()
    -- ^ Enqueue raw bytes on the control queue.
    , cioCreateStream :: IO (StreamId, Stream)
    -- ^ Open a new odd stream ('openOddStreamWait').
    }