{-| Module : Z.IO.BIO.Base Description : Composable IO Loops Copyright : (c) Dong Han, 2017-2020 License : BSD Maintainer : winterland1989@gmail.com Stability : experimental Portability : non-portable This module provides 'BIO' (block IO) type to facilitate writing streaming programs. A 'BIO' node usually: * Process input in unit of block(or item). * Running in constant spaces, which means the memory usage won't accumulate. * Keep some state in IO, which is sealed in 'BIO' closure. -} module Z.IO.BIO.Base ( -- * The BIO type BIO, pattern EOF, Source, Sink -- ** Basic combinators , appendSource, concatSource, concatSource' , joinSink, fuseSink -- * Run BIO chain , discard , step, step_ , run, run_ , runBlock, runBlock_, unsafeRunBlock , runBlocks, runBlocks_, unsafeRunBlocks -- * Make new BIO , fromPure, fromIO , filter, filterIO -- * Use with fold , fold', foldIO' -- ** Source , initSourceFromFile , initSourceFromFile' , sourceFromIO , sourceFromList , sourceFromBuffered , sourceTextFromBuffered , sourceJSONFromBuffered , sourceParserFromBuffered , sourceParseChunkFromBuffered -- ** Sink , sinkToIO , sinkToList , initSinkToFile , sinkToBuffered , sinkBuilderToBuffered -- ** Bytes specific , newReChunk , newUTF8Decoder , newParser, newMagicSplitter, newLineSplitter , newBase64Encoder, newBase64Decoder , hexEncode , newHexDecoder -- ** Generic BIO , counter , seqNum , newGrouping , ungrouping , consumed ) where import Prelude hiding (filter) import Control.Concurrent.MVar import Control.Concurrent.STM import qualified Control.Foldl as L import Control.Monad import Control.Monad.IO.Class import Data.Bits ((.|.)) import Data.IORef import qualified Data.List as List import Data.Void import Data.Word import System.IO.Unsafe (unsafePerformIO) import qualified Z.Data.Array as A import qualified Z.Data.Builder as B import Z.Data.CBytes (CBytes) import qualified Z.Data.JSON as JSON import qualified Z.Data.Parser as P import Z.Data.PrimRef import qualified Z.Data.Text as T 
import qualified Z.Data.Text.UTF8Codec as T import qualified Z.Data.Vector as V import qualified Z.Data.Vector.Base as V import Z.Data.Vector.Base64 import qualified Z.Data.Vector.Hex as Hex import Z.IO.Buffered import Z.IO.Exception import qualified Z.IO.FileSystem.Base as FS import Z.IO.Resource -- | A 'BIO'(blocked IO) node. -- -- A 'BIO' node is a push based stream transformer. It can be used to describe different kinds of IO -- devices: -- -- * @BIO inp out@ describe an IO state machine(e.g. z_stream in zlib), -- which takes some input in block, then outputs. -- * @type Source out = BIO Void out@ described an IO source, which never takes input, -- but gives output until EOF by looping. -- * @type Sink inp = BIO inp Void@ described an IO sink, which takes input and perform some IO effects, -- such as writing to terminal or files. -- -- You can connect these 'BIO' nodes with '>|>', which connect left node's output to right node's input, -- and return a new 'BIO' node with left node's input type and right node's output type. -- -- You can run a 'BIO' node in different ways: -- -- * 'step'\/'step_' to supply a single chunk of input and step the BIO node. -- * 'run'\/'run_' will supply EOF directly, which will effectively pull all values from source, -- and push to sink until source reaches EOF. -- * 'runBlock'\/'runBlock_' will supply a single block of input as whole input and run the BIO node. -- * 'runBlocks'\/'runBlocks_' will supply a list of blocks as whole input and run the BIO node. -- -- Note 'BIO' usually contains some IO states, you can consider it as an opaque 'IORef': -- -- * You shouldn't use a 'BIO' node across multiple 'BIO' chain unless the state can be reset. -- * You shouldn't use a 'BIO' node across multiple threads unless document states otherwise. -- -- 'BIO' is simply a convenient way to construct single-thread streaming computation, to use 'BIO' -- in multiple threads, check "Z.IO.BIO.Concurrent" module. 
--
type BIO inp out = (Maybe out -> IO ())  -- ^ Pass 'EOF' to indicate current node reaches EOF
                 -> Maybe inp            -- ^ 'EOF' indicates upstream reaches EOF
                 -> IO ()

-- | Pattern synonym for 'Nothing', used to signal the end of a stream.
pattern EOF :: Maybe a
pattern EOF = Nothing

-- | Type alias for a 'BIO' node which never takes input.
--
-- When implementing a 'Source', assume the 'EOF' argument is supplied only once. The source
-- should loop, calling the downstream continuation with every available chunk, and finally
-- write an 'EOF' to signal that it is exhausted.
type Source x = BIO Void x

-- | Type alias for a 'BIO' node which only takes input and performs effects.
--
-- When implementing a 'Sink', assume the 'EOF' argument is supplied only once (when upstream
-- reaches EOF). The downstream continuation need not be called before EOF; when upstream
-- reaches EOF the sink should flush and then write a final 'EOF'.
type Sink x = BIO x ()

-- | Connect two 'BIO' sources: once the first reaches EOF, elements are drawn from the second.
--
-- The first source's 'EOF' is not forwarded; only the second source's 'EOF' reaches downstream.
appendSource :: HasCallStack => Source a -> Source a -> Source a
{-# INLINABLE appendSource #-}
appendSource first second = \ emit _ ->
    let relay EOF  = second emit EOF
        relay item = emit item
    in first relay EOF

-- | Fuse two 'BIO' sinks: everything written to the fused sink is written to the left sink,
-- then to the right sink.
--
-- Flushing the result 'BIO' (feeding it 'EOF') flushes both sinks, then writes 'EOF' downstream.
joinSink :: HasCallStack => Sink out -> Sink out -> Sink out
{-# INLINABLE joinSink #-}
joinSink left right = \ k mx -> do
    left discard mx
    right discard mx
    case mx of
        EOF -> k EOF
        _   -> return ()

-- | Fuse a list of 'BIO' sinks: everything written to the fused sink is written to every
-- sink in the list, in list order.
--
-- Flushing the result 'BIO' flushes every sink in the list, then writes 'EOF' downstream.
fuseSink :: HasCallStack => [Sink out] -> Sink out
{-# INLINABLE fuseSink #-}
fuseSink ss = \ k mx -> case mx of
    Just _ -> mapM_ (\ s -> s discard mx) ss
    _ -> do
        mapM_ (\ s -> s discard mx) ss
        k EOF

-- | Connect a list of 'BIO' sources: after one reaches EOF, elements are drawn from the next.
--
-- The list is folded from the right, so every element passes through a single
-- 'appendSource' relay regardless of the list length. A left fold would route the first
-- source's elements through one relay per remaining source.
concatSource :: HasCallStack => [Source a] -> Source a
{-# INLINABLE concatSource #-}
concatSource = List.foldr appendSource emptySource

-- | A 'Source' which directly writes EOF downstream.
emptySource :: Source a
{-# INLINABLE emptySource #-}
emptySource = \ k _ -> k EOF

-- | Flatten a source of sources. Each inner source is drained in turn and its own 'EOF' is
-- swallowed; a single 'EOF' is written once the outer source reaches EOF.
concatSource' :: HasCallStack => Source (Source a) -> Source a
{-# INLINABLE concatSource' #-}
concatSource' ssrc = \ k _ ->
    let -- forward an inner source's items, but not its EOF
        forwardItem mx = case mx of
            Just _ -> k mx
            _      -> return ()
        -- drain each inner source; the outer EOF ends the whole stream
        drainInner msrc = case msrc of
            Just src -> src forwardItem EOF
            _        -> k EOF
    in ssrc drainInner EOF

-------------------------------------------------------------------------------
-- Run BIO

-- | Discards a value.
discard :: a -> IO ()
{-# INLINABLE discard #-}
discard _ = return ()

-- | Supply a single chunk of input to a 'BIO' and collect result.
step :: HasCallStack => BIO inp out -> inp -> IO [out]
{-# INLINABLE step #-}
step bio inp = do
    accRef <- newIORef []
    bio (mapM_ $ \ x -> modifyIORef' accRef (x:)) (Just inp)
    reverse <$> readIORef accRef

-- | Supply a single chunk of input to a 'BIO' without collecting result.
step_ :: HasCallStack => BIO inp out -> inp -> IO ()
{-# INLINABLE step_ #-}
step_ bio = bio discard . Just

-- | Run a 'BIO' loop without providing input.
--
-- When used on 'Source', it starts the streaming loop.
-- When used on 'Sink', it performs a flush.
run_ :: HasCallStack => BIO inp out -> IO ()
{-# INLINABLE run_ #-}
run_ bio = bio discard EOF

-- | Run a 'BIO' loop without providing input, and collect result.
--
-- When used on 'Source', it will collect all output chunks.
run :: HasCallStack => BIO inp out -> IO [out]
{-# INLINABLE run #-}
run bio = do
    collected <- newIORef []
    let collect (Just x) = modifyIORef' collected (x:)
        collect _        = return ()
    bio collect EOF
    reverse <$> readIORef collected

-- | Run a strict fold over a source with 'L.Fold'.
--
-- Each item is folded into an accumulator kept in an 'IORef' (updated with 'modifyIORef'');
-- the fold's extraction function is applied after the source has run.
fold' :: L.Fold a b -> Source a -> IO b
{-# INLINABLE fold' #-}
fold' (L.Fold combine begin extract) bio = do
    accRef <- newIORef begin
    bio (\ mx -> forM_ mx (\ x -> modifyIORef' accRef (\ acc -> combine acc x))) EOF
    extract <$> readIORef accRef

-- | Run a strict fold over a source with 'L.FoldM'.
--
-- Every new accumulator is forced to weak head normal form before it is stored.
foldIO' :: L.FoldM IO a b -> Source a -> IO b
{-# INLINABLE foldIO' #-}
foldIO' (L.FoldM combineM beginM extractM) bio = do
    accRef <- beginM >>= newIORef
    let feed x = do
            acc  <- readIORef accRef
            acc' <- combineM acc x
            writeIORef accRef $! acc'
    bio (mapM_ feed) EOF
    readIORef accRef >>= extractM

-- | Run a 'BIO' loop with a single chunk of input followed by EOF, and collect the outputs
-- in order.
--
runBlock :: HasCallStack => BIO inp out -> inp -> IO [out]
{-# INLINABLE runBlock #-}
runBlock bio inp = do
    collected <- newIORef []
    let collect mx = forM_ mx $ \ x -> modifyIORef' collected (x:)
    bio collect (Just inp)
    bio collect EOF
    reverse <$> readIORef collected

-- | Run a 'BIO' loop with a single chunk of input followed by EOF, discarding outputs.
--
runBlock_ :: HasCallStack => BIO inp out -> inp -> IO ()
{-# INLINABLE runBlock_ #-}
runBlock_ bio inp = bio discard (Just inp) >> bio discard EOF

-- | Wrap 'runBlock' into a pure interface.
--
-- Both the creation of the 'BIO' node and the run happen inside 'unsafePerformIO'. Only use
-- this when the whole computation is observably pure, e.g. compressing, decoding, etc.
unsafeRunBlock :: HasCallStack => IO (BIO inp out) -> inp -> [out]
{-# INLINABLE unsafeRunBlock #-}
unsafeRunBlock new inp = unsafePerformIO $ do
    bio <- new
    runBlock bio inp

-- | Supply blocks of input followed by EOF to a 'BIO', and collect results.
--
-- Note many 'BIO' nodes will be closed, or unable to take new input, after being drained.
runBlocks :: HasCallStack => BIO inp out -> [inp] -> IO [out]
{-# INLINABLE runBlocks #-}
runBlocks bio inps = do
    collected <- newIORef []
    let collect mx = forM_ mx $ \ x -> modifyIORef' collected (x:)
    mapM_ (bio collect . Just) inps
    bio collect EOF
    reverse <$> readIORef collected

-- | Supply blocks of input followed by EOF to a 'BIO', discarding outputs.
--
-- Note many 'BIO' nodes will be closed, or unable to take new input, after being drained.
runBlocks_ :: HasCallStack => BIO inp out -> [inp] -> IO ()
{-# INLINABLE runBlocks_ #-}
runBlocks_ bio inps = mapM_ (bio discard . Just) inps >> bio discard EOF

-- | Wrap 'runBlocks' into a pure interface.
--
-- Same caveats as 'unsafeRunBlock', but with a list of input blocks.
unsafeRunBlocks :: HasCallStack => IO (BIO inp out) -> [inp] -> [out]
{-# INLINABLE unsafeRunBlocks #-}
unsafeRunBlocks new inps = unsafePerformIO $ do
    bio <- new
    runBlocks bio inps

-------------------------------------------------------------------------------
-- Source

-- | Source a list (or any 'Foldable') from memory: every element is emitted, then 'EOF'.
--
sourceFromList :: Foldable f => f a -> Source a
{-# INLINABLE sourceFromList #-}
sourceFromList xs0 = \ k _ -> forM_ xs0 (k . Just) >> k EOF

-- | Turn a 'BufferedInput' into a 'BIO' source. An empty read is treated as end of input
-- and mapped to 'EOF'.
--
sourceFromBuffered :: HasCallStack => BufferedInput -> Source V.Bytes
{-# INLINABLE sourceFromBuffered #-}
sourceFromBuffered i = \ k _ ->
    let pump = do
            chunk <- readBuffer i
            if V.null chunk
                then k EOF
                else k (Just chunk) >> pump
    in pump

-- | Turn an `IO` action into a 'Source': it is polled until it returns 'EOF'.
sourceFromIO :: HasCallStack => IO (Maybe a) -> Source a
{-# INLINABLE sourceFromIO #-}
sourceFromIO io = \ k _ ->
    let pump = do
            mx <- io
            case mx of
                EOF -> k EOF
                _   -> k mx >> pump
    in pump

-- | Turn a UTF8 encoded 'BufferedInput' into a 'BIO' source, mapping end of input to 'EOF'.
--
sourceTextFromBuffered :: HasCallStack => BufferedInput -> Source T.Text
{-# INLINABLE sourceTextFromBuffered #-}
sourceTextFromBuffered i = \ k _ ->
    let pump = do
            txt <- readBufferText i
            if T.null txt
                then k EOF
                else k (Just txt) >> pump
    in pump

-- | Turn a 'JSON' encoded 'BufferedInput' into a 'BIO' source, ignoring any whitespace
-- between JSON objects. When the input is exhausted, 'EOF' is written downstream.
-- Throws 'OtherError' with name @EPARSE@ if a JSON value cannot be parsed or converted
-- (errors are reported through 'sourceParseChunkFromBuffered').
--
sourceJSONFromBuffered :: forall a. (JSON.JSON a, HasCallStack) => BufferedInput -> Source a
{-# INLINABLE sourceJSONFromBuffered #-}
sourceJSONFromBuffered bi = sourceParseChunkFromBuffered JSON.decodeChunk bi

-- | Turn a buffered input device into a packet source using a 'P.Parser'.
-- Throws 'OtherError' with name @EPARSE@ if parsing fails.
sourceParserFromBuffered :: HasCallStack => P.Parser a -> BufferedInput -> Source a
{-# INLINABLE sourceParserFromBuffered #-}
sourceParserFromBuffered = sourceParseChunkFromBuffered . P.parseChunk

-- | Turn a buffered input device into a packet source using a chunk parsing function.
-- Throws 'OtherError' with name @EPARSE@ if parsing fails.
--
-- Leftover bytes after each parsed value are parsed again before reading more input; 'EOF'
-- is written once a read returns no bytes between values.
sourceParseChunkFromBuffered :: (HasCallStack, T.Print e)
                             => (V.Bytes -> P.Result e a) -> BufferedInput -> Source a
{-# INLINABLE sourceParseChunkFromBuffered #-}
sourceParseChunkFromBuffered pc bi = \ k _ ->
    let fetch = readBuffer bi
        start = do
            bs <- fetch
            if V.null bs then k EOF else parseFrom bs
        parseFrom bs = do
            (rest, r) <- P.parseChunks pc fetch bs
            either (throwOtherError "EPARSE" . T.toText) (k . Just) r
            if V.null rest then start else parseFrom rest
    in start

-- | Turn a file into a 'V.Bytes' source.
initSourceFromFile :: HasCallStack => CBytes -> Resource (Source V.Bytes)
{-# INLINABLE initSourceFromFile #-}
initSourceFromFile p = do
    file <- FS.initFile p FS.O_RDONLY FS.DEFAULT_FILE_MODE
    liftIO $ fmap sourceFromBuffered (newBufferedInput file)

-- | Turn a file into a 'V.Bytes' source, reading with the given buffer size.
initSourceFromFile' :: HasCallStack => CBytes -> Int -> Resource (Source V.Bytes)
{-# INLINABLE initSourceFromFile' #-}
initSourceFromFile' p bufSiz = do
    file <- FS.initFile p FS.O_RDONLY FS.DEFAULT_FILE_MODE
    liftIO $ fmap sourceFromBuffered (newBufferedInput' bufSiz file)

--------------------------------------------------------------------------------
-- Sink

-- | Turn a 'BufferedOutput' into a 'V.Bytes' sink.
--
-- Every chunk is written to the buffer and acknowledged downstream with @Just ()@;
-- on 'EOF' the buffer is flushed and 'EOF' is passed on.
sinkToBuffered :: HasCallStack => BufferedOutput -> Sink V.Bytes
{-# INLINABLE sinkToBuffered #-}
sinkToBuffered bo = \ k ->
    maybe (flushBuffer bo >> k EOF)
          (\ bs -> writeBuffer bo bs >> k (Just ()))

-- | Turn a 'BufferedOutput' into a 'B.Builder' sink.
--
-- Same protocol as 'sinkToBuffered', but each item is a builder written with 'writeBuilder'.
sinkBuilderToBuffered :: HasCallStack => BufferedOutput -> Sink (B.Builder a)
{-# INLINABLE sinkBuilderToBuffered #-}
sinkBuilderToBuffered bo = \ k ->
    maybe (flushBuffer bo >> k EOF)
          (\ b -> writeBuilder bo b >> k (Just ()))

-- | Turn a file into a 'V.Bytes' sink.
--
-- Note the file is opened in @'FS.O_APPEND' .|. 'FS.O_CREAT' .|. 'FS.O_WRONLY'@ mode,
-- so bytes are written after the end of the original file if it already has content.
initSinkToFile :: HasCallStack => CBytes -> Resource (Sink V.Bytes)
{-# INLINABLE initSinkToFile #-}
initSinkToFile p = do
    file <- FS.initFile p (FS.O_APPEND .|. FS.O_CREAT .|. FS.O_WRONLY) FS.DEFAULT_FILE_MODE
    liftIO $ fmap sinkToBuffered (newBufferedOutput file)

-- | Turn an `IO` action into a 'BIO' sink.
--
-- 'EOF' is passed on without any extra action.
sinkToIO :: HasCallStack => (a -> IO ()) -> Sink a
{-# INLINABLE sinkToIO #-}
sinkToIO f = \ k -> maybe (k EOF) (\ a -> f a >> k (Just ()))

-- | Turn an `IO` action (and a flush action) into a 'BIO' sink.
--
-- The flush action runs when 'EOF' arrives, before 'EOF' is passed on.
sinkToIO' :: HasCallStack => (a -> IO ()) -> IO () -> Sink a
{-# INLINABLE sinkToIO' #-}
sinkToIO' f flush = \ k -> maybe (flush >> k EOF) (\ a -> f a >> k (Just ()))

-- | Sink to a list in memory.
--
-- The 'MVar' will be empty during sinking, and will be filled after sink receives an EOF.
sinkToList :: IO (MVar [a], Sink a)
sinkToList = do
    xsRef <- newIORef []
    rRef <- newEmptyMVar
    -- Items are prepended with the strict 'modifyIORef'', so no chain of thunks builds up
    -- inside the 'IORef'; the list is reversed once, when EOF arrives.
    let push x = modifyIORef' xsRef (x:)
        flush = readIORef xsRef >>= putMVar rRef . reverse
    return (rRef, sinkToIO' push flush)

--------------------------------------------------------------------------------
-- BIO nodes

-- | BIO node from a pure function.
--
-- BIO nodes made with this function are stateless, thus can be reused across chains.
fromPure :: (a -> b) -> BIO a b
{-# INLINABLE fromPure #-}
fromPure f = \ k x -> k (f <$> x)

-- | BIO node from an IO function.
--
-- BIO nodes made with this function may or may not be stateless, depending on whether the
-- IO function uses IO state.
fromIO :: HasCallStack => (a -> IO b) -> BIO a b
{-# INLINABLE fromIO #-}
fromIO f = \ k x -> case x of
    Just x' -> f x' >>= k . Just
    _ -> k EOF

-- | BIO node from a pure filter: items failing the predicate are dropped, 'EOF' is passed on.
--
-- BIO nodes made with this function are stateless, thus can be reused across chains.
filter :: (a -> Bool) -> BIO a a
{-# INLINABLE filter #-}
filter f k = go
  where
    go (Just a) = when (f a) $ k (Just a)
    go Nothing = k Nothing

-- | BIO node from an impure filter: items for which the IO predicate returns 'False' are
-- dropped, 'EOF' is passed on.
--
-- BIO nodes made with this function may or may not be stateless, depending on whether the
-- IO predicate uses IO state.
filterIO :: (a -> IO Bool) -> BIO a a
{-# INLINABLE filterIO #-}
filterIO f k = go
  where
    go (Just a) = do
        keep <- f a
        when keep $ k (Just a)
    go Nothing = k Nothing

-- | Make a chunk size divider.
--
-- Each output chunk's size is rounded down to a multiple of the granularity; the remaining
-- bytes are kept and prepended to the next input chunk. On EOF the last trailing bytes
-- (if any) are emitted before 'EOF'.
newReChunk :: Int  -- ^ chunk granularity; values below 1 are treated as 1
           -> IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newReChunk #-}
newReChunk n
    -- a granularity below 1 would make the `rem` below divide by zero
    | n < 1 = newReChunk 1
    | otherwise = do
        trailingRef <- newIORef V.empty
        return $ \ k mbs -> case mbs of
            Just bs -> do
                trailing <- readIORef trailingRef
                let chunk = trailing `V.append` bs
                    l = V.length chunk
                if l >= n
                    then do
                        let l' = l - (l `rem` n)
                            (chunk', rest) = V.splitAt l' chunk
                        writeIORef trailingRef rest
                        k (Just chunk')
                    else writeIORef trailingRef chunk
            _ -> do
                trailing <- readIORef trailingRef
                unless (V.null trailing) $ do
                    writeIORef trailingRef V.empty
                    k (Just trailing)
                k EOF

-- | Read buffer and parse with 'Parser'.
--
-- This function will turn a 'Parser' into a 'BIO', throw 'OtherError' with name @EPARSE@ if
-- parsing fails. Every value parsed is emitted downstream. After a value, remaining bytes
-- are parsed by a fresh parser. An unfinished parse is resumed with the next non-empty
-- chunk. On 'EOF' any pending parse is finished, then 'EOF' is written.
--
newParser :: HasCallStack => P.Parser a -> IO (BIO V.Bytes a)
{-# INLINABLE newParser #-}
newParser p = do
    -- Continuation of an unfinished parse, or 'EOF' when no parse is pending.
    resultRef <- newIORef EOF
    return $ \ k mbs -> do
        let fresh = P.parseChunk p
            -- Feed one chunk to a parse step. After a success the remaining bytes are
            -- parsed by a fresh parser, never by the already-completed continuation.
            loop f chunk = case f chunk of
                P.Success a trailing -> do
                    k (Just a)
                    unless (V.null trailing) (loop fresh trailing)
                P.Partial f' -> writeIORef resultRef (Just f')
                P.Failure e _ -> throwOtherError "EPARSE" (T.toText e)
            -- Take the pending continuation (if any) and clear it, so a completed parse
            -- is never resumed again; 'loop' stores a new one if the parse is still partial.
            takePending = do
                pending <- readIORef resultRef
                writeIORef resultRef EOF
                return pending
        case mbs of
            Just bs
                -- An empty chunk carries no input; feeding it to a pending continuation
                -- would signal end of input, so it is skipped.
                | V.null bs -> return ()
                | otherwise -> do
                    pending <- takePending
                    loop (maybe fresh id pending) bs
            _ -> do
                pending <- takePending
                -- empty input tells a pending parse that the input has ended
                forM_ pending (\ f -> loop f V.empty)
                k EOF

-- | Make a new UTF8 decoder, which decodes a bytes stream into a text stream.
--
-- An 'OtherError' with name @EINVALIDUTF8@ is thrown when a block has no byte that can start
-- a character, or when the stream ends inside a character. Complete blocks are checked
-- with 'T.validate'.
--
-- Note this node is meant to follow a preprocessing node such as a decompressor or parser,
-- where block boundaries cannot be controlled: the decoder concatenates the trailing bytes
-- of the previous block with the next one.
-- Using this node directly after 'sourceFromBuffered' is less efficient than using
-- 'sourceTextFromBuffered', because 'BufferedInput' provides push-back: trailing bytes can be
-- pushed back into the reading buffer and returned together with the next block.
--
newUTF8Decoder :: HasCallStack => IO (BIO V.Bytes T.Text)
{-# INLINABLE newUTF8Decoder #-}
newUTF8Decoder = do
    trailingRef <- newIORef V.empty
    return $ \ k mbs -> case mbs of
        Just bs -> do
            trailing <- readIORef trailingRef
            let chunk = trailing `V.append` bs
                (V.PrimVector arr s l) = chunk
            -- only decode once at least the first character is complete
            if l > 0 && T.decodeCharLen arr s <= l
                then do
                    -- index of the last byte that can start a character (ASCII or leading byte)
                    let (i, _) = V.findR (\ w -> w >= 0b11000000 || w <= 0b01111111) chunk
                    if i == -1
                        then throwOtherError "EINVALIDUTF8" "invalid UTF8 bytes"
                        else if T.decodeCharLen arr (s + i) > l - i
                            then do
                                -- the last character is incomplete: keep it for the next block
                                writeIORef trailingRef (V.fromArr arr (s+i) (l-i))
                                k (Just (T.validate (V.fromArr arr s i)))
                            else do
                                writeIORef trailingRef V.empty
                                k (Just (T.validate chunk))
                else writeIORef trailingRef chunk
        _ -> do
            trailing <- readIORef trailingRef
            if V.null trailing
                then k EOF
                else throwOtherError "EINVALIDUTF8" "invalid UTF8 bytes"

-- | Make a new stream splitter based on a magic byte.
--
-- Each emitted chunk ends with (and includes) the magic byte. Bytes after the last magic
-- byte are buffered until more input arrives, and emitted (if non-empty) before 'EOF'.
newMagicSplitter :: Word8 -> IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newMagicSplitter #-}
newMagicSplitter magic = do
    trailingRef <- newIORef V.empty
    return $ \ k mx -> case mx of
        Just bs -> do
            trailing <- readIORef trailingRef
            let loop chunk = case V.elemIndex magic chunk of
                    Just i -> do
                        -- TODO: looping
                        let (line, rest) = V.splitAt (i+1) chunk
                        k (Just line)
                        loop rest
                    _ -> writeIORef trailingRef chunk
            loop (trailing `V.append` bs)
        _ -> do
            chunk <- readIORef trailingRef
            unless (V.null chunk) $ do
                writeIORef trailingRef V.empty
                k (Just chunk)
            k EOF

-- | Make a new stream splitter based on linefeed (@\r\n@ or @\n@).
--
-- The result bytes don't contain the linefeed. A final line that has no linefeed (emitted
-- when the input ends) is passed on unchanged.
newLineSplitter :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newLineSplitter #-}
newLineSplitter = do
    splitter <- newMagicSplitter 10
    return (splitter . fromPure dropLineEnd)
  where
    -- Strip a trailing "\n" or "\r\n"; anything not ending in "\n" is left untouched.
    -- ('V.indexMaybe' returns Nothing for out-of-range, including negative, indices.)
    dropLineEnd bs@(V.PrimVector arr off len) =
        case bs `V.indexMaybe` (len-1) of
            Just 10 -> case bs `V.indexMaybe` (len-2) of
                Just 13 -> V.PrimVector arr off (len-2)
                _       -> V.PrimVector arr off (len-1)
            _ -> bs

-- | Make a new base64 encoder node.
--
-- Input is re-chunked to multiples of 3 bytes before encoding.
newBase64Encoder :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newBase64Encoder #-}
newBase64Encoder = do
    re <- newReChunk 3
    return (re . fromPure base64Encode)

-- | Make a new base64 decoder node.
--
-- Input is re-chunked to multiples of 4 bytes before decoding with 'base64Decode''.
newBase64Decoder :: HasCallStack => IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newBase64Decoder #-}
newBase64Decoder = do
    re <- newReChunk 4
    return (re . fromPure base64Decode')

-- | Make a hex encoder node.
--
-- Hex encoder is stateless, it can be reused across chains.
hexEncode :: Bool  -- ^ uppercase?
          -> BIO V.Bytes V.Bytes
{-# INLINABLE hexEncode #-}
hexEncode upper = fromPure (Hex.hexEncode upper)

-- | Make a new hex decoder node.
--
-- Input is re-chunked to multiples of 2 bytes before decoding with 'Hex.hexDecode''.
newHexDecoder :: IO (BIO V.Bytes V.Bytes)
{-# INLINABLE newHexDecoder #-}
newHexDecoder = do
    re <- newReChunk 2
    return (re . fromPure Hex.hexDecode')

-- | Make a BIO node which counts the items flowing through it.
--
-- 'Counter' is increased atomically, it's safe to read \/ reset the counter from other threads.
counter :: Counter -> BIO a a
{-# INLINABLE counter #-}
counter c = fromIO inc
  where
    inc x = do
        atomicAddCounter_ c 1
        return x

-- | Make a BIO node which counts items, and labels each item with a sequence number
-- (the counter value returned by 'atomicAddCounter').
--
-- 'Counter' is increased atomically, it's safe to read \/ reset the counter from other threads.
seqNum :: Counter -> BIO a (Int, a)
{-# INLINABLE seqNum #-}
seqNum c = fromIO inc
  where
    inc x = do
        i <- atomicAddCounter c 1
        return (i, x)

-- | Make a BIO node grouping items into fixed size vectors (sizes below 1 are treated as 1).
--
-- Trailing items that do not fill a whole group are emitted as a final, shorter group.
newGrouping :: V.Vec v a => Int -> IO (BIO a (v a))
{-# INLINABLE newGrouping #-}
newGrouping n
    | n < 1 = newGrouping 1
    | otherwise = do
        c <- newCounter 0
        arrRef <- newIORef =<< A.newArr n
        return $ \ k mx -> case mx of
            Just x -> do
                i <- readPrimRef c
                marr <- readIORef arrRef
                A.writeArr marr i x
                if i == n - 1
                    then do
                        -- group complete: install a fresh buffer, then emit the full one
                        writePrimRef c 0
                        writeIORef arrRef =<< A.newArr n
                        arr <- A.unsafeFreezeArr marr
                        k . Just $! V.fromArr arr 0 n
                    else writePrimRef c (i+1)
            _ -> do
                i <- readPrimRef c
                when (i /= 0) $ do
                    writePrimRef c 0
                    marr <- readIORef arrRef
                    -- The current buffer is about to be shrunk and frozen, so it must never
                    -- be written again: replace it with a fresh one.
                    writeIORef arrRef =<< A.newArr n
                    A.shrinkMutableArr marr i
                    arr <- A.unsafeFreezeArr marr
                    k . Just $! V.fromArr arr 0 i
                -- always forward EOF, also after emitting a trailing partial group
                k EOF

-- | A BIO node which flattens vectors into their items.
--
ungrouping :: BIO (V.Vector a) a
{-# INLINABLE ungrouping #-}
ungrouping = \ k mx -> case mx of
    Just x -> V.traverse_ (k . Just) x
    _ -> k EOF

-- | A BIO node which passes items through unchanged, and writes 'True' to the given 'TVar'
-- when 'EOF' is reached (before passing 'EOF' on).
consumed :: TVar Bool -> BIO a a
{-# INLINABLE consumed #-}
consumed ref = \ k mx -> case mx of
    Just _ -> k mx
    _ -> do
        atomically (writeTVar ref True)
        k EOF