{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.QUIC.Stream.Reass (
takeRecvStreamQwithSize,
putRxStreamData,
FlowCntl (..),
tryReassemble,
) where
import qualified Data.ByteString as BS
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import Network.QUIC.Imports
import Network.QUIC.Stream.Frag
import Network.QUIC.Stream.Misc
import Network.QUIC.Stream.Queue
import qualified Network.QUIC.Stream.Skew as Skew
import Network.QUIC.Stream.Types
import Network.QUIC.Types
-- | Read the end-of-stream flag stored in the stream's receive queue.
getEndOfStream :: Stream -> IO Bool
getEndOfStream = readIORef . endOfStream . streamRecvQ
-- | Set the end-of-stream flag of the stream's receive queue to 'True'.
setEndOfStream :: Stream -> IO ()
setEndOfStream strm = writeIORef eosRef True
  where
    eosRef = endOfStream (streamRecvQ strm)
-- | Read the pending-data slot of the stream's receive queue.
--   'Nothing' means that no leftover chunk is currently stored.
readPendingData :: Stream -> IO (Maybe ByteString)
readPendingData = readIORef . pendingData . streamRecvQ
-- | Store a chunk in the pending-data slot of the stream's receive queue,
--   overwriting whatever was stored there before.
writePendingData :: Stream -> ByteString -> IO ()
writePendingData strm bs = writeIORef pendingRef (Just bs)
  where
    pendingRef = pendingData (streamRecvQ strm)
-- | Empty the pending-data slot of the stream's receive queue.
clearPendingData :: Stream -> IO ()
clearPendingData strm = writeIORef pendingRef Nothing
  where
    pendingRef = pendingData (streamRecvQ strm)
-- | Take at most @siz0@ bytes of received data from the stream.
--
-- * If the end-of-stream flag is already set, @""@ is returned immediately.
-- * A leftover chunk stored by a previous call (pending data) is consumed
--   first; otherwise the first chunk comes from 'takeRecvStreamQ'.
-- * While fewer than @siz0@ bytes have been gathered, further chunks are
--   fetched with 'tryTakeRecvStreamQ'; gathering stops as soon as that
--   returns 'Nothing'.
-- * An empty chunk marks the end of the stream: the end-of-stream flag is
--   set and whatever was gathered so far is returned.
-- * When a chunk is larger than what is still needed, it is split and the
--   remainder is stored as pending data for the next call.
--
-- NOTE(review): 'takeRecvStreamQ' presumably blocks until a chunk is
-- available while 'tryTakeRecvStreamQ' does not -- confirm in
-- "Network.QUIC.Stream.Queue".
takeRecvStreamQwithSize :: Stream -> Int -> IO ByteString
takeRecvStreamQwithSize strm siz0 = do
    eos <- getEndOfStream strm
    if eos
        then return ""
        else do
            mb <- readPendingData strm
            case mb of
                Nothing -> do
                    b0 <- takeRecvStreamQ strm
                    if b0 == ""
                        then do
                            setEndOfStream strm
                            return ""
                        else consume siz0 id b0
                Just b0 -> do
                    clearPendingData strm
                    -- The pending chunk goes through the same size check as
                    -- a fresh chunk, so a pending chunk larger than @siz0@
                    -- is split again instead of being returned whole.
                    consume siz0 id b0
  where
    -- Account for one non-empty chunk @b@ while @siz@ bytes are still
    -- wanted; @build@ is a difference list of the chunks gathered so far.
    consume :: Int -> ([ByteString] -> [ByteString]) -> ByteString -> IO ByteString
    consume siz build b =
        case len `compare` siz of
            LT -> tryRead (siz - len) (build . (b :))
            EQ -> return $ BS.concat $ build [b]
            GT -> do
                let (b1, b2) = BS.splitAt siz b
                writePendingData strm b2
                return $ BS.concat $ build [b1]
      where
        len = BS.length b

    -- Try to fetch one more chunk from the queue without waiting for it.
    tryRead :: Int -> ([ByteString] -> [ByteString]) -> IO ByteString
    tryRead siz build = do
        mb <- tryTakeRecvStreamQ strm
        case mb of
            Nothing -> return $ BS.concat $ build []
            Just "" -> do
                setEndOfStream strm
                return $ BS.concat $ build []
            Just b -> consume siz build b
-- | Outcome of handing received stream data to 'putRxStreamData'.
data FlowCntl
    = -- | Offset plus length of the data exceeds the receive limit
      --   obtained from 'getRxMaxStreamData'; nothing was stored.
      OverLimit
    | -- | 'tryReassemble' reported the data as already seen.
      Duplicated
    | -- | The data was accepted by 'tryReassemble'.
      Reassembled
-- | Feed one received piece of stream data into the stream.
--
-- The data is rejected with 'OverLimit' when its offset plus its length
-- exceeds the limit returned by 'getRxMaxStreamData'.  Otherwise it is
-- handed to 'tryReassemble', whose result selects between 'Duplicated'
-- and 'Reassembled'.
putRxStreamData :: Stream -> RxStreamData -> IO FlowCntl
putRxStreamData s rx@(RxStreamData _ off len _) = do
    lim <- getRxMaxStreamData s
    if len + off > lim
        then pure OverLimit
        else classify <$> tryReassemble s rx deliver deliverFin
  where
    classify :: Bool -> FlowCntl
    classify isDup
        | isDup = Duplicated
        | otherwise = Reassembled

    -- Empty chunks are never queued here: an empty chunk in the receive
    -- queue is reserved for signalling FIN (see 'deliverFin').
    deliver :: StreamData -> IO ()
    deliver chunk
        | chunk == "" = pure ()
        | otherwise = do
            addRxStreamData s (BS.length chunk)
            putRecvStreamQ s chunk

    -- FIN is signalled to the reader as an empty chunk.
    deliverFin :: IO ()
    deliverFin = putRecvStreamQ s ""
-- | Try to deliver a received piece of stream data in order.
--
-- The third argument is called for every chunk that becomes deliverable,
-- the fourth one once when the end of the stream has been reached.
--
-- Returns 'True' when the piece is treated as a duplicate (it is empty
-- without FIN, it starts before the current receive offset, or it carries
-- FIN although FIN was already recorded).  Returns 'False' when the piece
-- was delivered or stored in 'streamReass' for later delivery.
--
-- NOTE(review): a piece that starts before the current offset but extends
-- beyond it is dropped as a duplicate as a whole -- confirm that senders
-- never re-fragment retransmitted data this way.
tryReassemble
    :: Stream -> RxStreamData -> (StreamData -> IO ()) -> IO () -> IO Bool
-- Empty data without FIN carries no information.
tryReassemble Stream{} (RxStreamData "" _ _ False) _ _ = return True
-- Empty data with FIN.
tryReassemble Stream{..} x@(RxStreamData "" off _ True) _ putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                writeIORef streamStateRx si1
                putFin
                return False
            GT -> do
                -- FIN arrived ahead of missing data: remember it and keep
                -- the fragment until the gap is filled.
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False
-- Non-empty data without FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len False) put putFin = do
    si0@(StreamState off0 _) <- readIORef streamStateRx
    case off `compare` off0 of
        LT -> return True
        EQ -> do
            put dat
            loop si0 (off0 + len)
            return False
        GT -> do
            atomicModifyIORef'' streamReass (Skew.insert x)
            return False
  where
    -- Drain stored fragments that continue at offset @xff@.
    -- NOTE(review): 'Skew.deleteMinIf' presumably removes and returns the
    -- fragments starting exactly at @xff@ -- confirm in
    -- "Network.QUIC.Stream.Skew".
    loop si0 xff = do
        mrxs <- atomicModifyIORef' streamReass (Skew.deleteMinIf xff)
        case mrxs of
            Nothing -> writeIORef streamStateRx si0{streamOffset = xff}
            Just rxs -> do
                mapM_ (put . rxstrmData) rxs
                let xff1 = nextOff rxs
                if hasFin rxs
                    then do
                        -- Record the final offset before signalling FIN.
                        -- Otherwise the receive offset stays at its old
                        -- value and a retransmission of the frame that
                        -- triggered this drain is delivered a second time.
                        writeIORef
                            streamStateRx
                            si0{streamOffset = xff1, streamFin = True}
                        putFin
                    else loop si0 xff1
-- Non-empty data with FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len True) put putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                let off1 = off0 + len
                writeIORef streamStateRx si1{streamOffset = off1}
                put dat
                putFin
                return False
            GT -> do
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False
-- | 'True' when the last element of the sequence has its FIN flag set;
--   'False' for an empty sequence.
hasFin :: Seq RxStreamData -> Bool
hasFin rxs
    | _ Seq.:> lastRx <- Seq.viewr rxs = rxstrmFin lastRx
    | otherwise = False