{-# LANGUAGE Rank2Types, KindSignatures #-} -- | Contains a simple source and sink linking together conduits in -- different threads. For extended examples of usage and bottlenecks -- see 'Data.Conduit.TMChan'. -- -- TQueue is an amoritized FIFO queue behaves like TChan, with two -- important differences: -- -- * it's faster (but amortized thus the cost of individual operations -- may vary a lot) -- -- * it doesn't provide equivalent of the dupTChan and cloneTChan -- operations -- -- -- Here is short description of data structures: -- -- * TQueue - unbounded infinite queue -- -- * TBQueue - bounded infinite queue -- -- * TMQueue - unbounded finite (closable) queue -- -- * TBMQueue - bounded finite (closable) queue -- -- Caveats -- -- Infinite operations means that source doesn't know when stream is -- ended so one need to use other methods of finishing stream like -- sending an exception or finish conduit in downstream. -- module Data.Conduit.TQueue ( -- * Connectors -- ** Infinite queues -- $inifinite -- *** TQueue connectors sourceTQueue , sinkTQueue -- *** TBQueue connectors , sourceTBQueue , sinkTBQueue , entangledPair -- ** Closable queues -- *** TMQueue connectors , sourceTMQueue , sinkTMQueue -- *** TBMQueue connectors , sourceTBMQueue , sinkTBMQueue , module Control.Concurrent.STM.TQueue ) where import Control.Concurrent.STM import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TBMQueue import Control.Concurrent.STM.TMQueue import Control.Monad import Control.Monad.IO.Class import Data.Conduit import Data.Conduit.Internal -- | A simple wrapper around a "TQueue". As data is pushed into the queue, the -- source will read it and pass it down the conduit pipeline. sourceTQueue :: MonadIO m => TQueue a -> Source m a sourceTQueue q = ConduitM src where src = PipeM pull pull = do x <- liftSTM $ readTQueue q return $ HaveOutput src close x close = return () -- | A simple wrapper around a "TQueue". As data is pushed into this sink, it -- will magically begin to appear in the queue. sinkTQueue :: MonadIO m => TQueue a -> Sink a m () sinkTQueue q = ConduitM src where src = sink sink = NeedInput push close push input = PipeM ((liftSTM $ writeTQueue q input) >> (return $ NeedInput push close)) close _ = return () -- | A simple wrapper around a "TBQueue". As data is pushed into the queue, the -- source will read it and pass it down the conduit pipeline. sourceTBQueue :: MonadIO m => TBQueue a -> Source m a sourceTBQueue q = ConduitM src where src = PipeM pull pull = do x <- liftSTM $ readTBQueue q return $ HaveOutput src close x close = return () -- | A simple wrapper around a "TBQueue". As data is pushed into this sink, it -- will magically begin to appear in the queue. Boolean argument is used -- to specify if queue should be closed when the sink is closed. sinkTBQueue :: MonadIO m => TBQueue a -> Sink a m () sinkTBQueue q = ConduitM src where src = sink sink = NeedInput push close push input = PipeM ((liftSTM $ writeTBQueue q input) >> (return $ NeedInput push close)) close _ = return () -- | A convenience wrapper for creating a source and sink TBQueue of the given -- size at once, without exposing the underlying queue. entangledPair :: MonadIO m => Int -> m (Source m a, Sink a m ()) entangledPair size = liftM (liftM2 (,) sourceTBQueue sinkTBQueue) $ liftIO $ atomically $ newTBQueue size -- | A simple wrapper around a "TMQueue". As data is pushed into the queue, the -- source will read it and pass it down the conduit pipeline. When the -- queue is closed, the source will close also. sourceTMQueue :: MonadIO m => TMQueue a -> Source m a sourceTMQueue q = ConduitM src where src = PipeM pull pull = do mx <- liftSTM $ readTMQueue q case mx of Nothing -> return $ Done () Just x -> return $ HaveOutput src close x close = do liftSTM $ closeTMQueue q return () -- | A simple wrapper around a "TMQueue". As data is pushed into this sink, it -- will magically begin to appear in the queue. sinkTMQueue :: MonadIO m => TMQueue a -> Bool -- ^ Should the queue be closed when the sink is closed? -> Sink a m () sinkTMQueue q shouldClose = ConduitM src where src = sink sink = NeedInput push close push input = PipeM ((liftSTM $ writeTMQueue q input) >> (return $ NeedInput push close)) close _ = do when shouldClose (liftSTM $ closeTMQueue q) return () -- | A simple wrapper around a "TBMQueue". As data is pushed into the queue, the -- source will read it and pass it down the conduit pipeline. When the -- queue is closed, the source will close also. sourceTBMQueue :: MonadIO m => TBMQueue a -> Source m a sourceTBMQueue q = ConduitM src where src = PipeM pull pull = do mx <- liftSTM $ readTBMQueue q case mx of Nothing -> return $ Done () Just x -> return $ HaveOutput src close x close = do liftSTM $ closeTBMQueue q return () -- | A simple wrapper around a "TBMQueue". As data is pushed into this sink, it -- will magically begin to appear in the queue. sinkTBMQueue :: MonadIO m => TBMQueue a -> Bool -- ^ Should the queue be closed when the sink is closed? -> Sink a m () sinkTBMQueue q shouldClose = ConduitM src where src = sink sink = NeedInput push close push input = PipeM ((liftSTM $ writeTBMQueue q input) >> (return $ NeedInput push close)) close _ = do when shouldClose (liftSTM $ closeTBMQueue q) return () liftSTM :: forall (m :: * -> *) a. MonadIO m => STM a -> m a liftSTM = liftIO . atomically -- $infinite -- It's impossible to close infinite queues but they work slightly faster, -- so it's reasonable to use them inside infinite computations for -- performance reasons.