{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.H2.Sync (prepareSync, syncWithSender) where

import Control.Concurrent
import Network.HTTP.Semantics.IO
import UnliftIO.STM

import Network.HTTP2.H2.Context
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Types
import Network.HTTP2.H2.Window

-- | Set up the hand-off between a stream's worker and the sender.
--
-- Creates an empty 'MVar' that carries 'Sync' messages, builds the
-- callback that posts into it (see 'makeSync'), and packages that
-- callback into the first 'Output' for the stream.
--
-- The result pairs the @(MVar, callback)@ tuple expected by
-- 'syncWithSender' with the initial 'Output'.
prepareSync
    :: Stream
    -> OutputType
    -> Maybe (TBQueue StreamingChunk)
    -> IO ((MVar Sync, Maybe OutputType -> IO Bool), Output)
prepareSync strm otyp mtbq = do
    var <- newEmptyMVar
    let sync = makeSync strm mtbq (putMVar var)
        out = Output strm otyp sync
    return ((var, sync), out)

-- | Process 'Sync' messages taken from the 'MVar' until 'Done' arrives.
--
-- For each @Cont wait newotyp@ message, the supplied @wait@ action is
-- run first; afterwards a fresh 'Output' carrying @newotyp@ and the same
-- callback is enqueued on the context's output queue, and the loop
-- continues. A 'Done' message ends the loop.
syncWithSender
    :: Context
    -> Stream
    -> (MVar Sync, Maybe OutputType -> IO Bool)
    -> IO ()
syncWithSender Context{outputQ = outQ} strm (var, sync) = loop
  where
    loop = do
        s <- takeMVar var
        case s of
            Done -> return ()
            Cont wait newotyp -> do
                -- Run the wait action before handing the stream back.
                wait
                enqueueOutput outQ $ Output strm newotyp sync
                loop

-- | Build the callback stored in an 'Output'.
--
-- * Given 'Nothing', it posts 'Done' through @notify@ and returns 'False'.
-- * Given @Just otyp@, it consults 'checkOpen'. If nothing has to be
--   waited for, it returns 'True' without posting anything. Otherwise it
--   posts @Cont wait otyp@ through @notify@ and returns 'False'.
makeSync
    :: Stream
    -> Maybe (TBQueue StreamingChunk)
    -> (Sync -> IO ())
    -> Maybe OutputType
    -> IO Bool
makeSync _ _ notify Nothing = notify Done >> return False
makeSync strm mtbq notify (Just otyp) = do
    mwait <- checkOpen strm mtbq
    case mwait of
        Nothing -> return True
        Just wait -> do
            notify $ Cont wait otyp
            return False

-- | Decide whether output for a stream has to wait.
--
-- Returns 'Nothing' when no waiting is needed, or @Just wait@ where
-- @wait@ is the action to run before trying again.
--
-- Without a streaming queue only the stream window size is checked. With
-- a queue, an empty queue yields a wait on the queue ('waitStreaming');
-- a non-empty queue falls through to the window-size check.
checkOpen :: Stream -> Maybe (TBQueue StreamingChunk) -> IO (Maybe (IO ()))
checkOpen strm mtbq = case mtbq of
    Nothing -> checkStreamWindowSize
    Just tbq -> checkStreaming tbq
  where
    checkStreaming tbq = do
        isEmpty <- atomically $ isEmptyTBQueue tbq
        if isEmpty
            then return $ Just (waitStreaming tbq)
            else checkStreamWindowSize
    -- FLOW CONTROL: WINDOW_UPDATE: send: respecting peer's limit
    checkStreamWindowSize = do
        sws <- getStreamWindowSize strm
        -- A window size of zero or less means the caller must wait.
        if sws <= 0
            then return $ Just (waitStreamWindowSize strm)
            else return Nothing

{-# INLINE waitStreaming #-}
-- | Block until the queue is non-empty.
--
-- The emptiness test runs inside a single STM transaction; 'checkSTM'
-- retries that transaction while the queue is still empty.
waitStreaming :: TBQueue a -> IO ()
waitStreaming tbq = atomically $ do
    isEmpty <- isEmptyTBQueue tbq
    checkSTM (not isEmpty)