{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.H2.Sync (prepareSync, syncWithSender) where
import Control.Concurrent
import Control.Concurrent.STM
import Network.HTTP.Semantics.IO
import Network.HTTP2.H2.Context
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Types
import Network.HTTP2.H2.Window
prepareSync
:: Stream
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ((MVar Sync, Maybe OutputType -> IO Bool), Output)
prepareSync :: Stream
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ((MVar Sync, Maybe OutputType -> IO Bool), Output)
prepareSync Stream
strm OutputType
otyp Maybe (TBQueue StreamingChunk)
mtbq = do
MVar Sync
var <- IO (MVar Sync)
forall a. IO (MVar a)
newEmptyMVar
let sync :: Maybe OutputType -> IO Bool
sync = Stream
-> Maybe (TBQueue StreamingChunk)
-> MVar Sync
-> Maybe OutputType
-> IO Bool
makeSync Stream
strm Maybe (TBQueue StreamingChunk)
mtbq MVar Sync
var
out :: Output
out = Stream -> OutputType -> (Maybe OutputType -> IO Bool) -> Output
Output Stream
strm OutputType
otyp Maybe OutputType -> IO Bool
sync
((MVar Sync, Maybe OutputType -> IO Bool), Output)
-> IO ((MVar Sync, Maybe OutputType -> IO Bool), Output)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ((MVar Sync
var, Maybe OutputType -> IO Bool
sync), Output
out)
syncWithSender
:: Context
-> Stream
-> MVar Sync
-> (Maybe OutputType -> IO Bool)
-> IO ()
syncWithSender :: Context
-> Stream -> MVar Sync -> (Maybe OutputType -> IO Bool) -> IO ()
syncWithSender Context{TVar Bool
TVar StreamId
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue Output
DynamicTable
Manager
Settings
RoleInfo
Role
outputQ :: Context -> TQueue Output
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe StreamId)
myStreamId :: TVar StreamId
peerStreamId :: IORef StreamId
outputBufferLimit :: IORef StreamId
outputQ :: TQueue Output
outputQStreamID :: TVar StreamId
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: Manager
senderDone :: TVar Bool
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe StreamId)
myStreamId :: Context -> TVar StreamId
peerStreamId :: Context -> IORef StreamId
outputBufferLimit :: Context -> IORef StreamId
outputQStreamID :: Context -> TVar StreamId
controlQ :: Context -> TQueue Control
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
threadManager :: Context -> Manager
senderDone :: Context -> TVar Bool
..} Stream
strm MVar Sync
var Maybe OutputType -> IO Bool
sync = IO ()
loop
where
loop :: IO ()
loop = do
Sync
s <- MVar Sync -> IO Sync
forall a. MVar a -> IO a
takeMVar MVar Sync
var
case Sync
s of
Sync
Done -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Cont IO ()
wait OutputType
newotyp -> do
IO ()
wait
TQueue Output -> Output -> IO ()
enqueueOutput TQueue Output
outputQ (Output -> IO ()) -> Output -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream -> OutputType -> (Maybe OutputType -> IO Bool) -> Output
Output Stream
strm OutputType
newotyp Maybe OutputType -> IO Bool
sync
IO ()
loop
makeSync
:: Stream
-> Maybe (TBQueue StreamingChunk)
-> MVar Sync
-> Maybe OutputType
-> IO Bool
makeSync :: Stream
-> Maybe (TBQueue StreamingChunk)
-> MVar Sync
-> Maybe OutputType
-> IO Bool
makeSync Stream
_ Maybe (TBQueue StreamingChunk)
_ MVar Sync
var Maybe OutputType
Nothing = MVar Sync -> Sync -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Sync
var Sync
Done IO () -> IO Bool -> IO Bool
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
makeSync Stream
strm Maybe (TBQueue StreamingChunk)
mtbq MVar Sync
var (Just OutputType
otyp) = do
Maybe (IO ())
mwait <- Stream -> Maybe (TBQueue StreamingChunk) -> IO (Maybe (IO ()))
checkOpen Stream
strm Maybe (TBQueue StreamingChunk)
mtbq
case Maybe (IO ())
mwait of
Maybe (IO ())
Nothing -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
Just IO ()
wait -> do
MVar Sync -> Sync -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Sync
var (Sync -> IO ()) -> Sync -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> OutputType -> Sync
Cont IO ()
wait OutputType
otyp
Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
checkOpen :: Stream -> Maybe (TBQueue StreamingChunk) -> IO (Maybe (IO ()))
checkOpen :: Stream -> Maybe (TBQueue StreamingChunk) -> IO (Maybe (IO ()))
checkOpen Stream
strm Maybe (TBQueue StreamingChunk)
mtbq = case Maybe (TBQueue StreamingChunk)
mtbq of
Maybe (TBQueue StreamingChunk)
Nothing -> IO (Maybe (IO ()))
checkStreamWindowSize
Just TBQueue StreamingChunk
tbq -> TBQueue StreamingChunk -> IO (Maybe (IO ()))
forall {a}. TBQueue a -> IO (Maybe (IO ()))
checkStreaming TBQueue StreamingChunk
tbq
where
checkStreaming :: TBQueue a -> IO (Maybe (IO ()))
checkStreaming TBQueue a
tbq = do
Bool
isEmpty <- STM Bool -> IO Bool
forall a. STM a -> IO a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TBQueue a -> STM Bool
forall a. TBQueue a -> STM Bool
isEmptyTBQueue TBQueue a
tbq
if Bool
isEmpty
then do
Maybe (IO ()) -> IO (Maybe (IO ()))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (IO ()) -> IO (Maybe (IO ())))
-> Maybe (IO ()) -> IO (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$ IO () -> Maybe (IO ())
forall a. a -> Maybe a
Just (TBQueue a -> IO ()
forall a. TBQueue a -> IO ()
waitStreaming TBQueue a
tbq)
else IO (Maybe (IO ()))
checkStreamWindowSize
checkStreamWindowSize :: IO (Maybe (IO ()))
checkStreamWindowSize = do
StreamId
sws <- Stream -> IO StreamId
getStreamWindowSize Stream
strm
if StreamId
sws StreamId -> StreamId -> Bool
forall a. Ord a => a -> a -> Bool
<= StreamId
0
then Maybe (IO ()) -> IO (Maybe (IO ()))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (IO ()) -> IO (Maybe (IO ())))
-> Maybe (IO ()) -> IO (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$ IO () -> Maybe (IO ())
forall a. a -> Maybe a
Just (Stream -> IO ()
waitStreamWindowSize Stream
strm)
else Maybe (IO ()) -> IO (Maybe (IO ()))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IO ())
forall a. Maybe a
Nothing
{-# INLINE waitStreaming #-}
waitStreaming :: TBQueue a -> IO ()
waitStreaming :: forall a. TBQueue a -> IO ()
waitStreaming TBQueue a
tbq = STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
Bool
isEmpty <- TBQueue a -> STM Bool
forall a. TBQueue a -> STM Bool
isEmptyTBQueue TBQueue a
tbq
Bool -> STM ()
check (Bool -> Bool
not Bool
isEmpty)