module Utils.Parallel where
import Data.IORef
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception as E
import Control.Monad
import Utils.Monad
import Utils.File
myForkIO io = do
mvar <- newEmptyMVar
forkIO (io `finally` (putMVar mvar ()) )
return mvar
tvarTake x n = do
l <- readTVar x
let (res,rem) = (take n l,drop n l)
writeTVar x rem
return res
tvarPop x = do
l <- readTVar x
writeTVar x $ tail l
return $ head l
tvarPush x t = do
l <- readTVar x
writeTVar x $ l ++ [t]
return $ head l
liftT f = \var -> do
v <- readTVar var
f v
data TaskRunnerStrategy = Persistent | Failing deriving(Eq,Show)
taskRunner :: TaskRunnerStrategy -> TVar Bool -> TVar [IO ()] -> IO ()
taskRunner s terminated tasks = complete (Just $ return ())
where
catch :: IO () -> IOException -> IO ()
catch t e = case s of
Persistent -> do
atomically (tvarPush tasks t)
strictAppendFile "PAR-RUN-ERRORS" ("persisting: "++show e++"\n")
`E.catch` (\err -> error $"Error adding to PAR-RUN-ERRORS "
++show (err:: IOException)++". Tried to add "
++show e)
Failing -> strictAppendFile "PAR-RUN-ERRORS" ("failing: "++show e++"\n")
`E.catch` (\err -> error $"Error adding to PAR-RUN-ERRORS "
++show (err:: IOException)++". Tried to add "
++show e)
complete (Just task) = do
task `E.catch` (catch task)
newTask <- next
complete newTask
complete Nothing = atomically $ writeTVar terminated True
next = atomically $ do
anyLeft <- (liftT (return.not.null)) tasks
if anyLeft then tvarPop tasks
>>= \x -> return $ Just x
else return Nothing
parRun :: TaskRunnerStrategy -> Int -> [IO ()] -> IO ()
parRun s poolSize tasks = do
t <- atomically $ newTVar tasks
terminators <- replicateM poolSize $ do
terminator <- atomically $ newTVar False
myForkIO (taskRunner s terminator t)
return terminator
atomically $ do
terminated <- mapM readTVar terminators >>= return.and
check terminated
parRunWithMonitor :: TaskRunnerStrategy -> Int -> (TVar [IO ()] -> [TVar Bool] -> IO ()) -> [IO ()] -> IO ()
parRunWithMonitor s poolSize monitor tasks = do
t <- atomically $ newTVar tasks
terminators <- replicateM poolSize $ do
terminator <- atomically $ newTVar False
myForkIO (taskRunner s terminator t)
return terminator
forkIO $ (monitor t terminators)
atomically $ do
terminated <- mapM readTVar terminators >>= return.and
check terminated
fork1 :: (a -> IO b) -> a -> IO (MVar b)
fork1 f x =
do
cell <- newEmptyMVar
forkIO (do { result <- f x; putMVar cell result })
return cell
fork :: (a -> IO b) -> [a] -> IO [MVar b]
fork f = mapM (fork1 f)
joinMVars :: [MVar b] -> IO [b]
joinMVars = mapM takeMVar
forkAndJoin :: (a -> IO b) -> [a] -> IO [b]
forkAndJoin f xs = (fork f xs) >>= joinMVars