module Coadjute.Task.Perform (performTasks) where
import Control.Monad (foldM, foldM_, forM_, unless, when)
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, newMVar, takeMVar, putMVar)
import Control.Concurrent.QSem (QSem, newQSem, waitQSem, signalQSem)
import qualified Control.Exception as C
import Data.Array.IO (IOUArray, newArray, readArray, writeArray)
import Data.Graph.Inductive (Node, pre, suc, nodes, lab, noNodes, nodeRange)
import Data.IntSet (IntSet)
import qualified Data.IntSet as Set
import Data.List (intercalate)
import Data.Maybe (fromJust)
import System.Directory (createDirectoryIfMissing)
import System.IO (hPutStrLn, stderr)
import System.FilePath (dropFileName)
import Text.Printf (printf, hPrintf)
#ifdef __GLASGOW_HASKELL__
import GHC.Conc (numCapabilities)
#endif
import Coadjute.CoData
import Coadjute.Task (Task(tTargets, tAction), TaskGraph)
import Coadjute.Util.Misc (plural)
performTasks :: TaskGraph -> CoData ()
performTasks gr = do
parallel <- asks coParallel
if parallel
then parallelPerformTasks gr
else linearPerformTasks gr
targetString :: Task -> String
targetString = intercalate "," . tTargets
printMessage :: Verbosity -> Task -> IO ()
printMessage v = when (v >= VeryVerbose) . printf "%s...\n" . targetString
performTask :: Task -> IO String
performTask t = do
mapM_ (createDirectoryIfMissing True . dropFileName) (tTargets t)
(tAction t >> return "") `C.catch` handler
where
handler :: C.SomeException -> IO String
handler =
return . printf "%s failed! %s" (targetString t) . show
roots :: TaskGraph -> [Node]
roots graph = filter (null . pre graph) (nodes graph)
linearPerformTasks :: TaskGraph -> CoData ()
linearPerformTasks gr = do
verbosity <- asks coVerbosity
let n = noNodes gr
when (verbosity >= Verbose).io $ printf
"Performing %d tasks linearly.\n" n
io $ foldM_ (f verbosity) Set.empty (roots gr)
where
f :: Verbosity -> IntSet -> Node -> IO IntSet
f verbosity visited x =
if x `Set.member` visited
then return visited
else do
vs <- foldM (f verbosity) visited (suc gr x)
let task = fromJust . lab gr $ x
printMessage verbosity task
s <- performTask task
when (not.null $ s) $ hPutStrLn stderr s
return (Set.insert x vs)
data Message = KeepGoing Task (Chan ())
| ErrorMessage String
| Done
type ManagerChan = Chan Message
type VisitedArray = IOUArray Node Bool
parallelPerformTasks :: TaskGraph -> CoData ()
parallelPerformTasks gr = do
opt <- asks coParallelOpt
verbosity <- asks coVerbosity
let cap = case opt of
Uncapped -> Nothing
Capped c -> Just c
#ifdef __GLASGOW_HASKELL__
Processor -> Just numCapabilities
#endif
io$ do
when (verbosity >= Verbose) $ do
let n = noNodes gr
printf "Performing %d task%s in parallel, " n (plural n) :: IO ()
case cap of
Nothing -> putStrLn "with no limit to the simultaneity."
Just c -> printf "running at most %d at a time.\n" c
arr <- newArray (nodeRange gr) False
visited <- newMVar arr
managerDone <- newEmptyMVar
managerChan <- newChan
_ <- forkIO $ do
manage verbosity cap managerChan
putMVar managerDone ()
let rs = roots gr
doneChan <- fork gr visited managerChan rs
waitFor rs doneChan
writeChan managerChan Done
takeMVar managerDone
manage :: Verbosity -> Maybe Int -> ManagerChan -> IO ()
manage verbosity limit chan =
case limit of
Nothing ->
doManagement Nothing undefined undefined
Just cap -> do
running <- newQSem cap
doManagement (Just running) waitQSem signalQSem
where
doManagement :: Maybe QSem -> (QSem -> IO ()) -> (QSem -> IO ()) -> IO ()
doManagement cap wait signal = do
let
loop n = do
msg <- readChan chan
case msg of
Done -> return n
ErrorMessage err -> do
hPutStrLn stderr err
loop (n+1)
KeepGoing task doneChan -> do
maybe (return ()) wait cap
printMessage verbosity task
_ <- forkIO $ do
s <- performTask task
when (not.null $ s) (writeChan chan (ErrorMessage s))
writeChan doneChan ()
maybe (return ()) signal cap
loop n
n <- loop (0 :: Int)
when (n /= 0) $ hPrintf stderr "%d error%s occurred!\n" n (plural n)
fork :: TaskGraph -> MVar VisitedArray -> ManagerChan -> [Node] -> IO (Chan ())
fork gr visited manager xs = do
doneChan <- newChan
forM_ xs (forkIO . traverse gr visited manager doneChan)
return doneChan
waitFor :: [Node] -> Chan () -> IO ()
waitFor xs = forM_ xs . const . readChan
traverse :: TaskGraph -> MVar VisitedArray -> ManagerChan
-> Chan () -> Node -> IO ()
traverse gr visited manager doneChan x = do
vis <- takeMVar visited
been <- readArray vis x
unless been (writeArray vis x True)
putMVar visited vis
unless been $ do
let nexts = suc gr x
task = fromJust . lab gr $ x
unless (null nexts) $ do
let (n:ns) = nexts
doneChan' <- fork gr visited manager ns
traverse gr visited manager doneChan' n
waitFor ns doneChan'
writeChan manager (KeepGoing task doneChan)