module Development.Shake.Pool(Pool, addPool, blockPool, runPool) where
import Control.Concurrent
import Control.Exception hiding (blocked)
import Development.Shake.Locks
import qualified Data.HashSet as Set
data SuperQueue a = SuperQueue [a] [a]
newSuperQueue :: SuperQueue a
newSuperQueue = SuperQueue [] []
enqueuePriority :: a -> SuperQueue a -> SuperQueue a
enqueuePriority x (SuperQueue p n) = SuperQueue (x:p) n
enqueue :: a -> SuperQueue a -> SuperQueue a
enqueue x (SuperQueue p n) = SuperQueue p (x:n)
dequeue :: SuperQueue a -> Maybe (a, SuperQueue a)
dequeue (SuperQueue (p:ps) ns) = Just (p, SuperQueue ps ns)
dequeue (SuperQueue [] (n:ns)) = Just (n, SuperQueue [] ns)
dequeue (SuperQueue [] []) = Nothing
data Pool = Pool Int (Var (Maybe S)) (Barrier (Maybe SomeException))
data S = S
{threads :: Set.HashSet ThreadId
,working :: Int
,blocked :: Int
,todo :: SuperQueue (IO ())
}
emptyS :: S
emptyS = S Set.empty 0 0 newSuperQueue
step :: Pool -> (S -> S) -> IO ()
step pool@(Pool n var done) op = do
let onVar act = modifyVar_ var $ maybe (return Nothing) act
onVar $ \s -> do
s <- return $ op s
case dequeue (todo s) of
Just (now, todo2) | working s < n -> do
t <- forkIO $ do
t <- myThreadId
res <- try now
case res of
Left e -> onVar $ \s -> do
mapM_ killThread $ Set.toList $ Set.delete t $ threads s
signalBarrier done $ Just e
return Nothing
Right _ -> step pool $ \s -> s{working = working s 1, threads = Set.delete t $ threads s}
return $ Just s{working = working s + 1, todo = todo2, threads = Set.insert t $ threads s}
Nothing | working s == 0 && blocked s == 0 -> do
signalBarrier done Nothing
return Nothing
_ -> return $ Just s
addPool :: Pool -> IO a -> IO ()
addPool pool act = step pool $ \s -> s{todo = enqueue (act >> return ()) (todo s)}
blockPool :: Pool -> IO a -> IO a
blockPool pool act = do
step pool $ \s -> s{working = working s 1, blocked = blocked s + 1}
res <- act
var <- newBarrier
let act = do
step pool $ \s -> s{working = working s + 1, blocked = blocked s 1}
signalBarrier var ()
step pool $ \s -> s{todo = enqueuePriority act $ todo s}
waitBarrier var
return res
runPool :: Int -> (Pool -> IO ()) -> IO ()
runPool n act = do
s <- newVar $ Just emptyS
res <- newBarrier
let pool = Pool n s res
addPool pool $ act pool
res <- waitBarrier res
case res of
Nothing -> return ()
Just e -> throw e