{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}

module Network.HTTP2.H2.Stream where

import Control.Monad
import Control.Monad.STM (throwSTM)
import Data.IORef
import Data.Maybe (fromMaybe)
import Network.Control
import Network.HTTP.Semantics
import Network.HTTP.Semantics.IO
import UnliftIO.Concurrent
import UnliftIO.Exception
import UnliftIO.STM

import Network.HTTP2.Frame
import Network.HTTP2.H2.StreamTable
import Network.HTTP2.H2.Types

----------------------------------------------------------------

isIdle :: StreamState -> Bool
isIdle :: StreamState -> Bool
isIdle StreamState
Idle = Bool
True
isIdle StreamState
_ = Bool
False

isOpen :: StreamState -> Bool
isOpen :: StreamState -> Bool
isOpen Open{} = Bool
True
isOpen StreamState
_ = Bool
False

isHalfClosedRemote :: StreamState -> Bool
isHalfClosedRemote :: StreamState -> Bool
isHalfClosedRemote StreamState
HalfClosedRemote = Bool
True
isHalfClosedRemote (Closed ClosedCode
_) = Bool
True
isHalfClosedRemote StreamState
_ = Bool
False

isHalfClosedLocal :: StreamState -> Bool
isHalfClosedLocal :: StreamState -> Bool
isHalfClosedLocal (Open (Just ClosedCode
_) OpenState
_) = Bool
True
isHalfClosedLocal (Closed ClosedCode
_) = Bool
True
isHalfClosedLocal StreamState
_ = Bool
False

isClosed :: StreamState -> Bool
isClosed :: StreamState -> Bool
isClosed Closed{} = Bool
True
isClosed StreamState
_ = Bool
False

isReserved :: StreamState -> Bool
isReserved :: StreamState -> Bool
isReserved StreamState
Reserved = Bool
True
isReserved StreamState
_ = Bool
False

----------------------------------------------------------------

newOddStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newOddStream :: StreamId -> StreamId -> StreamId -> IO Stream
newOddStream StreamId
sid StreamId
txwin StreamId
rxwin =
    StreamId
-> IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream
Stream StreamId
sid
        (IORef StreamState
 -> MVar (Either SomeException InpObj)
 -> TVar TxFlow
 -> IORef RxFlow
 -> IORef (Maybe RxQ)
 -> Stream)
-> IO (IORef StreamState)
-> IO
     (MVar (Either SomeException InpObj)
      -> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StreamState -> IO (IORef StreamState)
forall a. a -> IO (IORef a)
newIORef StreamState
Idle
        IO
  (MVar (Either SomeException InpObj)
   -> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (MVar (Either SomeException InpObj))
-> IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO (MVar (Either SomeException InpObj))
forall (m :: * -> *) a. MonadIO m => m (MVar a)
newEmptyMVar
        IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (TVar TxFlow)
-> IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TxFlow -> IO (TVar TxFlow)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO (StreamId -> TxFlow
newTxFlow StreamId
txwin)
        IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (IORef RxFlow) -> IO (IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> RxFlow -> IO (IORef RxFlow)
forall a. a -> IO (IORef a)
newIORef (StreamId -> RxFlow
newRxFlow StreamId
rxwin)
        IO (IORef (Maybe RxQ) -> Stream)
-> IO (IORef (Maybe RxQ)) -> IO Stream
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe RxQ -> IO (IORef (Maybe RxQ))
forall a. a -> IO (IORef a)
newIORef Maybe RxQ
forall a. Maybe a
Nothing

newEvenStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newEvenStream :: StreamId -> StreamId -> StreamId -> IO Stream
newEvenStream StreamId
sid StreamId
txwin StreamId
rxwin =
    StreamId
-> IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream
Stream StreamId
sid
        (IORef StreamState
 -> MVar (Either SomeException InpObj)
 -> TVar TxFlow
 -> IORef RxFlow
 -> IORef (Maybe RxQ)
 -> Stream)
-> IO (IORef StreamState)
-> IO
     (MVar (Either SomeException InpObj)
      -> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StreamState -> IO (IORef StreamState)
forall a. a -> IO (IORef a)
newIORef StreamState
Reserved
        IO
  (MVar (Either SomeException InpObj)
   -> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (MVar (Either SomeException InpObj))
-> IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO (MVar (Either SomeException InpObj))
forall (m :: * -> *) a. MonadIO m => m (MVar a)
newEmptyMVar
        IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (TVar TxFlow)
-> IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TxFlow -> IO (TVar TxFlow)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO (StreamId -> TxFlow
newTxFlow StreamId
txwin)
        IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (IORef RxFlow) -> IO (IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> RxFlow -> IO (IORef RxFlow)
forall a. a -> IO (IORef a)
newIORef (StreamId -> RxFlow
newRxFlow StreamId
rxwin)
        IO (IORef (Maybe RxQ) -> Stream)
-> IO (IORef (Maybe RxQ)) -> IO Stream
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe RxQ -> IO (IORef (Maybe RxQ))
forall a. a -> IO (IORef a)
newIORef Maybe RxQ
forall a. Maybe a
Nothing

----------------------------------------------------------------

{-# INLINE readStreamState #-}
readStreamState :: Stream -> IO StreamState
readStreamState :: Stream -> IO StreamState
readStreamState Stream{IORef StreamState
streamState :: IORef StreamState
streamState :: Stream -> IORef StreamState
streamState} = IORef StreamState -> IO StreamState
forall a. IORef a -> IO a
readIORef IORef StreamState
streamState

----------------------------------------------------------------

closeAllStreams
    :: TVar OddStreamTable -> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams :: TVar OddStreamTable
-> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams TVar OddStreamTable
ovar TVar EvenStreamTable
evar Maybe SomeException
mErr' = do
    IntMap Stream
ostrms <- TVar OddStreamTable -> IO (IntMap Stream)
clearOddStreamTable TVar OddStreamTable
ovar
    (Stream -> IO ()) -> IntMap Stream -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Stream -> IO ()
finalize IntMap Stream
ostrms
    IntMap Stream
estrms <- TVar EvenStreamTable -> IO (IntMap Stream)
clearEvenStreamTable TVar EvenStreamTable
evar
    (Stream -> IO ()) -> IntMap Stream -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Stream -> IO ()
finalize IntMap Stream
estrms
  where
    finalize :: Stream -> IO ()
finalize Stream
strm = do
        StreamState
st <- Stream -> IO StreamState
readStreamState Stream
strm
        IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar (Either SomeException InpObj)
-> Either SomeException InpObj -> IO Bool
forall (m :: * -> *) a. MonadIO m => MVar a -> a -> m Bool
tryPutMVar (Stream -> MVar (Either SomeException InpObj)
streamInput Stream
strm) Either SomeException InpObj
forall a. Either SomeException a
err
        case StreamState
st of
            Open Maybe ClosedCode
_ (Body RxQ
q Maybe StreamId
_ IORef StreamId
_ IORef (Maybe TokenHeaderTable)
_) ->
                STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ RxQ -> Either SomeException (ByteString, Bool) -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue RxQ
q (Either SomeException (ByteString, Bool) -> STM ())
-> Either SomeException (ByteString, Bool) -> STM ()
forall a b. (a -> b) -> a -> b
$ Either SomeException (ByteString, Bool)
-> (SomeException -> Either SomeException (ByteString, Bool))
-> Maybe SomeException
-> Either SomeException (ByteString, Bool)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe ((ByteString, Bool) -> Either SomeException (ByteString, Bool)
forall a b. b -> Either a b
Right (ByteString
forall a. Monoid a => a
mempty, Bool
True)) SomeException -> Either SomeException (ByteString, Bool)
forall a b. a -> Either a b
Left Maybe SomeException
mErr
            StreamState
_otherwise ->
                () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()

    mErr :: Maybe SomeException
    mErr :: Maybe SomeException
mErr = case Maybe SomeException
mErr' of
        Just SomeException
e
            | Just HTTP2Error
ConnectionIsClosed <- SomeException -> Maybe HTTP2Error
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e ->
                Maybe SomeException
forall a. Maybe a
Nothing
        Maybe SomeException
_otherwise ->
            Maybe SomeException
mErr'

    err :: Either SomeException a
    err :: forall a. Either SomeException a
err =
        SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$
            SomeException -> Maybe SomeException -> SomeException
forall a. a -> Maybe a -> a
fromMaybe (HTTP2Error -> SomeException
forall e. Exception e => e -> SomeException
toException HTTP2Error
ConnectionIsClosed) (Maybe SomeException -> SomeException)
-> Maybe SomeException -> SomeException
forall a b. (a -> b) -> a -> b
$
                Maybe SomeException
mErr

----------------------------------------------------------------

data StreamTerminated
    = StreamPushedFinal
    | StreamCancelled
    | StreamOutOfScope
    deriving (StreamId -> StreamTerminated -> ShowS
[StreamTerminated] -> ShowS
StreamTerminated -> String
(StreamId -> StreamTerminated -> ShowS)
-> (StreamTerminated -> String)
-> ([StreamTerminated] -> ShowS)
-> Show StreamTerminated
forall a.
(StreamId -> a -> ShowS)
-> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: StreamId -> StreamTerminated -> ShowS
showsPrec :: StreamId -> StreamTerminated -> ShowS
$cshow :: StreamTerminated -> String
show :: StreamTerminated -> String
$cshowList :: [StreamTerminated] -> ShowS
showList :: [StreamTerminated] -> ShowS
Show)
    deriving anyclass (Show StreamTerminated
Typeable StreamTerminated
(Typeable StreamTerminated, Show StreamTerminated) =>
(StreamTerminated -> SomeException)
-> (SomeException -> Maybe StreamTerminated)
-> (StreamTerminated -> String)
-> Exception StreamTerminated
SomeException -> Maybe StreamTerminated
StreamTerminated -> String
StreamTerminated -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: StreamTerminated -> SomeException
toException :: StreamTerminated -> SomeException
$cfromException :: SomeException -> Maybe StreamTerminated
fromException :: SomeException -> Maybe StreamTerminated
$cdisplayException :: StreamTerminated -> String
displayException :: StreamTerminated -> String
Exception)

withOutBodyIface
    :: TBQueue StreamingChunk
    -> (forall a. IO a -> IO a)
    -> (OutBodyIface -> IO r)
    -> IO r
withOutBodyIface :: forall r.
TBQueue StreamingChunk
-> (forall a. IO a -> IO a) -> (OutBodyIface -> IO r) -> IO r
withOutBodyIface TBQueue StreamingChunk
tbq forall a. IO a -> IO a
unmask OutBodyIface -> IO r
k = do
    TVar (Maybe StreamTerminated)
terminated <- Maybe StreamTerminated -> IO (TVar (Maybe StreamTerminated))
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Maybe StreamTerminated
forall a. Maybe a
Nothing
    let whenNotTerminated :: STM b -> STM b
whenNotTerminated STM b
act = do
            Maybe StreamTerminated
mTerminated <- TVar (Maybe StreamTerminated) -> STM (Maybe StreamTerminated)
forall a. TVar a -> STM a
readTVar TVar (Maybe StreamTerminated)
terminated
            case Maybe StreamTerminated
mTerminated of
                Just StreamTerminated
reason ->
                    StreamTerminated -> STM b
forall e a. Exception e => e -> STM a
throwSTM StreamTerminated
reason
                Maybe StreamTerminated
Nothing ->
                    STM b
act

        terminateWith :: StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
reason STM ()
act = do
            Maybe StreamTerminated
mTerminated <- TVar (Maybe StreamTerminated) -> STM (Maybe StreamTerminated)
forall a. TVar a -> STM a
readTVar TVar (Maybe StreamTerminated)
terminated
            case Maybe StreamTerminated
mTerminated of
                Just StreamTerminated
_ ->
                    -- Already terminated
                    () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
                Maybe StreamTerminated
Nothing -> do
                    TVar (Maybe StreamTerminated) -> Maybe StreamTerminated -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe StreamTerminated)
terminated (StreamTerminated -> Maybe StreamTerminated
forall a. a -> Maybe a
Just StreamTerminated
reason)
                    STM ()
act

        iface :: OutBodyIface
iface =
            OutBodyIface
                { outBodyUnmask :: forall a. IO a -> IO a
outBodyUnmask = IO x -> IO x
forall a. IO a -> IO a
unmask
                , outBodyPush :: Builder -> IO ()
outBodyPush = \Builder
b ->
                    STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
                        STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
                            TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$
                                Builder -> IsEndOfStream -> StreamingChunk
StreamingBuilder Builder
b IsEndOfStream
NotEndOfStream
                , outBodyPushFinal :: Builder -> IO ()
outBodyPushFinal = \Builder
b ->
                    STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ do
                        TVar (Maybe StreamTerminated) -> Maybe StreamTerminated -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe StreamTerminated)
terminated (StreamTerminated -> Maybe StreamTerminated
forall a. a -> Maybe a
Just StreamTerminated
StreamPushedFinal)
                        TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$ Builder -> IsEndOfStream -> StreamingChunk
StreamingBuilder Builder
b (Maybe (IO ()) -> IsEndOfStream
EndOfStream Maybe (IO ())
forall a. Maybe a
Nothing)
                        TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$ Maybe (IO ()) -> StreamingChunk
StreamingFinished Maybe (IO ())
forall a. Maybe a
Nothing
                , outBodyFlush :: IO ()
outBodyFlush =
                    STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
                        STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
                            TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq StreamingChunk
StreamingFlush
                , outBodyCancel :: Maybe SomeException -> IO ()
outBodyCancel = \Maybe SomeException
mErr ->
                    STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
                        StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
StreamCancelled (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
                            TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (Maybe SomeException -> StreamingChunk
StreamingCancelled Maybe SomeException
mErr)
                }
        finished :: IO ()
finished = STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
StreamOutOfScope (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
                TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$
                    Maybe (IO ()) -> StreamingChunk
StreamingFinished Maybe (IO ())
forall a. Maybe a
Nothing
    OutBodyIface -> IO r
k OutBodyIface
iface IO r -> IO () -> IO r
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
`finally` IO ()
finished

nextForStreaming
    :: TBQueue StreamingChunk
    -> DynaNext
nextForStreaming :: TBQueue StreamingChunk -> DynaNext
nextForStreaming TBQueue StreamingChunk
tbq =
    let takeQ :: IO (Maybe StreamingChunk)
takeQ = STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk))
-> STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk)
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> STM (Maybe StreamingChunk)
forall a. TBQueue a -> STM (Maybe a)
tryReadTBQueue TBQueue StreamingChunk
tbq
        next :: DynaNext
next = IO (Maybe StreamingChunk) -> DynaNext
fillStreamBodyGetNext IO (Maybe StreamingChunk)
takeQ
     in DynaNext
next