{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE Rank2Types #-}

-- | 'IORef'-backed handles around pipes 'Producer's and 'Consumer's, so that a
-- stream can be read from or written to incrementally from plain 'IO' code.
module Pipes.IOStream
    ( -- * IStream and OStream
      IStream
    , OStream
      -- * Convert between IOStreams and pipes
    , mkIStream
    , mkOStream
    , fromIStream
    , fromOStream
      -- * Reading
    , readOne
    , readWith
      -- * Wrapping
    , wrapIStream
      -- * Writing
    , write
    , writeProducer
      -- * Detecting end of stream
    , isEnded
      -- * Connect and resume for consumer
    , (>->+)
      -- * Re-exports
    , Producer
    , Consumer
    , Parser
    ) where

import Control.Monad
import Data.IORef
import Pipes
import Pipes.Internal
import Pipes.Parse

-- | An input stream: a mutable reference holding the part of a 'Producer'
-- that has not been consumed yet.
newtype IStream a = IS (IORef (Producer a IO ()))

-- | An output stream: a mutable reference holding the current state of a
-- 'Consumer'.
newtype OStream a = OS (IORef (Consumer a IO ()))

-- | Create an input stream whose initial content is the given producer.
mkIStream :: Producer a IO () -> IO (IStream a)
mkIStream p = liftM IS $ newIORef p

-- | Create an output stream whose initial state is the given consumer.
mkOStream :: Consumer a IO () -> IO (OStream a)
mkOStream c = liftM OS $ newIORef c

-- | Read the producer currently stored in the stream. The stream itself is
-- left untouched.
fromIStream :: IStream a -> IO (Producer a IO ())
fromIStream (IS rp) = readIORef rp

-- | Read the consumer currently stored in the stream. The stream itself is
-- left untouched.
fromOStream :: OStream a -> IO (Consumer a IO ())
fromOStream (OS rc) = readIORef rc

-- | Build a new input stream by applying a transformation to the producer
-- currently stored in an existing one.
--
-- The result is backed by a fresh 'IORef': reading from it does not advance
-- the original stream.
wrapIStream :: IStream a -> (Producer a IO () -> Producer b IO ()) -> IO (IStream b)
wrapIStream i f = fromIStream i >>= mkIStream . f

-- | Read the next element of the stream.
--
-- Returns 'Nothing' once the underlying producer has finished. In both cases
-- the stream is updated so that effects already performed are never run a
-- second time by later reads.
readOne :: IStream a -> IO (Maybe a)
readOne (IS rp) = do
    p <- readIORef rp
    next p >>= \case
        Left () -> do
            -- Remember that the producer is exhausted. Leaving the old
            -- producer in place would make every later call re-run the
            -- effects that precede its end.
            writeIORef rp (return ())
            return Nothing
        Right (a, p') -> do
            writeIORef rp p'
            return (Just a)

-- | Run a 'Parser' against the stream and return its result.
--
-- The producer left over by the parser becomes the new content of the stream.
readWith :: IStream a -> Parser a IO r -> IO r
readWith (IS rp) parser = do
    p <- readIORef rp
    (r, p') <- runStateT parser p
    writeIORef rp p'
    return r

-- | Feed a single value to the output stream.
write :: OStream a -> a -> IO ()
write o a = writeProducer o (yield a)

-- | Feed every value of a producer to the output stream, then store the
-- consumer state returned by '>->+' back into the stream.
writeProducer :: OStream a -> Producer a IO () -> IO ()
writeProducer (OS rc) p = do
    c <- readIORef rc
    c' <- p >->+ c
    writeIORef rc c'

-- | Report whether the stored consumer has terminated.
--
-- Only the outermost constructor is inspected and no effect is run, so a
-- consumer that still has pending effects is reported as not ended.
isEnded :: OStream a -> IO Bool
isEnded (OS rc) = readIORef rc >>= \case
    Pure _ -> return True
    _ -> return False

{- | Pairs each await in the consumer with a yield in the producer.
Returns the remaining consumer.
-}
(>->+) :: Monad m => Producer a m x -> Consumer a m y -> m (Consumer a m y)
(>->+) = feed
  where
    -- Step the consumer until it terminates or asks for its next input.
    -- A terminated consumer is returned as is; the producer is then ignored.
    feed _ done@(Pure _) = return done
    -- Run a pending consumer effect, then keep going with the same producer.
    feed producer (M effect) = effect >>= feed producer
    -- A consumer never responds downstream; 'closed' discharges this case.
    feed _ (Respond v _) = closed v
    -- The consumer awaits a value: look for one in the producer.
    feed producer waiting@(Request _ resume) = pull producer
      where
        -- A value is available: hand it over and continue with both rests.
        pull (Respond a rest) = feed (rest ()) (resume a)
        -- Run a pending producer effect and look again.
        pull (M effect) = effect >>= pull
        -- The producer is exhausted: return the consumer, still awaiting.
        pull (Pure _) = return waiting
        -- A producer never requests upstream; 'closed' discharges this case.
        pull (Request v _) = closed v