{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.H2.Sync (prepareSync, syncWithSender) where

import Control.Concurrent
import Network.HTTP.Semantics.IO
import UnliftIO.STM

import Network.HTTP2.H2.Context
import Network.HTTP2.H2.Queue
import Network.HTTP2.H2.Types
import Network.HTTP2.H2.Window

-- | Two assumptions about how this function is used:
--
-- 1. A separate thread will be running 'syncWithSender' using the @var@ and
--    @sync@ values constructed here.
-- 2. The 'Output' will be enqueued in the 'outputQ' of some 'Context'
--
-- The returned @sync@ function then has the following usage constraints:
--
-- 1. It may only be called with a 'Just' 'OutputType' if there is no 'Output'
--    already enqueued in the 'outputQ' for the given stream.
-- 2. If the function returns 'False', no other 'Output' may be enqueued for
--    this stream (until one has been dequeued).
prepareSync
    :: Stream
    -> OutputType
    -> Maybe (TBQueue StreamingChunk)
    -> IO ((MVar Sync, Maybe OutputType -> IO Bool), Output)
prepareSync :: Stream
-> OutputType
-> Maybe (TBQueue StreamingChunk)
-> IO ((MVar Sync, Maybe OutputType -> IO Bool), Output)
prepareSync Stream
strm OutputType
otyp Maybe (TBQueue StreamingChunk)
mtbq = do
    MVar Sync
var <- IO (MVar Sync)
forall a. IO (MVar a)
newEmptyMVar
    let sync :: Maybe OutputType -> IO Bool
sync = Stream
-> Maybe (TBQueue StreamingChunk)
-> MVar Sync
-> Maybe OutputType
-> IO Bool
makeSync Stream
strm Maybe (TBQueue StreamingChunk)
mtbq MVar Sync
var
        out :: Output
out = Stream -> OutputType -> (Maybe OutputType -> IO Bool) -> Output
Output Stream
strm OutputType
otyp Maybe OutputType -> IO Bool
sync
    ((MVar Sync, Maybe OutputType -> IO Bool), Output)
-> IO ((MVar Sync, Maybe OutputType -> IO Bool), Output)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ((MVar Sync
var, Maybe OutputType -> IO Bool
sync), Output
out)

syncWithSender
    :: Context
    -> Stream
    -> MVar Sync
    -- ^ Precondition: When this is filled with an 'Output' for a particular
    -- stream, the 'outputQ' in the 'Context' /must not/ already contain an
    -- 'Output' for that stream.
    -> (Maybe OutputType -> IO Bool)
    -> IO ()
syncWithSender :: Context
-> Stream -> MVar Sync -> (Maybe OutputType -> IO Bool) -> IO ()
syncWithSender Context{TVar Bool
TVar StreamId
TVar TxFlow
TVar EvenStreamTable
TVar OddStreamTable
IORef Bool
IORef StreamId
IORef (Maybe StreamId)
IORef RxFlow
IORef Settings
SockAddr
Rate
TQueue Control
TQueue Output
DynamicTable
Manager
Settings
RoleInfo
Role
outputQ :: Context -> TQueue Output
role :: Role
roleInfo :: RoleInfo
mySettings :: Settings
myFirstSettings :: IORef Bool
peerSettings :: IORef Settings
oddStreamTable :: TVar OddStreamTable
evenStreamTable :: TVar EvenStreamTable
continued :: IORef (Maybe StreamId)
myStreamId :: TVar StreamId
peerStreamId :: IORef StreamId
outputBufferLimit :: IORef StreamId
outputQ :: TQueue Output
outputQStreamID :: TVar StreamId
controlQ :: TQueue Control
encodeDynamicTable :: DynamicTable
decodeDynamicTable :: DynamicTable
txFlow :: TVar TxFlow
rxFlow :: IORef RxFlow
pingRate :: Rate
settingsRate :: Rate
emptyFrameRate :: Rate
rstRate :: Rate
mySockAddr :: SockAddr
peerSockAddr :: SockAddr
threadManager :: Manager
senderDone :: TVar Bool
role :: Context -> Role
roleInfo :: Context -> RoleInfo
mySettings :: Context -> Settings
myFirstSettings :: Context -> IORef Bool
peerSettings :: Context -> IORef Settings
oddStreamTable :: Context -> TVar OddStreamTable
evenStreamTable :: Context -> TVar EvenStreamTable
continued :: Context -> IORef (Maybe StreamId)
myStreamId :: Context -> TVar StreamId
peerStreamId :: Context -> IORef StreamId
outputBufferLimit :: Context -> IORef StreamId
outputQStreamID :: Context -> TVar StreamId
controlQ :: Context -> TQueue Control
encodeDynamicTable :: Context -> DynamicTable
decodeDynamicTable :: Context -> DynamicTable
txFlow :: Context -> TVar TxFlow
rxFlow :: Context -> IORef RxFlow
pingRate :: Context -> Rate
settingsRate :: Context -> Rate
emptyFrameRate :: Context -> Rate
rstRate :: Context -> Rate
mySockAddr :: Context -> SockAddr
peerSockAddr :: Context -> SockAddr
threadManager :: Context -> Manager
senderDone :: Context -> TVar Bool
..} Stream
strm MVar Sync
var Maybe OutputType -> IO Bool
sync = IO ()
loop
  where
    loop :: IO ()
loop = do
        Sync
s <- MVar Sync -> IO Sync
forall a. MVar a -> IO a
takeMVar MVar Sync
var
        case Sync
s of
            Sync
Done -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Cont IO ()
wait OutputType
newotyp -> do
                IO ()
wait
                -- This is justified by the precondition above
                TQueue Output -> Output -> IO ()
enqueueOutput TQueue Output
outputQ (Output -> IO ()) -> Output -> IO ()
forall a b. (a -> b) -> a -> b
$ Stream -> OutputType -> (Maybe OutputType -> IO Bool) -> Output
Output Stream
strm OutputType
newotyp Maybe OutputType -> IO Bool
sync
                IO ()
loop

-- | Postcondition: This will only write to the 'MVar' if:
--
-- 1. You pass 'Just' an 'OutputType'
-- 2. The return value is 'False'
makeSync
    :: Stream
    -> Maybe (TBQueue StreamingChunk)
    -> MVar Sync
    -> Maybe OutputType
    -> IO Bool
makeSync :: Stream
-> Maybe (TBQueue StreamingChunk)
-> MVar Sync
-> Maybe OutputType
-> IO Bool
makeSync Stream
_ Maybe (TBQueue StreamingChunk)
_ MVar Sync
var Maybe OutputType
Nothing = MVar Sync -> Sync -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Sync
var Sync
Done IO () -> IO Bool -> IO Bool
forall a b. IO a -> IO b -> IO b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
makeSync Stream
strm Maybe (TBQueue StreamingChunk)
mtbq MVar Sync
var (Just OutputType
otyp) = do
    Maybe (IO ())
mwait <- Stream -> Maybe (TBQueue StreamingChunk) -> IO (Maybe (IO ()))
checkOpen Stream
strm Maybe (TBQueue StreamingChunk)
mtbq
    case Maybe (IO ())
mwait of
        Maybe (IO ())
Nothing -> Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Just IO ()
wait -> do
            MVar Sync -> Sync -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Sync
var (Sync -> IO ()) -> Sync -> IO ()
forall a b. (a -> b) -> a -> b
$ IO () -> OutputType -> Sync
Cont IO ()
wait OutputType
otyp
            Bool -> IO Bool
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False

checkOpen :: Stream -> Maybe (TBQueue StreamingChunk) -> IO (Maybe (IO ()))
checkOpen :: Stream -> Maybe (TBQueue StreamingChunk) -> IO (Maybe (IO ()))
checkOpen Stream
strm Maybe (TBQueue StreamingChunk)
mtbq = case Maybe (TBQueue StreamingChunk)
mtbq of
    Maybe (TBQueue StreamingChunk)
Nothing -> IO (Maybe (IO ()))
checkStreamWindowSize
    Just TBQueue StreamingChunk
tbq -> TBQueue StreamingChunk -> IO (Maybe (IO ()))
forall {a}. TBQueue a -> IO (Maybe (IO ()))
checkStreaming TBQueue StreamingChunk
tbq
  where
    checkStreaming :: TBQueue a -> IO (Maybe (IO ()))
checkStreaming TBQueue a
tbq = do
        Bool
isEmpty <- STM Bool -> IO Bool
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM Bool -> IO Bool) -> STM Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ TBQueue a -> STM Bool
forall a. TBQueue a -> STM Bool
isEmptyTBQueue TBQueue a
tbq
        if Bool
isEmpty
            then do
                Maybe (IO ()) -> IO (Maybe (IO ()))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (IO ()) -> IO (Maybe (IO ())))
-> Maybe (IO ()) -> IO (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$ IO () -> Maybe (IO ())
forall a. a -> Maybe a
Just (TBQueue a -> IO ()
forall a. TBQueue a -> IO ()
waitStreaming TBQueue a
tbq)
            else IO (Maybe (IO ()))
checkStreamWindowSize
    -- FLOW CONTROL: WINDOW_UPDATE: send: respecting peer's limit
    checkStreamWindowSize :: IO (Maybe (IO ()))
checkStreamWindowSize = do
        StreamId
sws <- Stream -> IO StreamId
getStreamWindowSize Stream
strm
        if StreamId
sws StreamId -> StreamId -> Bool
forall a. Ord a => a -> a -> Bool
<= StreamId
0
            then Maybe (IO ()) -> IO (Maybe (IO ()))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe (IO ()) -> IO (Maybe (IO ())))
-> Maybe (IO ()) -> IO (Maybe (IO ()))
forall a b. (a -> b) -> a -> b
$ IO () -> Maybe (IO ())
forall a. a -> Maybe a
Just (Stream -> IO ()
waitStreamWindowSize Stream
strm)
            else Maybe (IO ()) -> IO (Maybe (IO ()))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe (IO ())
forall a. Maybe a
Nothing

{-# INLINE waitStreaming #-}
waitStreaming :: TBQueue a -> IO ()
waitStreaming :: forall a. TBQueue a -> IO ()
waitStreaming TBQueue a
tbq = STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    Bool
isEmpty <- TBQueue a -> STM Bool
forall a. TBQueue a -> STM Bool
isEmptyTBQueue TBQueue a
tbq
    Bool -> STM ()
checkSTM (Bool -> Bool
not Bool
isEmpty)