2007 Sep 15

The author disclaims copyright to this source. In place of a legal
notice, here is a blessing:

   May you do good and not evil.
   May you find forgiveness for yourself and forgive others.
   May you share freely, never taking more than you give.

(after the sqlite source code)


> module Control.Hmk.Concurrent where
> import {-# SOURCE #-} Control.Hmk
> import Control.Concurrent
> import Control.Monad.State
> import Control.Applicative
> import qualified Data.Map as Map

Map the dependency graph to a tree of concurrent threads, where each thread
encapsulates a recipe to execute after having waited for the completion of the
threads of each prerequisite. It is up to the runtime to schedule the
execution of the threads as it best sees fit.

However, interleaving the execution of potentially many thousands of threads,
which are usually all I/O bound, is a bad idea. So we limit the number of
threads that can run simultaneously by making all threads wait on the same
quantity semaphore. The quantity semaphore can be thought of as making
available a fixed number of slots. So long as slots remain, threads may take a
slot and run. But if slots run out, the remaining threads must wait until
running threads have finished their job and released their slot.
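
As an illustration of the slot idea only, here is a minimal sketch that is not
part of this module. The names runLimited and peakConcurrency are hypothetical,
and the jobs are stand-ins that merely sleep. Each job takes a slot before
running and gives it back afterwards, even if it throws.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
import Control.Concurrent.QSem (newQSem, signalQSem, waitQSem)
import Control.Exception (bracket_, finally)
import Control.Monad (forM)

-- | Hypothetical helper: run every job in its own thread, letting at most
-- 'slots' of them execute at once. Returns when all jobs have finished.
runLimited :: Int -> [IO ()] -> IO ()
runLimited slots jobs = do
  sem <- newQSem slots
  dones <- forM jobs $ \job -> do
    done <- newEmptyMVar
    _ <- forkIO $
      -- Take a slot, run the job, release the slot; always report completion.
      bracket_ (waitQSem sem) (signalQSem sem) job
        `finally` putMVar done ()
    return done
  mapM_ takeMVar dones

-- | Hypothetical demo: run 'n' sleeping jobs and report the largest number
-- that were observed running at the same time.
peakConcurrency :: Int -> Int -> IO Int
peakConcurrency slots n = do
  running <- newMVar (0 :: Int)
  peak <- newMVar (0 :: Int)
  let job = do
        now <- modifyMVar running (\r -> return (r + 1, r + 1))
        modifyMVar_ peak (return . max now)
        threadDelay 10000
        modifyMVar_ running (return . subtract 1)
  runLimited slots (replicate n job)
  readMVar peak
```

With these definitions, peakConcurrency 3 20 should never report more than 3,
however the runtime chooses to schedule the twenty threads.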

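Because one cannot wait for a thread to finish directly, we signal job
completion by means of MVars. A finished thread writes some inessential value
to its MVar, which unblocks threads waiting on that MVar. Waiters use
readMVar, which leaves the value in place, so any number of threads can wait
on the same MVar.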
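
The signalling scheme can be sketched in isolation as follows. This is an
illustration under simplifying assumptions, not the code of this module:
forkWithSignal and await are hypothetical names, and a job that throws would
leave its waiters blocked, which the sketch does not try to handle.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

-- The value carries no information; only its presence matters.
data Done = Done

-- | Hypothetical helper: fork 'job' and return an MVar that is filled once
-- the job has finished.
forkWithSignal :: IO () -> IO (MVar Done)
forkWithSignal job = do
  signal <- newEmptyMVar
  _ <- forkIO (job >> putMVar signal Done)
  return signal

-- | Block until the job has finished. readMVar does not empty the MVar, so
-- several threads may await the same signal, and may do so repeatedly.
await :: MVar Done -> IO ()
await signal = readMVar signal >> return ()
```

Had takeMVar been used in await instead, only the first waiter would be
released and every later one would block forever on the emptied MVar.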

Note that there are two distinct monadic levels here. The outer level is the
IO monad to which a state transformer is applied. This level runs no recipe;
it constructs and combines the IO actions that form the process tree, using
the state to remember the MVar of each target already visited. The process
tree is then pulled out of the state-transformed IO monad and executed at the
inner level, which is plain IO.
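
The two levels can be seen on a much smaller example. The sketch below is
hypothetical (plan and runPlan are made-up names, and an IORef stands in for
real recipes): the outer StateT level only numbers the steps and returns IO
actions, and join then runs the combined action.

```haskell
import Control.Monad (join)
import Control.Monad.State (StateT, evalStateT, get, put)
import Data.IORef (IORef, modifyIORef, newIORef, readIORef)

-- | Outer level: consume a counter from the state and return, without
-- running it, an IO action that records a numbered label.
plan :: IORef [String] -> String -> StateT Int IO (IO ())
plan out name = do
  n <- get
  put (n + 1)
  return (modifyIORef out (++ [show n ++ ": " ++ name]))

-- | Build every action first, then pull the combined action out of the
-- state transformer with 'join' and execute it.
runPlan :: [String] -> IO [String]
runPlan names = do
  out <- newIORef []
  join (evalStateT (sequence_ <$> mapM (plan out) names) 0)
  readIORef out
```

Here evalStateT yields a value of type IO (IO ()): the outer IO is the
construction phase, the inner IO () is the constructed program. Nothing is
written to the IORef until join runs that inner action.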

> data Done = Done
> processTree :: (Ord a, Show a) => Int -> DepGraph IO a -> IO ()
> processTree slots gr = do
>   sem <- newQSem slots
>   join $ evalStateT (sequence_ <$> mapM (aux sem) gr) Map.empty
>     where aux sem (Node x ps rule) = do
>             ws <- mapM (wait sem) ps
>             signal <- liftIO $ newEmptyMVar
>             modify (Map.insert x signal)
>             return $ do
>                 sequence_ ws
>                 waitQSem sem
>                 result <- maybe (return TaskSuccess) ($ prereqs rule) (recipe rule)
>                 -- Release the slot whatever the outcome, so that a failed
>                 -- recipe cannot leak it.
>                 signalQSem sem
>                 case result of
>                   TaskSuccess -> putMVar signal Done
>                   TaskFailure -> error $ "Recipe for " ++ show x ++ " failed."
>           wait sem n@(Node x _ _) = do
>             seen <- get
>             case Map.lookup x seen of
>               Just signal -> return $ do
>                 readMVar signal
>                 return ()
>               Nothing -> do
>                 work <- aux sem n
>                 signal <- (Map.! x) <$> get
>                 return $ do
>                   tid <- forkIO work
>                   readMVar signal
>                   killThread tid