-- | Sources and sinks that connect conduits to STM queues.
--
-- 'sourceTQueue' / 'sinkTQueue' work on a 'TQueue', and
-- 'sourceTBQueue' / 'sinkTBQueue' on a 'TBQueue'. The
-- "Control.Concurrent.STM.TQueue" module is re-exported for convenience.
module Data.Conduit.TQueue
(
sourceTQueue
, sinkTQueue
, sourceTBQueue
, sinkTBQueue
, module Control.Concurrent.STM.TQueue
) where
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Monad.IO.Class
import Data.Conduit
import Data.Conduit.Internal
-- | A source that emits the elements read from a 'TQueue'.
--
-- Every step performs one 'readTQueue' in its own atomic transaction and
-- yields the element downstream, then loops back to read the next one.
-- The source never finishes by itself; it only stops when downstream
-- stops asking for output. The finalizer attached to each output does
-- nothing, because there is no resource to release.
sourceTQueue :: MonadIO m => TQueue a -> Source m a
sourceTQueue q = ConduitM loop
  where
    -- One iteration: take an element from the queue, emit it, repeat.
    loop =
      PipeM
        ( liftSTM (readTQueue q) >>= \item ->
            return (HaveOutput loop noCleanup item)
        )

    -- Nothing to clean up when downstream terminates early.
    noCleanup = return ()
-- | A sink that writes every incoming element to a 'TQueue'.
--
-- Each input is enqueued with 'writeTQueue' in its own atomic transaction,
-- after which the sink waits for more input. When upstream is exhausted
-- the sink finishes with @()@, ignoring upstream's final result.
sinkTQueue :: MonadIO m => TQueue a -> Sink a m ()
sinkTQueue q = ConduitM awaiting
  where
    -- Idle state: ask upstream for the next element.
    awaiting = NeedInput enqueue finish

    -- Push one element into the queue, then go back to waiting.
    enqueue item = PipeM $ do
      liftSTM (writeTQueue q item)
      return awaiting

    -- Upstream is done; its result is discarded.
    finish _ = return ()
-- | A source that emits the elements read from a 'TBQueue'.
--
-- Every step performs one 'readTBQueue' in its own atomic transaction and
-- yields the element downstream, then loops back to read the next one.
-- The source never finishes by itself; it only stops when downstream
-- stops asking for output. The finalizer attached to each output does
-- nothing, because there is no resource to release.
sourceTBQueue :: MonadIO m => TBQueue a -> Source m a
sourceTBQueue q = ConduitM loop
  where
    -- One iteration: take an element from the queue, emit it, repeat.
    loop =
      PipeM
        ( liftSTM (readTBQueue q) >>= \item ->
            return (HaveOutput loop noCleanup item)
        )

    -- Nothing to clean up when downstream terminates early.
    noCleanup = return ()
-- | A sink that writes every incoming element to a 'TBQueue'.
--
-- Each input is enqueued with 'writeTBQueue' in its own atomic transaction,
-- after which the sink waits for more input. When upstream is exhausted
-- the sink finishes with @()@, ignoring upstream's final result.
sinkTBQueue :: MonadIO m => TBQueue a -> Sink a m ()
sinkTBQueue q = ConduitM awaiting
  where
    -- Idle state: ask upstream for the next element.
    awaiting = NeedInput enqueue finish

    -- Push one element into the queue, then go back to waiting.
    enqueue item = PipeM $ do
      liftSTM (writeTBQueue q item)
      return awaiting

    -- Upstream is done; its result is discarded.
    finish _ = return ()
-- | Run an 'STM' transaction atomically inside any monad that can do 'IO'.
--
-- The signature uses ordinary implicit quantification: the kind of @m@ is
-- already fixed by the 'MonadIO' constraint, so no explicit @forall@ or
-- kind annotation (and none of the language extensions they require) is
-- needed.
liftSTM :: MonadIO m => STM a -> m a
liftSTM = liftIO . atomically