{-# LANGUAGE RecordWildCards #-}

module Network.Control.Flow (
    -- * Flow control

    -- | This is based on the total approach of QUIC rather than
    --   the difference approach of HTTP\/2 because QUIC's one is
    --   considered safer. Please refer to [Using HTTP\/3 Stream Limits in HTTP\/2](https://datatracker.ietf.org/doc/draft-thomson-httpbis-h2-stream-limits/) to understand why QUIC's approach is better, even though the topic of that draft is stream concurrency.

    -- ** Constants for flow control.
    defaultMaxStreams,
    defaultMaxStreamData,
    defaultMaxData,

    -- ** Flow control for sending
    TxFlow (..),
    newTxFlow,
    txWindowSize,
    WindowSize,

    -- ** Flow control for receiving
    RxFlow (..),
    newRxFlow,
    rxWindowSize,
    FlowControlType (..),
    maybeOpenRxWindow,
    checkRxLimit,
) where

import Data.Bits

-- | Default max streams. (64)
defaultMaxStreams :: Int
defaultMaxStreams = 64

-- | Default max data of a stream. (256K bytes, i.e. @256 * 1024@)
defaultMaxStreamData :: Int
defaultMaxStreamData = 262144

-- | Default max data of a connection.
--
-- By default, this is set to @defaultMaxStreams * defaultMaxStreamData@. This
-- ensures that streams that are not currently handled cannot exhaust the
-- connection window.
--
-- If you use a smaller connection window size, you __must__ ensure that if you
-- are handling fewer concurrent streams than allowed by 'defaultMaxStreams',
-- that the unhandled streams cannot exhaust the connection window, or risk the
-- entire system deadlocking.
defaultMaxData :: Int
defaultMaxData = defaultMaxStreamData * defaultMaxStreams

-- | Window size.
--
-- A plain alias for 'Int'; it is the result type of 'txWindowSize' and
-- 'rxWindowSize' and the argument type of 'newTxFlow' and 'newRxFlow'.
type WindowSize = Int

-- | Flow for sending
--
-- @
-- -------------------------------------->
--        ^           ^
--     txfSent    txfLimit
--
--        |-----------| The size which this node can send
--        txWindowSize
-- @
--
-- Both fields are absolute totals; the currently usable window is their
-- difference (see 'txWindowSize').
data TxFlow = TxFlow
    { txfSent :: Int
    -- ^ The total size of sent data.
    , txfLimit :: Int
    -- ^ The total size of data which can be sent.
    }
    deriving (Eq, Show)

-- | Create a 'TxFlow' from an initial window size (presumably the peer's
-- receive buffer size).
--
-- Nothing has been sent yet, so 'txfSent' starts at 0 and 'txfLimit' is the
-- given window size.
newTxFlow :: WindowSize -> TxFlow
newTxFlow win =
    TxFlow
        { txfSent = 0
        , txfLimit = win
        }

-- | 'txfLimit' - 'txfSent'.
--
-- This is the size which this node can still send; see the diagram on
-- 'TxFlow'.
txWindowSize :: TxFlow -> WindowSize
txWindowSize TxFlow{..} = txfLimit - txfSent

-- | Flow for receiving.
--
-- The goal of 'RxFlow' is to ensure that our network peer does not send us data
-- faster than we can consume it. We therefore impose a maximum number of
-- unconsumed bytes that we are willing to receive from the peer, which we refer
-- to as the buffer size:
--
-- @
--                    rxfBufSize
--           |---------------------------|
-- -------------------------------------------->
--           ^              ^
--      rxfConsumed    rxfReceived
-- @
--
-- The peer does not know of course how many bytes we have consumed of the data
-- that they sent us, so they keep track of their own limit of how much data
-- they are allowed to send. We keep track of this limit also:
--
-- @
--                    rxfBufSize
--           |---------------------------|
-- -------------------------------------------->
--           ^              ^       ^
--      rxfConsumed    rxfReceived  |
--                               rxfLimit
-- @
--
-- Each time we receive data from the peer, we check that they do not exceed the
-- limit ('checkRxLimit'). When we consume data, we periodically send the peer
-- an update (known as a /window update/) of what their new limit is
-- ('maybeOpenRxWindow'). To decrease overhead, we only do this if the window
-- update is at least half the buffer size ('rxfBufSize').
data RxFlow = RxFlow
    { rxfBufSize :: Int
    -- ^ Maximum number of unconsumed bytes the peer can send us
    --
    -- See discussion above for details.
    , rxfConsumed :: Int
    -- ^ How much of the data that the peer has sent us have we consumed?
    --
    -- This is an absolute number: the total amount of bytes consumed over the
    -- lifetime of the connection or stream (i.e., not relative to the window).
    , rxfReceived :: Int
    -- ^ How much data have we received from the peer?
    --
    -- Like 'rxfConsumed', this is an absolute number.
    , rxfLimit :: Int
    -- ^ Current limit on how many bytes the peer is allowed to send us.
    --
    -- Like 'rxfConsumed', this is an absolute number.
    }
    deriving (Eq, Show)

-- | Create an 'RxFlow' with an initial window size.
--
-- The given size is used both as the buffer size and as the initial limit;
-- nothing has been consumed or received yet.
newRxFlow :: WindowSize -> RxFlow
newRxFlow win =
    RxFlow
        { rxfBufSize = win
        , rxfConsumed = 0
        , rxfReceived = 0
        , rxfLimit = win
        }

-- | 'rxfLimit' - 'rxfReceived'.
--
-- This is the number of bytes the peer is still allowed to send before they
-- must wait for a window update; see 'RxFlow' for details.
rxWindowSize :: RxFlow -> WindowSize
rxWindowSize RxFlow{..} = rxfLimit - rxfReceived

-- | The representation of window size update.
--
-- 'maybeOpenRxWindow' uses this to decide which number it reports: the size
-- of the increment for 'FCTWindowUpdate', or the new absolute limit for
-- 'FCTMaxData'.
data FlowControlType
    = FCTWindowUpdate
    -- ^ HTTP\/2 style
    | FCTMaxData
    -- ^ QUIC style

-- | Record that we have consumed some received data
--
-- May return a window update; see 'RxFlow' for details.
--
-- 'rxfConsumed' is always advanced by the consumed size. The limit is only
-- moved (and an update returned) when the resulting increment is at least
-- half of 'rxfBufSize'; otherwise 'rxfLimit' is left untouched and 'Nothing'
-- is returned.
--
-- The value inside 'Just' depends on the 'FlowControlType': the increment of
-- the limit for 'FCTWindowUpdate', the new absolute limit for 'FCTMaxData'.
maybeOpenRxWindow
    :: Int
    -- ^ The consumed size.
    -> FlowControlType
    -> RxFlow
    -> (RxFlow, Maybe Int)
    -- ^ 'Just' if the size should be informed to the peer.
maybeOpenRxWindow consumed fct flow@RxFlow{..}
    | winUpdate >= threshold =
        let flow' =
                flow
                    { rxfConsumed = rxfConsumed'
                    , rxfLimit = rxfLimit'
                    }
            update = case fct of
                FCTWindowUpdate -> winUpdate
                FCTMaxData -> rxfLimit'
         in (flow', Just update)
    | otherwise =
        let flow' = flow{rxfConsumed = rxfConsumed'}
         in (flow', Nothing)
  where
    -- New absolute total of consumed bytes
    rxfConsumed' = rxfConsumed + consumed

    -- Minimum window update size (half of the buffer size)
    threshold = rxfBufSize `unsafeShiftR` 1

    -- The window update, /if/ we choose to send it:
    -- the new absolute limit, and its distance from the current limit.
    rxfLimit' = rxfConsumed' + rxfBufSize
    winUpdate = rxfLimit' - rxfLimit

-- | Checking if received data is acceptable against the
--   current window.
--
-- If the new total of received bytes does not exceed 'rxfLimit', the
-- returned flow has 'rxfReceived' advanced by the received size and the
-- flag is 'True'. Otherwise the flow is returned unchanged with 'False'.
checkRxLimit
    :: Int
    -- ^ The size of received data.
    -> RxFlow
    -> (RxFlow, Bool)
    -- ^ Acceptable if 'True'.
checkRxLimit received flow@RxFlow{..}
    | received' <= rxfLimit =
        let flow' = flow{rxfReceived = received'}
         in (flow', True)
    | otherwise = (flow, False)
  where
    -- Absolute total of received bytes if this data is accepted
    received' = rxfReceived + received