module Control.Concurrent.Datastructures.BlockingConcurrentQueue where
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Concurrent.Datastructures.ThreadWaitQueue
-- | A queue shared between threads.
--
-- The elements are kept as a plain list inside an 'MVar'; the put operations
-- append at the end of that list and the take operations remove from its
-- front. A 'ThreadWaitQueue' accompanies the list and is used by the take
-- operations when they find the list empty.
data BlockingConcurrentQueue a = BlockingConcurrentQueue
  { queue :: MVar [a]
    -- ^ Current contents, front of the queue first.
  , threadWaitQueue :: ThreadWaitQueue
    -- ^ Wait queue entered by consumers that found 'queue' empty.
  }
-- | Opaque rendering: always the fixed text @BlockingConcurrentQueue@.
--
-- The contents sit behind an 'MVar' and cannot be inspected from pure code;
-- use 'showBlockingConcurrentQueue' to render the actual elements.
instance Show (BlockingConcurrentQueue a) where
  showsPrec _ _ = showString "BlockingConcurrentQueue"
-- | Render the elements currently stored in the queue.
--
-- Takes a snapshot with 'readAllFromBlockingConcurrentQueue' (nothing is
-- removed from the queue) and applies 'show' to the resulting list.
showBlockingConcurrentQueue :: (Show a) => BlockingConcurrentQueue a -> IO [Char]
showBlockingConcurrentQueue blockingConcurrentQueue =
  fmap show (readAllFromBlockingConcurrentQueue blockingConcurrentQueue)
-- | Build a new queue with no elements.
--
-- Allocates an 'MVar' holding the empty list first, then a fresh wait queue
-- via 'createWaitQueue', and bundles both into the record.
createBlockingConcurrentQueue :: IO (BlockingConcurrentQueue a)
createBlockingConcurrentQueue = do
  elementsVar <- newMVar []
  waiters <- createWaitQueue
  return
    BlockingConcurrentQueue
      { queue = elementsVar
      , threadWaitQueue = waiters
      }
-- | Append one element to the end of the queue.
--
-- The list is updated with 'modifyMVar_' rather than a bare
-- 'takeMVar' / 'putMVar' pair: if an exception (including an asynchronous
-- one) arrives during the update, the previous contents are put back instead
-- of leaving the 'MVar' empty, which would block every later operation on
-- this queue.
--
-- After the element has been stored, the wait queue is opened and re-closed.
-- NOTE(review): presumably this releases consumers parked in the take
-- operations; confirm against "Control.Concurrent.Datastructures.ThreadWaitQueue".
putInBlockingConcurrentQueue :: BlockingConcurrentQueue a -> a -> IO ()
putInBlockingConcurrentQueue blockingConcurrentQueue el = do
  modifyMVar_
    (queue blockingConcurrentQueue)
    (\contents -> return (contents ++ [el]))
  openAndRecloseWaitQueue (threadWaitQueue blockingConcurrentQueue)
-- | Append a list of elements, in order, to the end of the queue.
--
-- All elements are added in a single update of the underlying 'MVar'. The
-- update goes through 'modifyMVar_' so that an exception (including an
-- asynchronous one) arriving mid-update restores the previous contents
-- instead of leaving the 'MVar' empty and blocking every later operation.
--
-- The wait queue is opened and re-closed afterwards, even when the supplied
-- list is empty. NOTE(review): presumably this releases parked consumers;
-- confirm against "Control.Concurrent.Datastructures.ThreadWaitQueue".
putAllInBlockingConcurrentQueue :: BlockingConcurrentQueue a -> [a] -> IO ()
putAllInBlockingConcurrentQueue blockingConcurrentQueue els = do
  modifyMVar_
    (queue blockingConcurrentQueue)
    (\contents -> return (contents ++ els))
  openAndRecloseWaitQueue (threadWaitQueue blockingConcurrentQueue)
-- | Remove and return the element at the front of the queue.
--
-- When the queue is empty, a ticket is obtained from the wait queue while the
-- list is still held, the (still empty) list is put back, the caller enters
-- the wait queue with that ticket, and the whole operation is retried once
-- 'enterWaitQueueWithTicket' returns.
-- NOTE(review): taking the ticket before the list is released looks intended
-- to avoid missing a wake-up from a concurrent put; confirm against
-- "Control.Concurrent.Datastructures.ThreadWaitQueue".
--
-- The list is accessed through 'modifyMVar', so an exception raised while it
-- is held (for example by 'getQueueTicket', or an asynchronous exception)
-- restores the previous contents instead of leaving the 'MVar' empty.
takeFromBlockingConcurrentQueue :: BlockingConcurrentQueue a -> IO a
takeFromBlockingConcurrentQueue blockingConcurrentQueue = do
  outcome <- modifyMVar (queue blockingConcurrentQueue) popFront
  case outcome of
    Right el -> return el
    Left queueTicket -> do
      -- Wait outside the MVar so producers can update the list meanwhile.
      enterWaitQueueWithTicket queueTicket
      takeFromBlockingConcurrentQueue blockingConcurrentQueue
  where
    -- Runs while the list is held: either hand out the front element or
    -- fetch a wait ticket and leave the empty list unchanged.
    popFront [] = do
      queueTicket <- getQueueTicket (threadWaitQueue blockingConcurrentQueue)
      return ([], Left queueTicket)
    popFront (el : els) = return (els, Right el)
-- | Remove and return every element currently in the queue, front first.
--
-- The result is never the empty list: when the queue is empty, a ticket is
-- obtained from the wait queue while the list is still held, the empty list
-- is put back, the caller enters the wait queue with that ticket, and the
-- operation is retried once 'enterWaitQueueWithTicket' returns.
-- NOTE(review): taking the ticket before the list is released looks intended
-- to avoid missing a wake-up from a concurrent put; confirm against
-- "Control.Concurrent.Datastructures.ThreadWaitQueue".
--
-- The list is accessed through 'modifyMVar', so an exception raised while it
-- is held (for example by 'getQueueTicket', or an asynchronous exception)
-- restores the previous contents instead of leaving the 'MVar' empty.
takeAllFromBlockingConcurrentQueue :: BlockingConcurrentQueue a -> IO [a]
takeAllFromBlockingConcurrentQueue blockingConcurrentQueue = do
  outcome <- modifyMVar (queue blockingConcurrentQueue) drain
  case outcome of
    Right contents -> return contents
    Left queueTicket -> do
      -- Wait outside the MVar so producers can update the list meanwhile.
      enterWaitQueueWithTicket queueTicket
      takeAllFromBlockingConcurrentQueue blockingConcurrentQueue
  where
    -- Runs while the list is held: either empty the queue and hand out its
    -- contents, or fetch a wait ticket and leave the empty list unchanged.
    drain [] = do
      queueTicket <- getQueueTicket (threadWaitQueue blockingConcurrentQueue)
      return ([], Left queueTicket)
    drain contents = return ([], Right contents)
-- | Return the current contents of the queue, front first, without removing
-- anything.
--
-- This is a plain 'readMVar' on the underlying list, so an empty queue yields
-- @[]@; the wait queue is not involved.
readAllFromBlockingConcurrentQueue :: BlockingConcurrentQueue a -> IO [a]
readAllFromBlockingConcurrentQueue = readMVar . queue