{-# Language CPP #-}
module HGraph.Parallel
( produceConsumeExhaustive
, processJobList
)
where
import HGraph.Debugging
import Control.Concurrent
import Data.Maybe
import Control.Monad
import Control.Concurrent.MVar
import Control.Exception
import System.IO (hPutStrLn, stderr)
-- | Run a producer\/consumer pipeline on a pool of threads and stream the
-- consumer results back to the caller.
--
-- One producer thread and one consumer thread are forked per capability
-- (as reported by @getNumCapabilities@). Every producer calls @produce@
-- repeatedly until it yields @Nothing@, so @produce@ is invoked concurrently
-- from several threads. Each produced item is handed to some consumer through
-- a single shared @MVar@; the consumer applies @consume@ and publishes the
-- result on an output @MVar@.
--
-- The returned action blocks for the next result. It yields @Just b@ for each
-- consumed item and @Nothing@ after all producer and consumer threads have
-- terminated. The end marker is written exactly once, so the action should
-- not be run again after it has returned @Nothing@.
--
-- Exceptions raised inside the forked threads are not propagated to the
-- caller; they are reported through @sayErrString@.
produceConsumeExhaustive :: IO (Maybe a) -> (a -> IO b) -> IO (IO (Maybe b))
produceConsumeExhaustive produce consume = do
  numForks <- getNumCapabilities
  -- Single-slot hand-off between producers and consumers. A @Nothing@ in this
  -- slot tells the consumers that production has finished.
  mProduct <- newEmptyMVar
  -- One completion flag per producer thread.
  workerDone <- replicateM numForks newEmptyMVar
  forM_ workerDone $ \doneM ->
    forkFinally
      ( let loop :: IO ()
            loop = do
              g <- hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ produce
              -- Stop at the first Nothing; the end marker for the consumers
              -- is written by the coordinator thread below, not here.
              when (isJust g) $ do
                hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mProduct g
                loop
        in loop
      )
      ( \e -> do
          -- Signal completion first so the coordinator is released even when
          -- the producer died with an exception.
          hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar doneM ()
          case e of
            Left err ->
              sayErrString (__FILE__ ++ " on line " ++ show __LINE__
                            ++ " produce-consume error (producer):\n\t -"
                            ++ show (err :: SomeException))
            Right () -> return ()
      )
  -- One completion flag per consumer thread.
  consumerDone <- replicateM numForks newEmptyMVar
  consumerOutput <- newEmptyMVar
  forM_ consumerDone $ \doneM ->
    forkFinally
      ( let loop :: IO ()
            loop = do
              g <- hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ takeMVar mProduct
              case g of
                Just p -> do
                  out <- hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ consume p
                  hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar consumerOutput (Just out)
                  loop
                -- Put the end marker back so the remaining consumers see it
                -- as well, then terminate.
                Nothing -> hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mProduct Nothing
        in loop
      )
      ( \e -> do
          hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar doneM ()
          case e of
            Left err ->
              sayErrString (__FILE__ ++ " on line " ++ show __LINE__ ++ ": produce-consume error (consumer):\n\t- " ++ show (err :: SomeException))
            Right () -> return ()
      )
  -- Coordinator: wait for every producer, inject the end marker, wait for
  -- every consumer, and finally close the output stream with Nothing.
  _ <- forkFinally
    ( do
        hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ mapM_ takeMVar workerDone
        hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mProduct Nothing
        hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ mapM_ takeMVar consumerDone
    )
    ( \outcome -> do
        hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar consumerOutput Nothing
        case outcome of
          Left err ->
            sayErrString (__FILE__ ++ " on line " ++ show __LINE__
                          ++ ": produce-consume error (final):\n\t- "
                          ++ show (err :: SomeException))
          Right () -> return ()
    )
  return (takeMVar consumerOutput)
-- | Process a dynamically growing job list on a pool of worker threads and
-- stream the results back to the caller.
--
-- One worker thread is forked per capability (as reported by
-- @getNumCapabilities@). A thread pops the first open job, runs @worker@ on
-- it, and receives a pair of follow-up jobs and results. Follow-up jobs are
-- prepended to the open job list; results are published one by one on an
-- output @MVar@.
--
-- The returned action blocks for the next result. It yields @Just b@ for each
-- result and @Nothing@ after all worker threads have terminated. The end
-- marker is written exactly once, so the action should not be run again after
-- it has returned @Nothing@.
--
-- Synchronisation protocol:
--
-- * @mJobs@ holds the open jobs and doubles as the lock for @mBusyCount@:
--   the counter is only modified while @mJobs@ is held.
-- * @mReady@ is a token that exists exactly when an idle thread has something
--   to do: either there are open jobs, or there are none and no thread is
--   busy (time to shut down).
--
-- An exception thrown by @worker@ terminates the thread that ran it and is
-- reported through @sayErrString@; the busy slot of that thread is released
-- first so the remaining threads can still make progress and terminate.
processJobList :: (a -> IO ([a], [b])) -> [a] -> IO (IO (Maybe b))
processJobList worker jobs = do
  numForks <- getNumCapabilities
  -- One completion flag per worker thread.
  workerDone <- replicateM numForks newEmptyMVar
  workerOutput <- newEmptyMVar
  mReady <- newMVar ()
  mJobs <- newMVar jobs
  mBusyCount <- newMVar (0 :: Integer)
  forM_ workerDone $ \doneM ->
    forkFinally
      ( let -- Called when a job is finished (or failed, with no follow-ups).
            -- Merging the follow-up jobs and releasing the busy slot happen
            -- in one critical section, so the snapshot used to decide about
            -- the ready token is consistent. With separate sections two
            -- finishing threads could both hand out the token, or neither.
            finishJob newJobs = do
              (wasEmpty, busy) <- modifyMVar mJobs $ \openJobs' -> do
                busy' <- modifyMVar mBusyCount (\c -> return (c - 1, c - 1))
                return (newJobs ++ openJobs', (null openJobs', busy'))
              -- No token exists while the list is empty and somebody is busy.
              -- Create one if there is new work or if everything is done.
              when (wasEmpty && (not (null newJobs) || busy == 0)) $
                hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mReady ()

            loop :: IO ()
            loop = do
              hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ takeMVar mReady
              openJobs <- hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ takeMVar mJobs
              busy0 <- modifyMVar mBusyCount (\c -> return (c + 1, c + 1))
              case openJobs of
                [] -> do
                  modifyMVar_ mBusyCount (\c -> return (c - 1))
                  if busy0 == 1
                    then do
                      -- Nothing left and nobody busy: pass the token on so
                      -- the other threads terminate too, then stop.
                      hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mReady ()
                      hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mJobs []
                    else do
                      -- Somebody is still busy: drop the token and wait for a
                      -- new one. The job list must be put back first,
                      -- otherwise every later access to it blocks forever.
                      hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mJobs []
                      loop
                (j : jobs') -> do
                  -- Keep the token alive only while more jobs are waiting.
                  when (not (null jobs')) $
                    hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mReady ()
                  hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mJobs jobs'
                  -- Release the busy slot even when the worker throws; the
                  -- exception is rethrown and reported by the finalizer.
                  (newJobs, newResults) <- worker j `onException` finishJob []
                  finishJob newJobs
                  hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ mapM_ (putMVar workerOutput . Just) newResults
                  loop
        in loop
      )
      ( \e -> do
          -- Signal completion first so the coordinator is released even when
          -- the worker thread died with an exception.
          hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar doneM ()
          case e of
            Left err ->
              sayErrString (__FILE__ ++ " on line " ++ show __LINE__
                            ++ " job list error (worker):\n\t -"
                            ++ show (err :: SomeException))
            Right () -> return ()
      )
  -- Coordinator: wait for every worker thread, then close the output stream.
  _ <- forkFinally
    (hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ mapM_ takeMVar workerDone)
    ( \outcome -> do
        hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar workerOutput Nothing
        case outcome of
          Left err ->
            sayErrString (__FILE__ ++ " on line " ++ show __LINE__
                          ++ ": job list error (end):\n\t- "
                          ++ show (err :: SomeException))
          Right () -> return ()
    )
  return (takeMVar workerOutput)