| 1 | -- | A set that elements can be added to and remove from concurrently. |
|---|
| 2 | -- |
|---|
| 3 | -- The main difference between this and a queue is that 'ConcurrentSet' does not |
|---|
| 4 | -- make any guarantees about the order in which things will come out -- in fact, |
|---|
| 5 | -- it will go out of its way to make sure that they are unordered! |
|---|
| 6 | -- |
|---|
| 7 | -- The reason that I use this primitive rather than 'Chan' is that: |
|---|
| 8 | -- 1) At Standard Chartered we saw intermitted deadlocks when using 'Chan', |
|---|
| 9 | -- but Neil tells me that he stopped seeing them when they moved to a 'ConcurrentSet' |
|---|
| 10 | -- like thing. We never found the reason for the deadlocks though... |
|---|
| 11 | -- 2) It's better to dequeue parallel tasks in pseudo random order for many |
|---|
| 12 | -- common applications, because (e.g. in Shake) lots of tasks that require the same |
|---|
| 13 | -- machine resources (i.e. CPU or RAM) tend to be next to each other in the list. |
|---|
| 14 | -- Thus, reducing access locality means that we tend to choose tasks that require |
|---|
| 15 | -- different resources. |
|---|
| 16 | module ConcurrentSet ( |
|---|
| 17 | ConcurrentSet, new, insert, delete |
|---|
| 18 | ) where |
|---|
| 19 | |
|---|
| 20 | import Control.Concurrent.MVar |
|---|
| 21 | import Control.Monad |
|---|
| 22 | |
|---|
| 23 | import qualified Data.IntMap as IM |
|---|
| 24 | |
|---|
| 25 | import System.Random |
|---|
| 26 | |
|---|
| 27 | |
|---|
| 28 | data ConcurrentSet a = CS (MVar (StdGen, Either (MVar ()) (IM.IntMap a))) |
|---|
| 29 | |
|---|
| 30 | new :: IO (ConcurrentSet a) |
|---|
| 31 | new = fmap CS $ liftM2 (\gen mvar -> (gen, Left mvar)) newStdGen newEmptyMVar >>= newMVar |
|---|
| 32 | |
|---|
| 33 | insert :: ConcurrentSet a -> a -> IO () |
|---|
| 34 | insert (CS set_mvar) x = modifyMVar_ set_mvar go |
|---|
| 35 | where go (gen, ei_mvar_ys) = do |
|---|
| 36 | let (i, gen') = random gen |
|---|
| 37 | case ei_mvar_ys of |
|---|
| 38 | Left wait_mvar -> do |
|---|
| 39 | -- Wake up all waiters (if any): any one of them may want this item |
|---|
| 40 | putMVar wait_mvar () |
|---|
| 41 | return (gen', Right (IM.singleton i x)) |
|---|
| 42 | Right ys -> return (gen', Right (IM.insert i x ys)) |
|---|
| 43 | |
|---|
| 44 | delete :: ConcurrentSet a -> IO a |
|---|
| 45 | delete (CS set_mvar) = loop |
|---|
| 46 | where |
|---|
| 47 | loop = do |
|---|
| 48 | ei_wait_x <- modifyMVar set_mvar go |
|---|
| 49 | case ei_wait_x of |
|---|
| 50 | Left wait_mvar -> do |
|---|
| 51 | -- NB: it's very important that we don't do this while we are holding the set_mvar! |
|---|
| 52 | takeMVar wait_mvar |
|---|
| 53 | -- Someone put data in the MVar, but we might have to wait again if someone snaffles |
|---|
| 54 | -- it before we got there. |
|---|
| 55 | -- |
|---|
| 56 | -- TODO: make this fairer -- there is definite starvation potential here, though it |
|---|
| 57 | -- doesn't matter for the application I have in mind (Shake) |
|---|
| 58 | loop |
|---|
| 59 | Right x -> return x |
|---|
| 60 | |
|---|
| 61 | go (gen, Left wait_mvar) = return ((gen, Left wait_mvar), Left wait_mvar) |
|---|
| 62 | go (gen, Right xs) = do |
|---|
| 63 | let (chosen, xs') = IM.deleteFindMin xs |
|---|
| 64 | new_value <- if IM.null xs' |
|---|
| 65 | then fmap Left newEmptyMVar |
|---|
| 66 | else return (Right xs') |
|---|
| 67 | return ((gen, new_value), Right chosen) |
|---|