{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
module Network.HTTP2.H2.Stream where
import Control.Monad
import Control.Monad.STM (throwSTM)
import Data.IORef
import Data.Maybe (fromMaybe)
import Network.Control
import Network.HTTP.Semantics
import Network.HTTP.Semantics.IO
import UnliftIO.Concurrent
import UnliftIO.Exception
import UnliftIO.STM
import Network.HTTP2.Frame
import Network.HTTP2.H2.StreamTable
import Network.HTTP2.H2.Types
isIdle :: StreamState -> Bool
isIdle :: StreamState -> Bool
isIdle StreamState
Idle = Bool
True
isIdle StreamState
_ = Bool
False
isOpen :: StreamState -> Bool
isOpen :: StreamState -> Bool
isOpen Open{} = Bool
True
isOpen StreamState
_ = Bool
False
isHalfClosedRemote :: StreamState -> Bool
isHalfClosedRemote :: StreamState -> Bool
isHalfClosedRemote StreamState
HalfClosedRemote = Bool
True
isHalfClosedRemote (Closed ClosedCode
_) = Bool
True
isHalfClosedRemote StreamState
_ = Bool
False
isHalfClosedLocal :: StreamState -> Bool
isHalfClosedLocal :: StreamState -> Bool
isHalfClosedLocal (Open (Just ClosedCode
_) OpenState
_) = Bool
True
isHalfClosedLocal (Closed ClosedCode
_) = Bool
True
isHalfClosedLocal StreamState
_ = Bool
False
isClosed :: StreamState -> Bool
isClosed :: StreamState -> Bool
isClosed Closed{} = Bool
True
isClosed StreamState
_ = Bool
False
isReserved :: StreamState -> Bool
isReserved :: StreamState -> Bool
isReserved StreamState
Reserved = Bool
True
isReserved StreamState
_ = Bool
False
newOddStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newOddStream :: StreamId -> StreamId -> StreamId -> IO Stream
newOddStream StreamId
sid StreamId
txwin StreamId
rxwin =
StreamId
-> IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream
Stream StreamId
sid
(IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream)
-> IO (IORef StreamState)
-> IO
(MVar (Either SomeException InpObj)
-> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StreamState -> IO (IORef StreamState)
forall a. a -> IO (IORef a)
newIORef StreamState
Idle
IO
(MVar (Either SomeException InpObj)
-> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (MVar (Either SomeException InpObj))
-> IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO (MVar (Either SomeException InpObj))
forall (m :: * -> *) a. MonadIO m => m (MVar a)
newEmptyMVar
IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (TVar TxFlow)
-> IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TxFlow -> IO (TVar TxFlow)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO (StreamId -> TxFlow
newTxFlow StreamId
txwin)
IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (IORef RxFlow) -> IO (IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> RxFlow -> IO (IORef RxFlow)
forall a. a -> IO (IORef a)
newIORef (StreamId -> RxFlow
newRxFlow StreamId
rxwin)
IO (IORef (Maybe RxQ) -> Stream)
-> IO (IORef (Maybe RxQ)) -> IO Stream
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe RxQ -> IO (IORef (Maybe RxQ))
forall a. a -> IO (IORef a)
newIORef Maybe RxQ
forall a. Maybe a
Nothing
newEvenStream :: StreamId -> WindowSize -> WindowSize -> IO Stream
newEvenStream :: StreamId -> StreamId -> StreamId -> IO Stream
newEvenStream StreamId
sid StreamId
txwin StreamId
rxwin =
StreamId
-> IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream
Stream StreamId
sid
(IORef StreamState
-> MVar (Either SomeException InpObj)
-> TVar TxFlow
-> IORef RxFlow
-> IORef (Maybe RxQ)
-> Stream)
-> IO (IORef StreamState)
-> IO
(MVar (Either SomeException InpObj)
-> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> StreamState -> IO (IORef StreamState)
forall a. a -> IO (IORef a)
newIORef StreamState
Reserved
IO
(MVar (Either SomeException InpObj)
-> TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (MVar (Either SomeException InpObj))
-> IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> IO (MVar (Either SomeException InpObj))
forall (m :: * -> *) a. MonadIO m => m (MVar a)
newEmptyMVar
IO (TVar TxFlow -> IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (TVar TxFlow)
-> IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> TxFlow -> IO (TVar TxFlow)
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO (StreamId -> TxFlow
newTxFlow StreamId
txwin)
IO (IORef RxFlow -> IORef (Maybe RxQ) -> Stream)
-> IO (IORef RxFlow) -> IO (IORef (Maybe RxQ) -> Stream)
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> RxFlow -> IO (IORef RxFlow)
forall a. a -> IO (IORef a)
newIORef (StreamId -> RxFlow
newRxFlow StreamId
rxwin)
IO (IORef (Maybe RxQ) -> Stream)
-> IO (IORef (Maybe RxQ)) -> IO Stream
forall a b. IO (a -> b) -> IO a -> IO b
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> Maybe RxQ -> IO (IORef (Maybe RxQ))
forall a. a -> IO (IORef a)
newIORef Maybe RxQ
forall a. Maybe a
Nothing
{-# INLINE readStreamState #-}
readStreamState :: Stream -> IO StreamState
readStreamState :: Stream -> IO StreamState
readStreamState Stream{IORef StreamState
streamState :: IORef StreamState
streamState :: Stream -> IORef StreamState
streamState} = IORef StreamState -> IO StreamState
forall a. IORef a -> IO a
readIORef IORef StreamState
streamState
closeAllStreams
:: TVar OddStreamTable -> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams :: TVar OddStreamTable
-> TVar EvenStreamTable -> Maybe SomeException -> IO ()
closeAllStreams TVar OddStreamTable
ovar TVar EvenStreamTable
evar Maybe SomeException
mErr' = do
IntMap Stream
ostrms <- TVar OddStreamTable -> IO (IntMap Stream)
clearOddStreamTable TVar OddStreamTable
ovar
(Stream -> IO ()) -> IntMap Stream -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Stream -> IO ()
finalize IntMap Stream
ostrms
IntMap Stream
estrms <- TVar EvenStreamTable -> IO (IntMap Stream)
clearEvenStreamTable TVar EvenStreamTable
evar
(Stream -> IO ()) -> IntMap Stream -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ Stream -> IO ()
finalize IntMap Stream
estrms
where
finalize :: Stream -> IO ()
finalize Stream
strm = do
StreamState
st <- Stream -> IO StreamState
readStreamState Stream
strm
IO Bool -> IO ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (IO Bool -> IO ()) -> IO Bool -> IO ()
forall a b. (a -> b) -> a -> b
$ MVar (Either SomeException InpObj)
-> Either SomeException InpObj -> IO Bool
forall (m :: * -> *) a. MonadIO m => MVar a -> a -> m Bool
tryPutMVar (Stream -> MVar (Either SomeException InpObj)
streamInput Stream
strm) Either SomeException InpObj
forall a. Either SomeException a
err
case StreamState
st of
Open Maybe ClosedCode
_ (Body RxQ
q Maybe StreamId
_ IORef StreamId
_ IORef (Maybe TokenHeaderTable)
_) ->
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ RxQ -> Either SomeException (ByteString, Bool) -> STM ()
forall a. TQueue a -> a -> STM ()
writeTQueue RxQ
q (Either SomeException (ByteString, Bool) -> STM ())
-> Either SomeException (ByteString, Bool) -> STM ()
forall a b. (a -> b) -> a -> b
$ Either SomeException (ByteString, Bool)
-> (SomeException -> Either SomeException (ByteString, Bool))
-> Maybe SomeException
-> Either SomeException (ByteString, Bool)
forall b a. b -> (a -> b) -> Maybe a -> b
maybe ((ByteString, Bool) -> Either SomeException (ByteString, Bool)
forall a b. b -> Either a b
Right (ByteString
forall a. Monoid a => a
mempty, Bool
True)) SomeException -> Either SomeException (ByteString, Bool)
forall a b. a -> Either a b
Left Maybe SomeException
mErr
StreamState
_otherwise ->
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
mErr :: Maybe SomeException
mErr :: Maybe SomeException
mErr = case Maybe SomeException
mErr' of
Just SomeException
e
| Just HTTP2Error
ConnectionIsClosed <- SomeException -> Maybe HTTP2Error
forall e. Exception e => SomeException -> Maybe e
fromException SomeException
e ->
Maybe SomeException
forall a. Maybe a
Nothing
Maybe SomeException
_otherwise ->
Maybe SomeException
mErr'
err :: Either SomeException a
err :: forall a. Either SomeException a
err =
SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> SomeException -> Either SomeException a
forall a b. (a -> b) -> a -> b
$
SomeException -> Maybe SomeException -> SomeException
forall a. a -> Maybe a -> a
fromMaybe (HTTP2Error -> SomeException
forall e. Exception e => e -> SomeException
toException HTTP2Error
ConnectionIsClosed) (Maybe SomeException -> SomeException)
-> Maybe SomeException -> SomeException
forall a b. (a -> b) -> a -> b
$
Maybe SomeException
mErr
data StreamTerminated
= StreamPushedFinal
| StreamCancelled
| StreamOutOfScope
deriving (StreamId -> StreamTerminated -> ShowS
[StreamTerminated] -> ShowS
StreamTerminated -> String
(StreamId -> StreamTerminated -> ShowS)
-> (StreamTerminated -> String)
-> ([StreamTerminated] -> ShowS)
-> Show StreamTerminated
forall a.
(StreamId -> a -> ShowS)
-> (a -> String) -> ([a] -> ShowS) -> Show a
$cshowsPrec :: StreamId -> StreamTerminated -> ShowS
showsPrec :: StreamId -> StreamTerminated -> ShowS
$cshow :: StreamTerminated -> String
show :: StreamTerminated -> String
$cshowList :: [StreamTerminated] -> ShowS
showList :: [StreamTerminated] -> ShowS
Show)
deriving anyclass (Show StreamTerminated
Typeable StreamTerminated
(Typeable StreamTerminated, Show StreamTerminated) =>
(StreamTerminated -> SomeException)
-> (SomeException -> Maybe StreamTerminated)
-> (StreamTerminated -> String)
-> Exception StreamTerminated
SomeException -> Maybe StreamTerminated
StreamTerminated -> String
StreamTerminated -> SomeException
forall e.
(Typeable e, Show e) =>
(e -> SomeException)
-> (SomeException -> Maybe e) -> (e -> String) -> Exception e
$ctoException :: StreamTerminated -> SomeException
toException :: StreamTerminated -> SomeException
$cfromException :: SomeException -> Maybe StreamTerminated
fromException :: SomeException -> Maybe StreamTerminated
$cdisplayException :: StreamTerminated -> String
displayException :: StreamTerminated -> String
Exception)
withOutBodyIface
:: TBQueue StreamingChunk
-> (forall a. IO a -> IO a)
-> (OutBodyIface -> IO r)
-> IO r
withOutBodyIface :: forall r.
TBQueue StreamingChunk
-> (forall a. IO a -> IO a) -> (OutBodyIface -> IO r) -> IO r
withOutBodyIface TBQueue StreamingChunk
tbq forall a. IO a -> IO a
unmask OutBodyIface -> IO r
k = do
TVar (Maybe StreamTerminated)
terminated <- Maybe StreamTerminated -> IO (TVar (Maybe StreamTerminated))
forall (m :: * -> *) a. MonadIO m => a -> m (TVar a)
newTVarIO Maybe StreamTerminated
forall a. Maybe a
Nothing
let whenNotTerminated :: STM b -> STM b
whenNotTerminated STM b
act = do
Maybe StreamTerminated
mTerminated <- TVar (Maybe StreamTerminated) -> STM (Maybe StreamTerminated)
forall a. TVar a -> STM a
readTVar TVar (Maybe StreamTerminated)
terminated
case Maybe StreamTerminated
mTerminated of
Just StreamTerminated
reason ->
StreamTerminated -> STM b
forall e a. Exception e => e -> STM a
throwSTM StreamTerminated
reason
Maybe StreamTerminated
Nothing ->
STM b
act
terminateWith :: StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
reason STM ()
act = do
Maybe StreamTerminated
mTerminated <- TVar (Maybe StreamTerminated) -> STM (Maybe StreamTerminated)
forall a. TVar a -> STM a
readTVar TVar (Maybe StreamTerminated)
terminated
case Maybe StreamTerminated
mTerminated of
Just StreamTerminated
_ ->
() -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Maybe StreamTerminated
Nothing -> do
TVar (Maybe StreamTerminated) -> Maybe StreamTerminated -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe StreamTerminated)
terminated (StreamTerminated -> Maybe StreamTerminated
forall a. a -> Maybe a
Just StreamTerminated
reason)
STM ()
act
iface :: OutBodyIface
iface =
OutBodyIface
{ outBodyUnmask :: forall a. IO a -> IO a
outBodyUnmask = IO x -> IO x
forall a. IO a -> IO a
unmask
, outBodyPush :: Builder -> IO ()
outBodyPush = \Builder
b ->
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$
Builder -> IsEndOfStream -> StreamingChunk
StreamingBuilder Builder
b IsEndOfStream
NotEndOfStream
, outBodyPushFinal :: Builder -> IO ()
outBodyPushFinal = \Builder
b ->
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ do
TVar (Maybe StreamTerminated) -> Maybe StreamTerminated -> STM ()
forall a. TVar a -> a -> STM ()
writeTVar TVar (Maybe StreamTerminated)
terminated (StreamTerminated -> Maybe StreamTerminated
forall a. a -> Maybe a
Just StreamTerminated
StreamPushedFinal)
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$ Builder -> IsEndOfStream -> StreamingChunk
StreamingBuilder Builder
b (Maybe (IO ()) -> IsEndOfStream
EndOfStream Maybe (IO ())
forall a. Maybe a
Nothing)
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$ Maybe (IO ()) -> StreamingChunk
StreamingFinished Maybe (IO ())
forall a. Maybe a
Nothing
, outBodyFlush :: IO ()
outBodyFlush =
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
STM () -> STM ()
forall {b}. STM b -> STM b
whenNotTerminated (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq StreamingChunk
StreamingFlush
, outBodyCancel :: Maybe SomeException -> IO ()
outBodyCancel = \Maybe SomeException
mErr ->
STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$
StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
StreamCancelled (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (Maybe SomeException -> StreamingChunk
StreamingCancelled Maybe SomeException
mErr)
}
finished :: IO ()
finished = STM () -> IO ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
StreamTerminated -> STM () -> STM ()
terminateWith StreamTerminated
StreamOutOfScope (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
TBQueue StreamingChunk -> StreamingChunk -> STM ()
forall a. TBQueue a -> a -> STM ()
writeTBQueue TBQueue StreamingChunk
tbq (StreamingChunk -> STM ()) -> StreamingChunk -> STM ()
forall a b. (a -> b) -> a -> b
$
Maybe (IO ()) -> StreamingChunk
StreamingFinished Maybe (IO ())
forall a. Maybe a
Nothing
OutBodyIface -> IO r
k OutBodyIface
iface IO r -> IO () -> IO r
forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m a
`finally` IO ()
finished
nextForStreaming
:: TBQueue StreamingChunk
-> DynaNext
nextForStreaming :: TBQueue StreamingChunk -> DynaNext
nextForStreaming TBQueue StreamingChunk
tbq =
let takeQ :: IO (Maybe StreamingChunk)
takeQ = STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk)
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk))
-> STM (Maybe StreamingChunk) -> IO (Maybe StreamingChunk)
forall a b. (a -> b) -> a -> b
$ TBQueue StreamingChunk -> STM (Maybe StreamingChunk)
forall a. TBQueue a -> STM (Maybe a)
tryReadTBQueue TBQueue StreamingChunk
tbq
next :: DynaNext
next = IO (Maybe StreamingChunk) -> DynaNext
fillStreamBodyGetNext IO (Maybe StreamingChunk)
takeQ
in DynaNext
next