Ticket #4850: ConcurrentSet.hs

File ConcurrentSet.hs, 2.8 KB (added by NeilMitchell, 3 years ago)
Line 
1-- | A set that elements can be added to and remove from concurrently.
2--
3-- The main difference between this and a queue is that 'ConcurrentSet' does not
4-- make any guarantees about the order in which things will come out -- in fact,
5-- it will go out of its way to make sure that they are unordered!
6--
7-- The reason that I use this primitive rather than 'Chan' is that:
8--   1) At Standard Chartered we saw intermitted deadlocks when using 'Chan',
9--      but Neil tells me that he stopped seeing them when they moved to a 'ConcurrentSet'
10--      like thing. We never found the reason for the deadlocks though...
11--   2) It's better to dequeue parallel tasks in pseudo random order for many
12--      common applications, because (e.g. in Shake) lots of tasks that require the same
13--      machine resources (i.e. CPU or RAM) tend to be next to each other in the list.
14--      Thus, reducing access locality means that we tend to choose tasks that require
15--      different resources.
16module ConcurrentSet (
17    ConcurrentSet, new, insert, delete
18  ) where
19
20import Control.Concurrent.MVar
21import Control.Monad
22
23import qualified Data.IntMap as IM
24
25import System.Random
26
27
28data ConcurrentSet a = CS (MVar (StdGen, Either (MVar ()) (IM.IntMap a)))
29
30new :: IO (ConcurrentSet a)
31new = fmap CS $ liftM2 (\gen mvar -> (gen, Left mvar)) newStdGen newEmptyMVar >>= newMVar
32
33insert :: ConcurrentSet a -> a -> IO ()
34insert (CS set_mvar) x = modifyMVar_ set_mvar go
35  where go (gen, ei_mvar_ys) = do
36            let (i, gen') = random gen
37            case ei_mvar_ys of
38              Left wait_mvar -> do
39                -- Wake up all waiters (if any): any one of them may want this item
40                putMVar wait_mvar ()
41                return (gen', Right (IM.singleton i x))
42              Right ys -> return (gen', Right (IM.insert i x ys))
43
44delete :: ConcurrentSet a -> IO a
45delete (CS set_mvar) = loop
46  where
47    loop = do
48        ei_wait_x <- modifyMVar set_mvar go
49        case ei_wait_x of
50            Left wait_mvar -> do
51                -- NB: it's very important that we don't do this while we are holding the set_mvar!
52                takeMVar wait_mvar
53                -- Someone put data in the MVar, but we might have to wait again if someone snaffles
54                -- it before we got there.
55                --
56                -- TODO: make this fairer -- there is definite starvation potential here, though it
57                -- doesn't matter for the application I have in mind (Shake)
58                loop
59            Right x -> return x
60   
61    go (gen, Left wait_mvar) = return ((gen, Left wait_mvar), Left wait_mvar)
62    go (gen, Right xs) = do
63        let (chosen, xs') = IM.deleteFindMin xs
64        new_value <- if IM.null xs'
65                      then fmap Left newEmptyMVar
66                      else return (Right xs')
67        return ((gen, new_value), Right chosen)