{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}

module Network.QUIC.Stream.Reass (
    takeRecvStreamQwithSize
  , putRxStreamData
  , tryReassemble
  ) where

import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import qualified Data.ByteString as BS

import Network.QUIC.Imports
-- import Network.QUIC.Logger
import Network.QUIC.Stream.Frag
import Network.QUIC.Stream.Misc
import Network.QUIC.Stream.Queue
import qualified Network.QUIC.Stream.Skew as Skew
import Network.QUIC.Stream.Types
import Network.QUIC.Types

----------------------------------------------------------------

getEndOfStream :: Stream -> IO Bool
getEndOfStream :: Stream -> IO Bool
getEndOfStream Stream{StreamId
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
Connection
RecvStreamQ
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> StreamId
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: StreamId
..} = IORef Bool -> IO Bool
forall a. IORef a -> IO a
readIORef (IORef Bool -> IO Bool) -> IORef Bool -> IO Bool
forall a b. (a -> b) -> a -> b
$ RecvStreamQ -> IORef Bool
endOfStream RecvStreamQ
streamRecvQ

setEndOfStream :: Stream -> IO ()
setEndOfStream :: Stream -> IO ()
setEndOfStream Stream{StreamId
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
Connection
RecvStreamQ
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: StreamId
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> StreamId
..} = IORef Bool -> Bool -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (RecvStreamQ -> IORef Bool
endOfStream RecvStreamQ
streamRecvQ) Bool
True

readPendingData :: Stream -> IO (Maybe ByteString)
readPendingData :: Stream -> IO (Maybe ByteString)
readPendingData Stream{StreamId
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
Connection
RecvStreamQ
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: StreamId
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> StreamId
..} = IORef (Maybe ByteString) -> IO (Maybe ByteString)
forall a. IORef a -> IO a
readIORef (IORef (Maybe ByteString) -> IO (Maybe ByteString))
-> IORef (Maybe ByteString) -> IO (Maybe ByteString)
forall a b. (a -> b) -> a -> b
$ RecvStreamQ -> IORef (Maybe ByteString)
pendingData RecvStreamQ
streamRecvQ

writePendingData :: Stream -> ByteString -> IO ()
writePendingData :: Stream -> ByteString -> IO ()
writePendingData Stream{StreamId
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
Connection
RecvStreamQ
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: StreamId
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> StreamId
..} ByteString
bs = IORef (Maybe ByteString) -> Maybe ByteString -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (RecvStreamQ -> IORef (Maybe ByteString)
pendingData RecvStreamQ
streamRecvQ) (Maybe ByteString -> IO ()) -> Maybe ByteString -> IO ()
forall a b. (a -> b) -> a -> b
$ ByteString -> Maybe ByteString
forall a. a -> Maybe a
Just ByteString
bs

clearPendingData :: Stream -> IO ()
clearPendingData :: Stream -> IO ()
clearPendingData Stream{StreamId
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
Connection
RecvStreamQ
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: StreamId
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> StreamId
..} = IORef (Maybe ByteString) -> Maybe ByteString -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef (RecvStreamQ -> IORef (Maybe ByteString)
pendingData RecvStreamQ
streamRecvQ) Maybe ByteString
forall a. Maybe a
Nothing

----------------------------------------------------------------

takeRecvStreamQwithSize :: Stream -> Int -> IO ByteString
takeRecvStreamQwithSize :: Stream -> StreamId -> IO ByteString
takeRecvStreamQwithSize Stream
strm StreamId
siz0 = do
    Bool
eos <- Stream -> IO Bool
getEndOfStream Stream
strm
    if Bool
eos then
        ByteString -> IO ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
""
      else do
        Maybe ByteString
mb <- Stream -> IO (Maybe ByteString)
readPendingData Stream
strm
        case Maybe ByteString
mb of
          Maybe ByteString
Nothing -> do
              ByteString
b0 <- Stream -> IO ByteString
takeRecvStreamQ Stream
strm
              if ByteString
b0 ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
"" then do
                  Stream -> IO ()
setEndOfStream Stream
strm
                  ByteString -> IO ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
""
                else do
                  let len :: StreamId
len = ByteString -> StreamId
BS.length ByteString
b0
                  case StreamId
len StreamId -> StreamId -> Ordering
forall a. Ord a => a -> a -> Ordering
`compare` StreamId
siz0 of
                      Ordering
LT -> StreamId -> ([ByteString] -> [ByteString]) -> IO ByteString
tryRead (StreamId
siz0 StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
- StreamId
len) (ByteString
b0 ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
:)
                      Ordering
EQ -> ByteString -> IO ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
b0
                      Ordering
GT -> do
                          let (ByteString
b1,ByteString
b2) = StreamId -> ByteString -> (ByteString, ByteString)
BS.splitAt StreamId
siz0 ByteString
b0
                          Stream -> ByteString -> IO ()
writePendingData Stream
strm ByteString
b2
                          ByteString -> IO ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return ByteString
b1
          Just ByteString
b0 -> do
              Stream -> IO ()
clearPendingData Stream
strm
              let len :: StreamId
len = ByteString -> StreamId
BS.length ByteString
b0
              StreamId -> ([ByteString] -> [ByteString]) -> IO ByteString
tryRead (StreamId
siz0 StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
- StreamId
len) (ByteString
b0 ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
:)
  where
    tryRead :: StreamId -> ([ByteString] -> [ByteString]) -> IO ByteString
tryRead StreamId
siz [ByteString] -> [ByteString]
build = do
        Maybe ByteString
mb <- Stream -> IO (Maybe ByteString)
tryTakeRecvStreamQ Stream
strm
        case Maybe ByteString
mb of
          Maybe ByteString
Nothing -> ByteString -> IO ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
BS.concat ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> [ByteString]
build []
          Just ByteString
b  -> do
              if ByteString
b ByteString -> ByteString -> Bool
forall a. Eq a => a -> a -> Bool
== ByteString
"" then do
                  Stream -> IO ()
setEndOfStream Stream
strm
                  ByteString -> IO ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
BS.concat ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> [ByteString]
build []
                else do
                  let len :: StreamId
len = ByteString -> StreamId
BS.length ByteString
b
                  case StreamId
len StreamId -> StreamId -> Ordering
forall a. Ord a => a -> a -> Ordering
`compare` StreamId
siz of
                    Ordering
LT -> StreamId -> ([ByteString] -> [ByteString]) -> IO ByteString
tryRead (StreamId
siz StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
- StreamId
len) ([ByteString] -> [ByteString]
build ([ByteString] -> [ByteString])
-> ([ByteString] -> [ByteString]) -> [ByteString] -> [ByteString]
forall b c a. (b -> c) -> (a -> b) -> a -> c
. (ByteString
b ByteString -> [ByteString] -> [ByteString]
forall a. a -> [a] -> [a]
:))
                    Ordering
EQ -> ByteString -> IO ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
BS.concat ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> [ByteString]
build [ByteString
b]
                    Ordering
GT -> do
                        let (ByteString
b1,ByteString
b2) = StreamId -> ByteString -> (ByteString, ByteString)
BS.splitAt StreamId
siz ByteString
b
                        Stream -> ByteString -> IO ()
writePendingData Stream
strm ByteString
b2
                        ByteString -> IO ByteString
forall (m :: * -> *) a. Monad m => a -> m a
return (ByteString -> IO ByteString) -> ByteString -> IO ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> ByteString
BS.concat ([ByteString] -> ByteString) -> [ByteString] -> ByteString
forall a b. (a -> b) -> a -> b
$ [ByteString] -> [ByteString]
build [ByteString
b1]

----------------------------------------------------------------
----------------------------------------------------------------

putRxStreamData :: Stream -> RxStreamData -> IO Bool
putRxStreamData :: Stream -> RxStreamData -> IO Bool
putRxStreamData Stream
s rx :: RxStreamData
rx@(RxStreamData ByteString
dat StreamId
off StreamId
_ Bool
_) = do
    StreamId
lim <- Stream -> IO StreamId
getRxMaxStreamData Stream
s
    if ByteString -> StreamId
BS.length ByteString
dat StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
+ StreamId
off StreamId -> StreamId -> Bool
forall a. Ord a => a -> a -> Bool
> StreamId
lim then
        Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
      else do
        Bool
_ <- Stream -> RxStreamData -> (ByteString -> IO ()) -> IO () -> IO Bool
tryReassemble Stream
s RxStreamData
rx ByteString -> IO ()
put IO ()
putFin
        Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
  where
    put :: ByteString -> IO ()
put ByteString
"" = () -> IO ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    put ByteString
d  = Stream -> ByteString -> IO ()
putRecvStreamQ Stream
s ByteString
d
    putFin :: IO ()
putFin = Stream -> ByteString -> IO ()
putRecvStreamQ Stream
s ByteString
""

-- fin of StreamState off fin means see-fin-already.
-- return value indicates duplication
tryReassemble :: Stream -> RxStreamData -> (StreamData -> IO ()) -> IO () -> IO Bool
tryReassemble :: Stream -> RxStreamData -> (ByteString -> IO ()) -> IO () -> IO Bool
tryReassemble Stream{}   (RxStreamData ByteString
"" StreamId
_  StreamId
_ Bool
False) ByteString -> IO ()
_ IO ()
_ = Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
tryReassemble Stream{StreamId
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
Connection
RecvStreamQ
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: StreamId
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> StreamId
..} x :: RxStreamData
x@(RxStreamData ByteString
"" StreamId
off StreamId
_ Bool
True) ByteString -> IO ()
_ IO ()
putFin = do
    si0 :: StreamState
si0@(StreamState StreamId
off0 Bool
fin0) <- IORef StreamState -> IO StreamState
forall a. IORef a -> IO a
readIORef IORef StreamState
streamStateRx
    let si1 :: StreamState
si1 = StreamState
si0 { streamFin :: Bool
streamFin = Bool
True }
    if Bool
fin0 then do
        -- stdoutLogger "Illegal Fin" -- fixme
        Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
      else case StreamId
off StreamId -> StreamId -> Ordering
forall a. Ord a => a -> a -> Ordering
`compare` StreamId
off0 of
        Ordering
LT -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Ordering
EQ -> do
            IORef StreamState -> StreamState -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef StreamState
streamStateRx StreamState
si1
            IO ()
putFin
            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        Ordering
GT -> do
            IORef StreamState -> StreamState -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef StreamState
streamStateRx StreamState
si1
            IORef (Skew RxStreamData)
-> (Skew RxStreamData -> Skew RxStreamData) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
atomicModifyIORef'' IORef (Skew RxStreamData)
streamReass (RxStreamData -> Skew RxStreamData -> Skew RxStreamData
forall a. Frag a => a -> Skew a -> Skew a
Skew.insert RxStreamData
x)
            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
tryReassemble Stream{StreamId
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
Connection
RecvStreamQ
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: StreamId
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> StreamId
..} x :: RxStreamData
x@(RxStreamData ByteString
dat StreamId
off StreamId
len Bool
False) ByteString -> IO ()
put IO ()
putFin = do
    si0 :: StreamState
si0@(StreamState StreamId
off0 Bool
_) <- IORef StreamState -> IO StreamState
forall a. IORef a -> IO a
readIORef IORef StreamState
streamStateRx
    case StreamId
off StreamId -> StreamId -> Ordering
forall a. Ord a => a -> a -> Ordering
`compare` StreamId
off0 of
      Ordering
LT -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
      Ordering
EQ -> do
          ByteString -> IO ()
put ByteString
dat
          StreamState -> StreamId -> IO ()
loop StreamState
si0 (StreamId
off0 StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
+ StreamId
len)
          Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
      Ordering
GT -> do
          IORef (Skew RxStreamData)
-> (Skew RxStreamData -> Skew RxStreamData) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
atomicModifyIORef'' IORef (Skew RxStreamData)
streamReass (RxStreamData -> Skew RxStreamData -> Skew RxStreamData
forall a. Frag a => a -> Skew a -> Skew a
Skew.insert RxStreamData
x)
          Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
  where
    loop :: StreamState -> StreamId -> IO ()
loop StreamState
si0 StreamId
xff = do
        Maybe (Seq RxStreamData)
mrxs <- IORef (Skew RxStreamData)
-> (Skew RxStreamData
    -> (Skew RxStreamData, Maybe (Seq RxStreamData)))
-> IO (Maybe (Seq RxStreamData))
forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef (Skew RxStreamData)
streamReass (StreamId
-> Skew RxStreamData
-> (Skew RxStreamData, Maybe (Seq RxStreamData))
forall a. Frag a => StreamId -> Skew a -> (Skew a, Maybe (Seq a))
Skew.deleteMinIf StreamId
xff)
        case Maybe (Seq RxStreamData)
mrxs of
           Maybe (Seq RxStreamData)
Nothing   -> IORef StreamState -> StreamState -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef StreamState
streamStateRx StreamState
si0 { streamOffset :: StreamId
streamOffset = StreamId
xff }
           Just Seq RxStreamData
rxs -> do
               (RxStreamData -> IO ()) -> Seq RxStreamData -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (ByteString -> IO ()
put (ByteString -> IO ())
-> (RxStreamData -> ByteString) -> RxStreamData -> IO ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. RxStreamData -> ByteString
rxstrmData) Seq RxStreamData
rxs
               let xff1 :: StreamId
xff1 = Seq RxStreamData -> StreamId
forall a. Frag a => a -> StreamId
nextOff Seq RxStreamData
rxs
               if Seq RxStreamData -> Bool
hasFin Seq RxStreamData
rxs then do
                   IO ()
putFin
                 else do
                   StreamState -> StreamId -> IO ()
loop StreamState
si0 StreamId
xff1

tryReassemble Stream{StreamId
TVar Flow
IORef (Skew RxStreamData)
IORef StreamState
IORef Flow
Connection
RecvStreamQ
streamReass :: IORef (Skew RxStreamData)
streamRecvQ :: RecvStreamQ
streamStateRx :: IORef StreamState
streamStateTx :: IORef StreamState
streamFlowRx :: IORef Flow
streamFlowTx :: TVar Flow
streamConnection :: Connection
streamId :: StreamId
streamReass :: Stream -> IORef (Skew RxStreamData)
streamRecvQ :: Stream -> RecvStreamQ
streamStateRx :: Stream -> IORef StreamState
streamStateTx :: Stream -> IORef StreamState
streamFlowRx :: Stream -> IORef Flow
streamFlowTx :: Stream -> TVar Flow
streamConnection :: Stream -> Connection
streamId :: Stream -> StreamId
..} x :: RxStreamData
x@(RxStreamData ByteString
dat StreamId
off StreamId
len Bool
True) ByteString -> IO ()
put IO ()
putFin = do
    si0 :: StreamState
si0@(StreamState StreamId
off0 Bool
fin0) <- IORef StreamState -> IO StreamState
forall a. IORef a -> IO a
readIORef IORef StreamState
streamStateRx
    let si1 :: StreamState
si1 = StreamState
si0 { streamFin :: Bool
streamFin = Bool
True }
    if Bool
fin0 then do
        -- stdoutLogger "Illegal Fin" -- fixme
        Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
      else case StreamId
off StreamId -> StreamId -> Ordering
forall a. Ord a => a -> a -> Ordering
`compare` StreamId
off0 of
        Ordering
LT -> Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
True
        Ordering
EQ -> do
            let off1 :: StreamId
off1 = StreamId
off0 StreamId -> StreamId -> StreamId
forall a. Num a => a -> a -> a
+ StreamId
len
            IORef StreamState -> StreamState -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef StreamState
streamStateRx StreamState
si1 { streamOffset :: StreamId
streamOffset = StreamId
off1 }
            ByteString -> IO ()
put ByteString
dat
            IO ()
putFin
            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False
        Ordering
GT -> do
            IORef StreamState -> StreamState -> IO ()
forall a. IORef a -> a -> IO ()
writeIORef IORef StreamState
streamStateRx StreamState
si1
            IORef (Skew RxStreamData)
-> (Skew RxStreamData -> Skew RxStreamData) -> IO ()
forall a. IORef a -> (a -> a) -> IO ()
atomicModifyIORef'' IORef (Skew RxStreamData)
streamReass (RxStreamData -> Skew RxStreamData -> Skew RxStreamData
forall a. Frag a => a -> Skew a -> Skew a
Skew.insert RxStreamData
x)
            Bool -> IO Bool
forall (m :: * -> *) a. Monad m => a -> m a
return Bool
False

hasFin :: Seq RxStreamData -> Bool
hasFin :: Seq RxStreamData -> Bool
hasFin Seq RxStreamData
s = case Seq RxStreamData -> ViewR RxStreamData
forall a. Seq a -> ViewR a
Seq.viewr Seq RxStreamData
s of
  ViewR RxStreamData
Seq.EmptyR -> Bool
False
  Seq RxStreamData
_ Seq.:> RxStreamData
x -> RxStreamData -> Bool
rxstrmFin RxStreamData
x