{-# LANGUAGE CPP #-}
-- File created: 2008-05-30 19:23:00

-- Runs the tasks of a TaskGraph, either one at a time or concurrently.
--
-- In both modes the successors of a node are dealt with before the node's own
-- task, starting from the nodes that have no incoming edges.
module Coadjute.Task.Perform (performTasks) where

import Control.Monad           (foldM, foldM_, forM_, unless, when)
import Control.Concurrent      (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar ( MVar, newEmptyMVar, newMVar
                               , takeMVar, putMVar, readMVar )
import Control.Concurrent.QSem (QSem, newQSem, waitQSem, signalQSem)
import qualified Control.Exception as C
import Data.Array.IO           (IOUArray, newArray, readArray, writeArray)
import Data.Graph.Inductive    (Node, pre, suc, nodes, lab, noNodes, nodeRange)
import Data.IntMap             (IntMap)
import qualified Data.IntMap as IntMap
import Data.IntSet             (IntSet)
import qualified Data.IntSet as Set
import Data.List               (intercalate)
import Data.Maybe              (fromJust)
import System.Directory        (createDirectoryIfMissing)
import System.IO               (hPutStrLn, stderr)
import System.FilePath         (dropFileName)
import Text.Printf             (printf, hPrintf)

#ifdef __GLASGOW_HASKELL__
import GHC.Conc (numCapabilities)
#endif

import Coadjute.CoData
import Coadjute.Task      (Task(tTargets, tAction), TaskGraph)
import Coadjute.Util.Misc (plural)

-- Perform every task in the graph, concurrently if the coParallel setting
-- asks for it and one at a time otherwise.
performTasks :: TaskGraph -> CoData ()
performTasks gr = do
   parallel <- asks coParallel
   if parallel
      then parallelPerformTasks gr
      else   linearPerformTasks gr

-- The targets of a task, comma-separated, for use in messages.
targetString :: Task -> String
targetString = intercalate "," . tTargets

-- Announce a task that is about to run; only shown at VeryVerbose and above.
printMessage :: Verbosity -> Task -> IO ()
printMessage v task =
   when (v >= VeryVerbose) $ printf "%s...\n" (targetString task)

-- Run a single task. The result is the empty string on success and a
-- description of the failure otherwise.
performTask :: Task -> IO String
performTask t = run `C.catch` handler
 where
   run :: IO String
   run = do
      -- create directories for t's targets if they don't exist
      --
      -- This is done inside the handler on purpose: a failure here must be
      -- reported like any other task failure. Were it to escape, a parallel
      -- worker would die without ever reporting completion.
      mapM_ (createDirectoryIfMissing True . dropFileName) (tTargets t)
      tAction t >> return ""

   handler :: C.SomeException -> IO String
   handler e = return (printf "%s failed! %s" (targetString t) (show e))

   -- TODO: remove the targets or we're left with something half-done
   --
   -- somewhat nontrivial since I can't find a lib that would handle
   -- symlinks properly on all platforms, and System.Directory's
   -- removeDirectoryRecursive traverses symlinks

-- roots are nodes at which no edges end
roots :: TaskGraph -> [Node]
roots graph = filter (null . pre graph) (nodes graph)

-- The task stored at a node. Only ever applied to nodes obtained from the
-- graph itself (via nodes or suc), for which a label is expected to exist.
taskAt :: TaskGraph -> Node -> Task
taskAt gr = fromJust . lab gr

-- Depth-first, single-threaded execution: the successors of a node are
-- performed before the node itself, and no node is performed twice.
linearPerformTasks :: TaskGraph -> CoData ()
linearPerformTasks gr = do
   verbosity <- asks coVerbosity
   let n = noNodes gr

   when (verbosity >= Verbose) . io $
      printf "Performing %d task%s linearly.\n" n (plural n)

   io $ foldM_ (visit verbosity) Set.empty (roots gr)
 where
   visit :: Verbosity -> IntSet -> Node -> IO IntSet
   visit verbosity visited x
      | x `Set.member` visited = return visited
      | otherwise              = do
         vs <- foldM (visit verbosity) visited (suc gr x)

         let task = taskAt gr x
         printMessage verbosity task
         s <- performTask task
         unless (null s) $ hPutStrLn stderr s

         return (Set.insert x vs)

-- Messages understood by the manager thread.
data Message
   -- A task that is ready to run, and the variable to fill once it has run
   = KeepGoing Task (MVar ())
   -- A failure description coming back from a worker
   | ErrorMessage String
   -- Nothing more will be sent: the manager should report and stop
   | Done

type ManagerChan  = Chan Message
type VisitedArray = IOUArray Node Bool

-- One completion variable per node. It is filled exactly once, after the
-- node's task has been run, and only ever read non-destructively, so any
-- number of dependents can wait on the same node.
type DoneVars = IntMap (MVar ())

-- Concurrent execution. A task is handed to the manager only once the tasks
-- of all its successors have finished; the manager optionally limits how
-- many tasks run at the same time.
--
-- NOTE(review): this assumes the graph is acyclic. A cycle reachable from a
-- root would make its members wait on each other forever. Confirm that
-- cycles are rejected before this point.
parallelPerformTasks :: TaskGraph -> CoData ()
parallelPerformTasks gr = do
   opt       <- asks coParallelOpt
   verbosity <- asks coVerbosity

   let n   = noNodes gr
       cap = case opt of
                  Uncapped  -> Nothing
                  Capped c  -> Just c
#ifdef __GLASGOW_HASKELL__
                  Processor -> Just numCapabilities
#endif

   io $ do
      when (verbosity >= Verbose) $ do
         printf "Performing %d task%s in parallel, " n (plural n)
         case cap of
              Nothing -> putStrLn "with no limit to the simultaneity."
              Just c  -> printf "running at most %d at a time.\n" c

      -- An empty graph has nothing to run, and nodeRange is not guaranteed
      -- to be defined for it, so skip the machinery altogether.
      when (n > 0) $ do
         -- setup: no nodes have been visited, no task has completed
         arr      <- newArray (nodeRange gr) False
         visited  <- newMVar arr
         doneVars <- newDoneVars (nodes gr)

         -- set the manager thread rolling
         managerDone <- newEmptyMVar
         managerChan <- newChan
         _ <- forkIO $ do
            manage verbosity cap managerChan
            putMVar managerDone ()

         -- and let's get to work: a root is complete only after everything
         -- below it is, so waiting for the roots waits for every task
         -- reachable from them.
         let rs = roots gr
         forM_ rs (forkIO . traverseNode gr visited doneVars managerChan)
         forM_ rs (waitDone doneVars)

         writeChan managerChan Done
         takeMVar managerDone

-- Create an empty completion variable for each of the given nodes.
newDoneVars :: [Node] -> IO DoneVars
newDoneVars xs = do
   vars <- mapM (const newEmptyMVar) xs
   return (IntMap.fromList (zip xs vars))

-- Block until the task of the given node has been run. The variable is left
-- full so that other waiters get through as well.
waitDone :: DoneVars -> Node -> IO ()
waitDone doneVars x = readMVar (doneVars IntMap.! x)

-- The manager thread: starts a worker for every KeepGoing message (at most
-- 'limit' of them at a time, if a limit is given), prints and counts the
-- errors reported by workers, and on Done prints the error total, if any.
manage :: Verbosity -> Maybe Int -> ManagerChan -> IO ()
manage verbosity limit chan = do
   sem <- maybe (return Nothing) (fmap Just . newQSem) limit

   let acquire = onSem waitQSem   sem
       release = onSem signalQSem sem

       loop :: Int -> IO Int
       loop errs = do
          msg <- readChan chan
          case msg of
               Done -> return errs

               ErrorMessage err -> do
                  hPutStrLn stderr err
                  loop (errs + 1)

               KeepGoing task done -> do
                  -- wait for a free slot before starting another worker
                  acquire
                  printMessage verbosity task

                  _ <- forkIO $ do
                     s <- performTask task
                     -- report the failure before signalling completion, so
                     -- that it is in the channel ahead of the final Done
                     unless (null s) $ writeChan chan (ErrorMessage s)
                     putMVar done ()
                     release

                  loop errs

   n <- loop 0
   when (n /= 0) $
      hPrintf stderr "%d error%s occurred!\n" n (plural n)
 where
   -- Apply a semaphore operation if there is a semaphore, else do nothing.
   onSem :: (QSem -> IO ()) -> Maybe QSem -> IO ()
   onSem = maybe (return ())

-- Mark a node as visited. The result is True for the one caller that got
-- there first and False for everybody else.
claim :: MVar VisitedArray -> Node -> IO Bool
claim visited x = do
   vis  <- takeMVar visited
   been <- readArray vis x
   unless been (writeArray vis x True)
   putMVar visited vis
   return (not been)

-- Visit a node: if nobody has been here before, get all of its successors
-- visited (the first in this thread, the rest in threads of their own), wait
-- until every one of their tasks has run, and then hand the node's own task
-- to the manager. A node that has already been visited is left alone;
-- whoever depends on it waits on its completion variable instead.
traverseNode
   :: TaskGraph -> MVar VisitedArray -> DoneVars -> ManagerChan -> Node -> IO ()
traverseNode gr visited doneVars manager = go
 where
   go :: Node -> IO ()
   go x = do
      firstVisit <- claim visited x
      when firstVisit $ do
         let nexts = suc gr x

         case nexts of
              []     -> return ()
              (y:ys) -> do
                 forM_ ys (forkIO . go)
                 go y

         forM_ nexts (waitDone doneVars)

         writeChan manager (KeepGoing (taskAt gr x) (doneVars IntMap.! x))