{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module Network.QUIC.Stream.Reass (
takeRecvStreamQwithSize
, putRxStreamData
, tryReassemble
) where
import Data.Sequence (Seq)
import qualified Data.Sequence as Seq
import qualified Data.ByteString as BS
import Network.QUIC.Imports
import Network.QUIC.Stream.Frag
import Network.QUIC.Stream.Misc
import Network.QUIC.Stream.Queue
import qualified Network.QUIC.Stream.Skew as Skew
import Network.QUIC.Stream.Types
import Network.QUIC.Types
-- | Read the end-of-stream flag kept in the stream's receive queue.
getEndOfStream :: Stream -> IO Bool
getEndOfStream = readIORef . endOfStream . streamRecvQ
-- | Set the end-of-stream flag kept in the stream's receive queue to 'True'.
setEndOfStream :: Stream -> IO ()
setEndOfStream strm = writeIORef (endOfStream (streamRecvQ strm)) True
-- | Read the pending-data slot of the stream's receive queue.
--
-- 'Nothing' means no left-over bytes are stored.
readPendingData :: Stream -> IO (Maybe ByteString)
readPendingData = readIORef . pendingData . streamRecvQ
-- | Store @bs@ in the pending-data slot of the stream's receive queue,
-- overwriting whatever was there.
writePendingData :: Stream -> ByteString -> IO ()
writePendingData strm bs = writeIORef (pendingData (streamRecvQ strm)) (Just bs)
-- | Reset the pending-data slot of the stream's receive queue to 'Nothing'.
clearPendingData :: Stream -> IO ()
clearPendingData strm = writeIORef (pendingData (streamRecvQ strm)) Nothing
-- | Take at most @siz0@ bytes of received stream data.
--
-- * If the end-of-stream flag is already set, @""@ is returned immediately.
-- * Otherwise the first chunk comes from the pending-data slot if it is
--   occupied, or from 'takeRecvStreamQ' if it is not.  An empty chunk from
--   the queue marks the end of the stream: the flag is set and @""@ is
--   returned.
-- * While fewer than @siz0@ bytes have been gathered, further chunks are
--   fetched with 'tryTakeRecvStreamQ'; gathering stops as soon as that
--   returns 'Nothing'.
-- * Bytes beyond the requested size are written to the pending-data slot
--   for the next call.
--
-- The pending chunk is size-checked exactly like a fresh chunk, so a
-- left-over longer than @siz0@ is split again instead of being returned
-- whole.
takeRecvStreamQwithSize :: Stream -> Int -> IO ByteString
takeRecvStreamQwithSize strm siz0 = do
    eos <- getEndOfStream strm
    if eos
        then return ""
        else do
            mb <- readPendingData strm
            case mb of
                Nothing -> do
                    b0 <- takeRecvStreamQ strm
                    if b0 == ""
                        then do
                            -- An empty chunk is the FIN marker.
                            setEndOfStream strm
                            return ""
                        else takeFirst b0
                Just b0 -> do
                    clearPendingData strm
                    takeFirst b0
  where
    -- Handle the first chunk of this call, whether fresh or left over.
    takeFirst :: ByteString -> IO ByteString
    takeFirst b0 =
        case len `compare` siz0 of
            LT -> tryRead (siz0 - len) (b0 :)
            EQ -> return b0
            GT -> do
                let (b1, b2) = BS.splitAt siz0 b0
                writePendingData strm b2
                return b1
      where
        len = BS.length b0

    -- Gather up to @siz@ more bytes without waiting; @build@ is a
    -- difference list of the chunks collected so far.
    tryRead :: Int -> ([ByteString] -> [ByteString]) -> IO ByteString
    tryRead siz build = do
        mb <- tryTakeRecvStreamQ strm
        case mb of
            Nothing -> return $ BS.concat $ build []
            Just b
                | b == "" -> do
                    -- FIN marker: remember it and hand back what we have.
                    setEndOfStream strm
                    return $ BS.concat $ build []
                | otherwise -> do
                    let len = BS.length b
                    case len `compare` siz of
                        LT -> tryRead (siz - len) (build . (b :))
                        EQ -> return $ BS.concat $ build [b]
                        GT -> do
                            let (b1, b2) = BS.splitAt siz b
                            writePendingData strm b2
                            return $ BS.concat $ build [b1]
-- | Feed a received stream fragment into reassembly.
--
-- Returns 'False', without touching the stream, when the fragment's end
-- (@length of data + offset@) exceeds the limit reported by
-- 'getRxMaxStreamData'.  Otherwise the fragment is passed to
-- 'tryReassemble' and 'True' is returned; the result of 'tryReassemble'
-- is discarded.
putRxStreamData :: Stream -> RxStreamData -> IO Bool
putRxStreamData s rx@(RxStreamData dat off _ _) = do
    lim <- getRxMaxStreamData s
    if BS.length dat + off > lim
        then return False
        else do
            _ <- tryReassemble s rx put putFin
            return True
  where
    -- Empty data is never enqueued: an empty chunk in the receive queue
    -- is reserved for the FIN marker (see 'putFin').
    put :: ByteString -> IO ()
    put "" = return ()
    put d = putRecvStreamQ s d

    -- Signal the end of the stream with an empty chunk.
    putFin :: IO ()
    putFin = putRecvStreamQ s ""
-- | Try to deliver a received fragment in order, buffering it otherwise.
--
-- The fragment's offset is compared with the offset stored in
-- 'streamStateRx':
--
-- * lower: nothing is done and 'True' is returned;
-- * equal: the data is handed to the @put@ callback (and @putFin@ is run
--   if the FIN is reached), then 'False' is returned;
-- * higher: the fragment is inserted into 'streamReass' for later and
--   'False' is returned.
--
-- 'True' is also returned for an empty fragment without FIN, and for any
-- FIN-carrying fragment once the FIN flag is already recorded.  In other
-- words 'True' means "nothing was consumed" (presumably a duplicate).
--
-- The third argument delivers in-order data; the fourth is run once when
-- the end of the stream is reached in order.
tryReassemble :: Stream -> RxStreamData -> (StreamData -> IO ()) -> IO () -> IO Bool
-- Empty fragment without FIN: nothing to do.
tryReassemble Stream{} (RxStreamData "" _ _ False) _ _ = return True
-- Empty fragment carrying FIN.
tryReassemble Stream{..} x@(RxStreamData "" off _ True) _ putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                writeIORef streamStateRx si1
                putFin
                return False
            GT -> do
                -- FIN is ahead of the delivered data: record it and
                -- keep the fragment until the gap is filled.
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False
-- Data fragment without FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len False) put putFin = do
    si0@(StreamState off0 _) <- readIORef streamStateRx
    case off `compare` off0 of
        LT -> return True
        EQ -> do
            put dat
            loop si0 (off0 + len)
            return False
        GT -> do
            atomicModifyIORef'' streamReass (Skew.insert x)
            return False
  where
    -- Drain buffered fragments that have become contiguous at @xff@.
    loop :: StreamState -> Int -> IO ()
    loop si xff = do
        mrxs <- atomicModifyIORef' streamReass (Skew.deleteMinIf xff)
        case mrxs of
            Nothing -> writeIORef streamStateRx si{streamOffset = xff}
            Just rxs -> do
                mapM_ (put . rxstrmData) rxs
                let xff1 = nextOff rxs
                if hasFin rxs
                    then do
                        -- Record how far delivery got before signalling
                        -- FIN; otherwise the stored offset stays stale
                        -- and a retransmission at the old offset would
                        -- be delivered a second time.
                        writeIORef streamStateRx si{streamOffset = xff1}
                        putFin
                    else loop si xff1
-- Data fragment carrying FIN.
tryReassemble Stream{..} x@(RxStreamData dat off len True) put putFin = do
    si0@(StreamState off0 fin0) <- readIORef streamStateRx
    let si1 = si0{streamFin = True}
    if fin0
        then return True
        else case off `compare` off0 of
            LT -> return True
            EQ -> do
                let off1 = off0 + len
                writeIORef streamStateRx si1{streamOffset = off1}
                put dat
                putFin
                return False
            GT -> do
                writeIORef streamStateRx si1
                atomicModifyIORef'' streamReass (Skew.insert x)
                return False
-- | Whether the last fragment of the sequence carries the FIN flag.
--
-- An empty sequence yields 'False'.
hasFin :: Seq RxStreamData -> Bool
hasFin s = case Seq.viewr s of
    Seq.EmptyR -> False
    _ Seq.:> x -> rxstrmFin x