{-# Language CPP #-}

module HGraph.Parallel
       ( produceConsumeExhaustive
       , processJobList
       )
where

import HGraph.Debugging

import Control.Concurrent
import Data.Maybe
import Control.Monad
import Control.Concurrent.MVar
import Control.Exception
import System.IO (hPutStrLn, stderr)

-- | Runs the producer until it output `Nothing`.
-- Each successful product is sent to the consumer.
-- Returns an IO action which takes the next product (or `Nothing` if there are no more products)
produceConsumeExhaustive :: (IO (Maybe a)) -> (a -> IO b) -> IO (IO (Maybe b))
produceConsumeExhaustive :: forall a b. IO (Maybe a) -> (a -> IO b) -> IO (IO (Maybe b))
produceConsumeExhaustive IO (Maybe a)
produce a -> IO b
consume = do
  Int
numForks <- IO Int
getNumCapabilities
  MVar (Maybe a)
mProduct <- IO (MVar (Maybe a))
forall a. IO (MVar a)
newEmptyMVar
  [MVar ()]
workerDone <- Int -> IO (MVar ()) -> IO [MVar ()]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
numForks (IO (MVar ()) -> IO [MVar ()]) -> IO (MVar ()) -> IO [MVar ()]
forall a b. (a -> b) -> a -> b
$ IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  [ThreadId]
producers <- [MVar ()] -> (MVar () -> IO ThreadId) -> IO [ThreadId]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [MVar ()]
workerDone ((MVar () -> IO ThreadId) -> IO [ThreadId])
-> (MVar () -> IO ThreadId) -> IO [ThreadId]
forall a b. (a -> b) -> a -> b
$ \MVar ()
doneM -> IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally
    ( let loop :: IO ()
loop = do
              Maybe a
g <- String -> IO (Maybe a) -> IO (Maybe a)
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ produce
              Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Maybe a -> Bool
forall a. Maybe a -> Bool
isJust Maybe a
g) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mProduct g
                IO ()
loop
      in IO ()
loop
    )
    (\Either SomeException ()
e -> do
      String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar doneM ()
      case Either SomeException ()
e of
        Left SomeException
err -> (String -> IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
sayErrString (__FILE__ ++ " on line " ++ show __LINE__
                                   ++ " produce-consume error (producer):\n\t -"
                                   ++ show (err :: SomeException)))
        Right ()
res -> do
          () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    )
  [MVar ()]
consumerDone <- Int -> IO (MVar ()) -> IO [MVar ()]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
numForks (IO (MVar ()) -> IO [MVar ()]) -> IO (MVar ()) -> IO [MVar ()]
forall a b. (a -> b) -> a -> b
$ IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  MVar (Maybe b)
consumerOutput <- IO (MVar (Maybe b))
forall a. IO (MVar a)
newEmptyMVar
  [ThreadId]
consumers <- [MVar ()] -> (MVar () -> IO ThreadId) -> IO [ThreadId]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [MVar ()]
consumerDone ((MVar () -> IO ThreadId) -> IO [ThreadId])
-> (MVar () -> IO ThreadId) -> IO [ThreadId]
forall a b. (a -> b) -> a -> b
$ \MVar ()
doneM -> IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally
    ( let loop :: IO ()
loop = do
                  Maybe a
g <- String -> IO (Maybe a) -> IO (Maybe a)
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ takeMVar mProduct
                  case Maybe a
g of
                    Just a
p -> do
                      b
out <- String -> IO b -> IO b
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ consume (IO b -> IO b) -> IO b -> IO b
forall a b. (a -> b) -> a -> b
p
                      String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar consumerOutput (Just out)
                      IO ()
loop
                    Maybe a
Nothing -> String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mProduct Nothing
      in IO ()
loop
    )
    (\Either SomeException ()
e -> do
      String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar doneM ()
      case Either SomeException ()
e of
        Left SomeException
err -> (String -> IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
sayErrString (__FILE__ ++ " on line " ++ show __LINE__ ++ ": produce-consume error (consumer):\n\t- " ++ show (err :: SomeException)))
        Right ()
res -> do
          () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    )
  IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally (do
    String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ mapM_ takeMVar workerDone
    String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mProduct Nothing
    String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ mapM_ takeMVar consumerDone
    )
    (\Either SomeException ()
res -> do
      String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar consumerOutput Nothing
      case Either SomeException ()
res of
        Left SomeException
err -> String -> IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
sayErrString (__FILE__ ++ " on line " ++ show __LINE__
                                  ++ ": produce-consume error (final):\n\t- "
                                  ++ show (err :: SomeException))
        Right ()
_ -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    )

  IO (Maybe b) -> IO (IO (Maybe b))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar (Maybe b) -> IO (Maybe b)
forall a. MVar a -> IO a
takeMVar MVar (Maybe b)
consumerOutput)

processJobList :: (a -> IO ([a], [b])) -> [a] -> IO (IO (Maybe b))
processJobList :: forall a b. (a -> IO ([a], [b])) -> [a] -> IO (IO (Maybe b))
processJobList a -> IO ([a], [b])
worker [a]
jobs = do
  Int
numForks <- IO Int
getNumCapabilities
  [MVar ()]
workerDone <- Int -> IO (MVar ()) -> IO [MVar ()]
forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a]
replicateM Int
numForks (IO (MVar ()) -> IO [MVar ()]) -> IO (MVar ()) -> IO [MVar ()]
forall a b. (a -> b) -> a -> b
$ IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
  MVar (Maybe b)
workerOutput <- IO (MVar (Maybe b))
forall a. IO (MVar a)
newEmptyMVar
  MVar ()
mReady <- () -> IO (MVar ())
forall a. a -> IO (MVar a)
newMVar ()
  MVar [a]
mJobs  <- [a] -> IO (MVar [a])
forall a. a -> IO (MVar a)
newMVar [a]
jobs
  MVar Integer
mBusyCount <- Integer -> IO (MVar Integer)
forall a. a -> IO (MVar a)
newMVar Integer
0

  [ThreadId]
_ <- [MVar ()] -> (MVar () -> IO ThreadId) -> IO [ThreadId]
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
t a -> (a -> m b) -> m (t b)
forM [MVar ()]
workerDone ((MVar () -> IO ThreadId) -> IO [ThreadId])
-> (MVar () -> IO ThreadId) -> IO [ThreadId]
forall a b. (a -> b) -> a -> b
$ \MVar ()
doneM -> IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally
    ( let loop :: IO ()
loop = do
              String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ takeMVar mReady
              [a]
openJobs <- String -> IO [a] -> IO [a]
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ takeMVar mJobs
              Integer
busy0 <- MVar Integer -> (Integer -> IO (Integer, Integer)) -> IO Integer
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar Integer
mBusyCount (\Integer
c -> (Integer, Integer) -> IO (Integer, Integer)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ((Integer, Integer) -> IO (Integer, Integer))
-> (Integer, Integer) -> IO (Integer, Integer)
forall a b. (a -> b) -> a -> b
$ (Integer
c Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
1, Integer
c Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
+ Integer
1))
              case [a]
openJobs of
                [] -> do
                  MVar Integer -> (Integer -> IO Integer) -> IO ()
forall a. MVar a -> (a -> IO a) -> IO ()
modifyMVar_ MVar Integer
mBusyCount (\Integer
c -> Integer -> IO Integer
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Integer -> IO Integer) -> Integer -> IO Integer
forall a b. (a -> b) -> a -> b
$ Integer
c Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
- Integer
1)
                  if Integer
busy0 Integer -> Integer -> Bool
forall a. Eq a => a -> a -> Bool
== Integer
1 then do
                    String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mReady ()
                    String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mJobs []
                  else
                    IO ()
loop
                (a
j : [a]
jobs') -> do
                  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ [a] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [a]
jobs') (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ 
                    String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mReady ()
                  String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mJobs jobs'
                  ([a]
newJobs, [b]
newResults) <- a -> IO ([a], [b])
worker a
j
                  [a]
openJobs'' <- MVar [a] -> ([a] -> IO ([a], [a])) -> IO [a]
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar [a]
mJobs (([a] -> IO ([a], [a])) -> IO [a])
-> ([a] -> IO ([a], [a])) -> IO [a]
forall a b. (a -> b) -> a -> b
$ \[a]
openJobs' -> do
                    ([a], [a]) -> IO ([a], [a])
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (([a], [a]) -> IO ([a], [a])) -> ([a], [a]) -> IO ([a], [a])
forall a b. (a -> b) -> a -> b
$ ([a]
newJobs [a] -> [a] -> [a]
forall a. [a] -> [a] -> [a]
++ [a]
openJobs', [a]
openJobs')
                  Integer
busy <- MVar Integer -> (Integer -> IO (Integer, Integer)) -> IO Integer
forall a b. MVar a -> (a -> IO (a, b)) -> IO b
modifyMVar MVar Integer
mBusyCount (\Integer
c -> (Integer, Integer) -> IO (Integer, Integer)
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ((Integer, Integer) -> IO (Integer, Integer))
-> (Integer, Integer) -> IO (Integer, Integer)
forall a b. (a -> b) -> a -> b
$ (Integer
c Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
- Integer
1, Integer
c Integer -> Integer -> Integer
forall a. Num a => a -> a -> a
- Integer
1))
                  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when ([a] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [a]
openJobs'' Bool -> Bool -> Bool
&& ( (Bool -> Bool
not (Bool -> Bool) -> Bool -> Bool
forall a b. (a -> b) -> a -> b
$ [a] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [a]
newJobs) Bool -> Bool -> Bool
|| Integer
busy Integer -> Integer -> Bool
forall a. Eq a => a -> a -> Bool
== Integer
0)) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
                    String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar mReady ()
                  String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ mapM_ ((putMVar workerOutput) . Just) newResults
                  IO ()
loop
      in IO ()
loop
    )
    (\Either SomeException ()
e -> do
      String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar doneM ()
      case Either SomeException ()
e of
        Left SomeException
err -> (String -> IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
sayErrString (__FILE__ ++ " on line " ++ show __LINE__
                                   ++ " job list error (worker):\n\t -"
                                   ++ show (err :: SomeException)))
        Right ()
res -> do
          () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    )

  IO () -> (Either SomeException () -> IO ()) -> IO ThreadId
forall a. IO a -> (Either SomeException a -> IO ()) -> IO ThreadId
forkFinally (do
    String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ mapM_ takeMVar workerDone
    )
    (\Either SomeException ()
res -> do
      String -> IO () -> IO ()
forall a. String -> IO a -> IO a
hasLocked (__FILE__ ++ " on line " ++ show __LINE__) $ putMVar workerOutput Nothing
      case Either SomeException ()
res of
        Left SomeException
err -> String -> IO ()
forall (m :: * -> *). MonadIO m => String -> m ()
sayErrString (__FILE__ ++ " on line " ++ show __LINE__
                                  ++ ": job list error (end):\n\t- "
                                  ++ show (err :: SomeException))
        Right ()
_ -> () -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
    )

  IO (Maybe b) -> IO (IO (Maybe b))
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (MVar (Maybe b) -> IO (Maybe b)
forall a. MVar a -> IO a
takeMVar MVar (Maybe b)
workerOutput)