module Data.MQueue.ChanQ (ChanQ, dupChanQ, pipeChanQ) where
import Data.MQueue.Class
import Control.Concurrent.MVar
import Control.Monad.Trans
import Control.Monad
import Data.Maybe
-- | A queue built from a linked chain of 'MVar' cells. The first field is
-- bound as @readVar@ and the second as @writeVar@ by the functions in this
-- module: they hold the cell the next read looks at and the cell the next
-- write fills, respectively.
data ChanQ a = ChanQ !(MVar (Stream a)) !(MVar (Stream a))
-- | One cell of the chain: an 'MVar' that holds a 'ChItem' once filled.
type Stream a = MVar (ChItem a)
-- | A queued value together with the cell that follows it in the chain.
data ChItem a = ChItem a (Stream a)
-- | Non-blocking queue operations on 'ChanQ', usable from any 'MonadIO' monad.
instance MonadIO m => MQueue (ChanQ a) m where
  type MQueueKey (ChanQ a) = a

  -- A fresh queue has its read end and its write end pointing at the same
  -- empty cell.
  newQueue = liftIO $ do
    hole <- newEmptyMVar
    liftM2 ChanQ (newMVar hole) (newMVar hole)

  -- Fill the current write cell with the value plus a fresh empty cell, and
  -- make that fresh cell the new write end.
  push (ChanQ _ writeVar) x = liftIO $ do
    new_hole <- newEmptyMVar
    modifyMVar_ writeVar $ \old_hole ->
      putMVar old_hole (ChItem x new_hole) >> return new_hole

  -- Return the front value and advance the read end, or return Nothing
  -- (leaving the read end where it is) when the front cell is unfilled.
  --
  -- The cell is inspected with 'tryReadMVar' rather than a 'tryTakeMVar' /
  -- 'putMVar' pair: the pair leaves the cell empty for a moment, so another
  -- 'ChanQ' sharing the cell (see 'dupChanQ') could wrongly see "no item",
  -- and an exception between the two steps would leave the cell drained.
  pop (ChanQ readVar _) = liftIO $ modifyMVar readVar $ \read_end -> do
    front <- tryReadMVar read_end
    return $ case front of
      Nothing -> (read_end, Nothing)
      Just (ChItem x new_read_end) -> (new_read_end, Just x)

  -- Return the front value without advancing the read end, or Nothing when
  -- the front cell is unfilled. Uses 'tryReadMVar' for the same reason as
  -- 'pop'.
  peek (ChanQ readVar _) = liftIO $ withMVar readVar $ \read_end -> do
    front <- tryReadMVar read_end
    return $ case front of
      Nothing -> Nothing
      Just (ChItem x _) -> Just x

  -- The queue is reported empty when the read end and the write end are the
  -- very same cell. Both ends are held while comparing, read end first.
  isEmpty (ChanQ readVar writeVar) = liftIO $
    withMVar readVar $ \r ->
      withMVar writeVar $ \w ->
        return $! r == w
-- | Build a second 'ChanQ' that shares the write end of the given queue but
-- has its own read end, initialised to the cell that is the write end at the
-- time of the call.
dupChanQ :: ChanQ a -> IO (ChanQ a)
dupChanQ (ChanQ _ sharedWriteVar) =
  readMVar sharedWriteVar >>= newMVar >>= \ownReadVar ->
    return (ChanQ ownReadVar sharedWriteVar)
-- | Attempt to splice the first queue onto the end of the second: the cell at
-- the second queue's write end is re-pointed at the first queue's current
-- read end, and the second queue's write end is then set to the first
-- queue's write end.
--
-- NOTE(review): the steps below are not wrapped in any exception protection,
-- so if one of them throws or blocks, @writeVar2@ is left empty.
pipeChanQ :: ChanQ a -> ChanQ a -> IO ()
pipeChanQ (ChanQ readVar1 writeVar1) (ChanQ _ writeVar2) = do
-- Empties writeVar2 until the final line refills it.
old_write_var <- takeMVar writeVar2
-- Current read end of the first queue (readVar1 itself is left unchanged).
old_read_var <- readMVar readVar1
-- Keeps the value stored in the cell and replaces only its "next" link.
-- NOTE(review): push fills the cell held in a write var with putMVar, which
-- implies that cell is normally empty; modifyMVar_ waits for a filled MVar,
-- so this line looks like it would block indefinitely — confirm.
modifyMVar_ old_write_var (\ (ChItem x _) -> return (ChItem x old_read_var))
-- Both write vars now hold the same cell.
-- NOTE(review): they remain separate MVars, so a later push through one does
-- not advance the other — verify this is intended.
readMVar writeVar1 >>= putMVar writeVar2