-- | Conduit sources and sinks backed by STM queues.
--
-- For each of the queue types 'TQueue', 'TBQueue', 'TMQueue' and 'TBMQueue'
-- this module exports one source (reads elements from the queue and sends
-- them downstream) and one sink (writes every upstream element to the queue).
-- "Control.Concurrent.STM.TQueue" is re-exported as well.
module Data.Conduit.TQueue
(
-- * 'TQueue' connectors
sourceTQueue
, sinkTQueue
-- * 'TBQueue' connectors
, sourceTBQueue
, sinkTBQueue
-- * 'TMQueue' connectors
, sourceTMQueue
, sinkTMQueue
-- * 'TBMQueue' connectors
, sourceTBMQueue
, sinkTBMQueue
-- * Re-exported queue API
, module Control.Concurrent.STM.TQueue
) where
import Control.Concurrent.STM
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TBMQueue
import Control.Concurrent.STM.TMQueue
import Control.Monad.IO.Class
import Data.Conduit
import Data.Conduit.Internal
-- | Send every element read from the given 'TQueue' downstream.
--
-- Each step runs a single atomic 'readTQueue' and emits the element it
-- returned. There is no terminating branch: the source never finishes on its
-- own, and the finalizer attached to each emitted element does nothing.
sourceTQueue :: MonadIO m => TQueue a -> Source m a
sourceTQueue q = ConduitM loop
  where
    -- One step of the source: read an element, then hand it to 'emit'.
    loop = PipeM (liftSTM (readTQueue q) >>= emit)
    -- Emit the element and continue with 'loop'; the finalizer is a no-op.
    emit item = return (HaveOutput loop (return ()) item)
-- | Write every upstream element to the given 'TQueue'.
--
-- Each received element is written with one atomic 'writeTQueue', after
-- which the sink asks for more input. When upstream has no more input the
-- sink finishes with @()@ and leaves the queue untouched.
sinkTQueue :: MonadIO m => TQueue a -> Sink a m ()
sinkTQueue q = ConduitM awaiting
  where
    -- The only state of the sink: waiting for the next upstream element.
    awaiting = NeedInput enqueue finish
    -- Write the element, then go back to waiting.
    enqueue item = PipeM $ do
      liftSTM (writeTQueue q item)
      return awaiting
    -- Upstream is finished; its result is ignored.
    finish _ = Done ()
-- | Send every element read from the given 'TBQueue' downstream.
--
-- Each step runs a single atomic 'readTBQueue' and emits the element it
-- returned. There is no terminating branch: the source never finishes on its
-- own, and the finalizer attached to each emitted element does nothing.
sourceTBQueue :: MonadIO m => TBQueue a -> Source m a
sourceTBQueue q = ConduitM loop
  where
    -- One step of the source: read an element, then hand it to 'emit'.
    loop = PipeM (liftSTM (readTBQueue q) >>= emit)
    -- Emit the element and continue with 'loop'; the finalizer is a no-op.
    emit item = return (HaveOutput loop (return ()) item)
-- | Write every upstream element to the given 'TBQueue'.
--
-- Each received element is written with one atomic 'writeTBQueue', after
-- which the sink asks for more input. When upstream has no more input the
-- sink finishes with @()@ and leaves the queue untouched.
sinkTBQueue :: MonadIO m => TBQueue a -> Sink a m ()
sinkTBQueue q = ConduitM awaiting
  where
    -- The only state of the sink: waiting for the next upstream element.
    awaiting = NeedInput enqueue finish
    -- Write the element, then go back to waiting.
    enqueue item = PipeM $ do
      liftSTM (writeTBQueue q item)
      return awaiting
    -- Upstream is finished; its result is ignored.
    finish _ = Done ()
-- | Send the elements read from the given 'TMQueue' downstream.
--
-- Each step runs a single atomic 'readTMQueue'. A 'Nothing' result ends the
-- source; a 'Just' result is emitted and the source continues. The finalizer
-- attached to each emitted element closes the queue with 'closeTMQueue'.
sourceTMQueue :: MonadIO m => TMQueue a -> Source m a
sourceTMQueue q = ConduitM loop
  where
    -- One step of the source: read from the queue and dispatch on the result.
    loop = PipeM (liftSTM (readTMQueue q) >>= step)
    -- 'Nothing' finishes the source; 'Just' emits and loops.
    step Nothing = return (Done ())
    step (Just item) = return (HaveOutput loop shutdown item)
    -- Finalizer carried by every emitted element.
    shutdown = liftSTM (closeTMQueue q)
-- | Write every upstream element to the given 'TMQueue', closing the queue
-- once upstream is finished.
--
-- Each received element is written with one atomic 'writeTMQueue', after
-- which the sink asks for more input. When upstream has no more input the
-- sink runs 'closeTMQueue' on the queue and finishes with @()@.
sinkTMQueue :: MonadIO m => TMQueue a -> Sink a m ()
sinkTMQueue q = ConduitM awaiting
  where
    -- The only state of the sink: waiting for the next upstream element.
    awaiting = NeedInput enqueue finish
    -- Write the element, then go back to waiting.
    enqueue item = PipeM $ do
      liftSTM (writeTMQueue q item)
      return awaiting
    -- Upstream is finished (its result is ignored): close the queue.
    finish _ = liftSTM (closeTMQueue q)
-- | Send the elements read from the given 'TBMQueue' downstream.
--
-- Each step runs a single atomic 'readTBMQueue'. A 'Nothing' result ends the
-- source; a 'Just' result is emitted and the source continues. The finalizer
-- attached to each emitted element closes the queue with 'closeTBMQueue'.
sourceTBMQueue :: MonadIO m => TBMQueue a -> Source m a
sourceTBMQueue q = ConduitM loop
  where
    -- One step of the source: read from the queue and dispatch on the result.
    loop = PipeM (liftSTM (readTBMQueue q) >>= step)
    -- 'Nothing' finishes the source; 'Just' emits and loops.
    step Nothing = return (Done ())
    step (Just item) = return (HaveOutput loop shutdown item)
    -- Finalizer carried by every emitted element.
    shutdown = liftSTM (closeTBMQueue q)
-- | Write every upstream element to the given 'TBMQueue', closing the queue
-- once upstream is finished.
--
-- Each received element is written with one atomic 'writeTBMQueue', after
-- which the sink asks for more input. When upstream has no more input the
-- sink runs 'closeTBMQueue' on the queue and finishes with @()@.
sinkTBMQueue :: MonadIO m => TBMQueue a -> Sink a m ()
sinkTBMQueue q = ConduitM awaiting
  where
    -- The only state of the sink: waiting for the next upstream element.
    awaiting = NeedInput enqueue finish
    -- Write the element, then go back to waiting.
    enqueue item = PipeM $ do
      liftSTM (writeTBMQueue q item)
      return awaiting
    -- Upstream is finished (its result is ignored): close the queue.
    finish _ = liftSTM (closeTBMQueue q)
-- | Run an 'STM' transaction atomically and lift the resulting 'IO' action
-- into any 'MonadIO' monad.
--
-- The signature uses ordinary implicit quantification. It denotes the same
-- type as an explicit @forall@ with a kind annotation on @m@, but needs no
-- language extensions and avoids the legacy @*@ kind syntax.
liftSTM :: MonadIO m => STM a -> m a
liftSTM = liftIO . atomically