-- | A set that elements can be added to and removed from concurrently.
--
-- The main difference between this and a queue is that 'ConcurrentSet' does not
-- make any guarantees about the order in which things will come out -- in fact,
-- it will go out of its way to make sure that they are unordered!
--
-- The reason that I use this primitive rather than 'Chan' is that:
--   1) At Standard Chartered we saw intermittent deadlocks when using 'Chan',
--      but Neil tells me that he stopped seeing them when they moved to a
--      'ConcurrentSet'-like thing. We never found the reason for the deadlocks, though...
--   2) It's better to dequeue parallel tasks in pseudo random order for many
--      common applications, because (e.g. in Shake) lots of tasks that require the same
--      machine resources (i.e. CPU or RAM) tend to be next to each other in the list.
--      Thus, reducing access locality means that we tend to choose tasks that require
--      different resources.
module Control.Concurrent.ParallelIO.ConcurrentSet (
    ConcurrentSet, new, insert, delete
  ) where

import Control.Concurrent.MVar
import Control.Monad

import qualified Data.IntMap as IM

import System.Random


-- | A set of elements of type @a@ that may be shared between threads.
--
-- The outer 'MVar' guards a pair of:
--
--   * a random generator, used by 'insert' to choose each element's key;
--
--   * the contents of the set, which is either
--
--       * @Left wait_mvar@ when the set is empty: 'delete' callers block on
--         @wait_mvar@ until an 'insert' fills it, or
--
--       * @Right ys@ when the set is non-empty: the elements, keyed by
--         random 'Int's (an empty map is replaced by a fresh @Left@ state
--         rather than being stored).
data ConcurrentSet a = CS (MVar (StdGen, Either (MVar ()) (IM.IntMap a)))

new :: IO (ConcurrentSet a)
-- | Create an empty set.
--
-- The set starts in its empty state, holding a fresh, empty wait 'MVar' for
-- 'delete' to block on, together with a generator from 'newStdGen' that
-- 'insert' uses to draw keys.
new = do
    gen <- newStdGen
    wait_mvar <- newEmptyMVar
    fmap CS (newMVar (gen, Left wait_mvar))

-- | Add an element to the set.
--
-- Each element is stored under a fresh pseudo-random key. Because 'delete'
-- removes the element with the smallest key, elements come back out in an
-- order unrelated to the order they were inserted in.
insert :: ConcurrentSet a -> a -> IO ()
insert (CS set_mvar) x = modifyMVar_ set_mvar go
  where
    go (gen, Left wait_mvar) = do
        -- The set is empty, so no key can collide here.
        let (i, gen') = random gen
        -- The set was empty, so 'delete' callers may be blocked on wait_mvar:
        -- fill it to signal that an element is now available. This never
        -- blocks, because the set leaves the empty state below and a new
        -- empty state always comes with a fresh, empty MVar.
        putMVar wait_mvar ()
        return $! store gen' (IM.singleton i x)
    go (gen, Right ys) = do
        let (i, gen') = freshKey gen ys
        return $! store gen' (IM.insert i x ys)

    -- Evaluate the generator and the map before they go back into the MVar;
    -- otherwise a run of inserts without deletes accumulates a chain of
    -- unevaluated 'random'/'IM.insert' thunks inside the set.
    store gen' ys' = gen' `seq` ys' `seq` (gen', Right ys')

    -- Draw keys until one is not already in use. 'IM.insert' replaces the
    -- value at an existing key, so a colliding key would silently drop the
    -- element previously stored there.
    freshKey gen ys
      | i `IM.member` ys = freshKey gen' ys
      | otherwise        = (i, gen')
      where
        (i, gen') = random gen

-- | Remove and return an element of the set, blocking while the set is empty.
--
-- The element returned is the one with the smallest key. Since 'insert'
-- draws keys at random, this is effectively a pseudo-random element.
delete :: ConcurrentSet a -> IO a
delete (CS set_mvar) = loop
  where
    loop = do
        ei_wait_x <- modifyMVar set_mvar go
        case ei_wait_x of
            Left wait_mvar -> do
                -- NB: it's very important that we don't do this while we are holding the set_mvar!
                --
                -- Use readMVar, not takeMVar: all deleters that found the set empty
                -- wait on this same MVar, and 'insert' fills it only once and never
                -- empties it. takeMVar would release just one of them and leave the
                -- rest blocked forever on an MVar nobody will fill again; readMVar
                -- does not consume the value, so every waiter is released.
                readMVar wait_mvar
                -- Someone put data in the MVar, but we might have to wait again if someone snaffles
                -- it before we got there.
                --
                -- TODO: make this fairer -- there is definite starvation potential here, though it
                -- doesn't matter for the application I have in mind (Shake)
                loop
            Right x -> return x

    -- Runs while holding set_mvar: returns the new set state together with
    -- either the MVar to wait on (set empty) or the removed element.
    go st@(_, Left wait_mvar) = return (st, Left wait_mvar)
    go (gen, Right xs) = case IM.minView xs of
        Nothing -> do
            -- Should not happen (an empty map is never stored as Right), but
            -- recover by entering the empty state instead of crashing.
            wait_mvar <- newEmptyMVar
            return ((gen, Left wait_mvar), Left wait_mvar)
        Just (chosen, xs')
          | IM.null xs' -> do
              -- Last element taken: switch to the empty state with a fresh
              -- wait MVar for subsequent deleters.
              wait_mvar <- newEmptyMVar
              return ((gen, Left wait_mvar), Right chosen)
          | otherwise -> return ((gen, Right xs'), Right chosen)