module Control.Concurrent.Datastructures.BlockingConcurrentQueue where



import Control.Concurrent

import Control.Concurrent.MVar

import Control.Concurrent.Datastructures.ThreadWaitQueue





data BlockingConcurrentQueue a = BlockingConcurrentQueue { queue :: MVar [a]

                                                         , threadWaitQueue :: ThreadWaitQueue -- Used to signal when to try again if the queue was empty and we wanted to take one

                                                         }



instance Show (BlockingConcurrentQueue a) where

    show _ = "BlockingConcurrentQueue"

                                 



showBlockingConcurrentQueue :: (Show a) => BlockingConcurrentQueue a -> IO [Char]

showBlockingConcurrentQueue blockingConcurrentQueue

    = do

        els <- readAllFromBlockingConcurrentQueue blockingConcurrentQueue

        return (show els)

            



createBlockingConcurrentQueue :: IO (BlockingConcurrentQueue a)

createBlockingConcurrentQueue

    = do

        newQueue <- newMVar []

        threadWaitQueue <- createWaitQueue

        return (BlockingConcurrentQueue newQueue threadWaitQueue)



                     

putInBlockingConcurrentQueue :: BlockingConcurrentQueue a -> a -> IO ()

putInBlockingConcurrentQueue blockingConcurrentQueue el

    = do

        oldQueue <- takeMVar (queue blockingConcurrentQueue)

        putMVar (queue blockingConcurrentQueue) (oldQueue ++ [el])

        openAndRecloseWaitQueue (threadWaitQueue blockingConcurrentQueue)



        

putAllInBlockingConcurrentQueue :: BlockingConcurrentQueue a -> [a] -> IO ()

putAllInBlockingConcurrentQueue blockingConcurrentQueue els

    = do

        oldQueue <- takeMVar (queue blockingConcurrentQueue)

        putMVar (queue blockingConcurrentQueue) (oldQueue ++ els)

        openAndRecloseWaitQueue (threadWaitQueue blockingConcurrentQueue)



        

takeFromBlockingConcurrentQueue :: BlockingConcurrentQueue a -> IO a

takeFromBlockingConcurrentQueue blockingConcurrentQueue

    = do

        oldQueue <- takeMVar (queue blockingConcurrentQueue)

        case oldQueue of

            []       -> do

                            queueTicket <- getQueueTicket (threadWaitQueue blockingConcurrentQueue)

                            putMVar (queue blockingConcurrentQueue) []

                            enterWaitQueueWithTicket queueTicket

                            takeFromBlockingConcurrentQueue blockingConcurrentQueue

            (el:els) -> do

                            putMVar (queue blockingConcurrentQueue) els

                            return el



                            

takeAllFromBlockingConcurrentQueue :: BlockingConcurrentQueue a -> IO [a]

takeAllFromBlockingConcurrentQueue blockingConcurrentQueue

    = do

        oldQueue <- takeMVar (queue blockingConcurrentQueue)

        case oldQueue of

            []       -> do

                            queueTicket <- getQueueTicket (threadWaitQueue blockingConcurrentQueue)

                            putMVar (queue blockingConcurrentQueue) []

                            enterWaitQueueWithTicket queueTicket

                            takeAllFromBlockingConcurrentQueue blockingConcurrentQueue

            (el:els) -> do

                            putMVar (queue blockingConcurrentQueue) []

                            return (el:els)

        

    

readAllFromBlockingConcurrentQueue :: BlockingConcurrentQueue a -> IO [a]

readAllFromBlockingConcurrentQueue blockingConcurrentQueue

    = readMVar (queue blockingConcurrentQueue)