module Control.Concurrent.ParallelIO.ConcurrentCollection (
    ConcurrentSet, Chan, ConcurrentCollection(..)
  ) where

import Control.Concurrent.ParallelIO.Compat

import Control.Concurrent.MVar
import Control.Concurrent.Chan

import Control.Monad

import qualified Data.IntMap as IM

import System.Random


class ConcurrentCollection p where
    new :: IO (p a)
    insert :: p a -> a -> IO ()
    delete :: p a -> IO a

-- (A small usage sketch for this interface appears at the bottom of this file.)


-- | A set that elements can be added to and removed from concurrently.
--
-- The main difference between this and a queue is that 'ConcurrentSet' does not
-- make any guarantees about the order in which things will come out -- in fact,
-- it will go out of its way to make sure that they are unordered!
--
-- The reason that I use this primitive rather than 'Chan' is that:
--
--  1) At Standard Chartered we saw intermittent deadlocks when using 'Chan',
--     but Neil tells me that he stopped seeing them when they moved to a
--     'ConcurrentSet'-like thing. We never found the reason for the deadlocks though...
--
--  2) It's better to dequeue parallel tasks in pseudo-random order for many
--     common applications, because (e.g. in Shake) lots of tasks that require the same
--     machine resources (i.e. CPU or RAM) tend to be next to each other in the list.
--     Thus, reducing access locality means that we tend to choose tasks that require
--     different resources.
data ConcurrentSet a = CS (MVar (StdGen, Contents (IM.IntMap a)))

data Contents a = EmptyWithWaiters (MVar ())
                | NonEmpty a

instance ConcurrentCollection ConcurrentSet where
    new = fmap CS $ liftM2 (\gen mvar -> (gen, EmptyWithWaiters mvar)) newStdGen newEmptyMVar >>= newMVar

    -- We don't mask asynchronous exceptions here because it's OK if we signal the wait_mvar
    -- but the set still doesn't contain anything: the readers (i.e. in "delete") will just
    -- discover that and start waiting again, just as if another thread had deleted before
    -- they got a chance to read from a newly non-empty set.
    insert (CS set_mvar) x = modifyMVar_ set_mvar go
      where
        go (gen, contents) = do
            let (i, gen') = random gen
            case contents of
              EmptyWithWaiters wait_mvar -> do
                  -- Wake up all waiters (if any): any one of them may want this item.
                  --
                  -- NB: we don't use putMVar here (even though it would be safe) because
                  -- this way I get an obvious exception if I've done something daft.
                  True <- tryPutMVar wait_mvar ()
                  return (gen', NonEmpty (IM.singleton i x))
              NonEmpty ys -> return (gen', NonEmpty (IM.insert i x ys))

    delete (CS set_mvar) = loop
      where
        loop = do
            contents <- modifyMVar set_mvar peek_inside
            case contents of
              EmptyWithWaiters wait_mvar -> do
                  -- NB: it's very important that we don't do this while we are holding the set_mvar!
                  --
                  -- We are careful to readMVar here rather than takeMVar, because *there may be more
                  -- than one waiter*. This does lead to a bit of a scrummage, because every single
                  -- waiter will get woken up and go for newly-added data simultaneously, but the
                  -- alternative is disconcertingly subtle.
                  () <- readMVar wait_mvar
                  -- Someone put data in the MVar, but we might have to wait again if someone
                  -- snaffles it before we got there.
                  --
                  -- TODO: make this fairer -- there is definite starvation potential here, though
                  -- it doesn't matter for the application I have in mind (Shake)
                  loop
              NonEmpty x -> return x

        peek_inside (gen, EmptyWithWaiters wait_mvar) = return ((gen, EmptyWithWaiters wait_mvar), EmptyWithWaiters wait_mvar)
        peek_inside (gen, NonEmpty xs) = do
            -- The 'NonEmpty' invariant guarantees that 'xs' is non-empty, so 'deleteFindMin'
            -- is safe here. NB: this pattern assumes the containers >= 0.5 signature
            -- 'deleteFindMin :: IntMap a -> ((Key, a), IntMap a)'; older versions returned
            -- just the element, in which case the inner pair pattern should be dropped.
            let ((_, chosen), xs') = IM.deleteFindMin xs
            new_value <- if IM.null xs' then fmap EmptyWithWaiters newEmptyMVar
                                        else return (NonEmpty xs')
            return ((gen, new_value), NonEmpty chosen)


instance ConcurrentCollection Chan where
    new = newChan
    insert = writeChan
    delete = readChan
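
-- The sketch below is not part of the original module: it is a minimal, hypothetical
-- usage example of the 'ConcurrentCollection' interface, and the name '_exampleDrain'
-- is made up purely for illustration. It inserts ten numbered items into a
-- 'ConcurrentSet' and then drains it; because the internal keys are drawn from the
-- set's 'StdGen', the items typically come back in a pseudo-random order rather than
-- in insertion order. Demonstrating the blocking behaviour of 'delete' on an empty set
-- would need a second thread (e.g. 'forkIO'), which this single-threaded sketch
-- deliberately avoids so that it needs no extra imports.
_exampleDrain :: IO [Int]
_exampleDrain = do
    set <- new :: IO (ConcurrentSet Int)
    mapM_ (insert set) [1..10]
    -- Exactly as many deletes as inserts, so none of these calls block.
    replicateM 10 (delete set)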