module Data.Repa.Eval.Gang
(Gang, forkGang, gangSize, gangIO, gangST)
where
import GHC.IO
import GHC.ST
import GHC.Conc (forkOn)
import Control.Concurrent.MVar
import Control.Exception (assert)
import Control.Monad
import System.IO
import GHC.Exts
data Req
= ReqDo (Int# -> IO ())
| ReqShutdown
data Gang
= Gang
{
Gang -> Int#
_gangThreads :: Int#
, Gang -> [MVar Req]
_gangRequestVars :: [MVar Req]
, Gang -> [MVar ()]
_gangResultVars :: [MVar ()]
, Gang -> MVar Bool
_gangBusy :: MVar Bool
}
instance Show Gang where
showsPrec :: Int -> Gang -> ShowS
showsPrec Int
p (Gang Int#
n [MVar Req]
_ [MVar ()]
_ MVar Bool
_)
= String -> ShowS
showString String
"<<"
ShowS -> ShowS -> ShowS
forall b c a. (b -> c) -> (a -> b) -> a -> c
. Int -> Int -> ShowS
forall a. Show a => Int -> a -> ShowS
showsPrec Int
p (Int# -> Int
I# Int#
n)
ShowS -> ShowS -> ShowS
forall b c a. (b -> c) -> (a -> b) -> a -> c
. String -> ShowS
showString String
" threads>>"
gangSize :: Gang -> Int#
gangSize :: Gang -> Int#
gangSize (Gang Int#
n [MVar Req]
_ [MVar ()]
_ MVar Bool
_)
= Int#
n
{-# NOINLINE gangSize #-}
forkGang :: Int -> IO Gang
forkGang :: Int -> IO Gang
forkGang !n :: Int
n@(I# Int#
n_)
= Bool -> IO Gang -> IO Gang
forall a. (?callStack::CallStack) => Bool -> a -> a
assert (Int
n Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
0)
(IO Gang -> IO Gang) -> IO Gang -> IO Gang
forall a b. (a -> b) -> a -> b
$ do
[MVar Req]
mvsRequest <- [IO (MVar Req)] -> IO [MVar Req]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence ([IO (MVar Req)] -> IO [MVar Req])
-> [IO (MVar Req)] -> IO [MVar Req]
forall a b. (a -> b) -> a -> b
$ Int -> IO (MVar Req) -> [IO (MVar Req)]
forall a. Int -> a -> [a]
replicate Int
n (IO (MVar Req) -> [IO (MVar Req)])
-> IO (MVar Req) -> [IO (MVar Req)]
forall a b. (a -> b) -> a -> b
$ IO (MVar Req)
forall a. IO (MVar a)
newEmptyMVar
[MVar ()]
mvsDone <- [IO (MVar ())] -> IO [MVar ()]
forall (t :: * -> *) (m :: * -> *) a.
(Traversable t, Monad m) =>
t (m a) -> m (t a)
forall (m :: * -> *) a. Monad m => [m a] -> m [a]
sequence ([IO (MVar ())] -> IO [MVar ()]) -> [IO (MVar ())] -> IO [MVar ()]
forall a b. (a -> b) -> a -> b
$ Int -> IO (MVar ()) -> [IO (MVar ())]
forall a. Int -> a -> [a]
replicate Int
n (IO (MVar ()) -> [IO (MVar ())]) -> IO (MVar ()) -> [IO (MVar ())]
forall a b. (a -> b) -> a -> b
$ IO (MVar ())
forall a. IO (MVar a)
newEmptyMVar
(MVar Req -> MVar () -> IO (Weak (MVar Req)))
-> [MVar Req] -> [MVar ()] -> IO ()
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> m c) -> [a] -> [b] -> m ()
zipWithM_ (\MVar Req
varReq MVar ()
varDone
-> MVar Req -> IO () -> IO (Weak (MVar Req))
forall a. MVar a -> IO () -> IO (Weak (MVar a))
mkWeakMVar MVar Req
varReq (MVar Req -> MVar () -> IO ()
finaliseWorker MVar Req
varReq MVar ()
varDone))
[MVar Req]
mvsRequest
[MVar ()]
mvsDone
(Int -> IO () -> IO ThreadId) -> [Int] -> [IO ()] -> IO ()
forall (m :: * -> *) a b c.
Applicative m =>
(a -> b -> m c) -> [a] -> [b] -> m ()
zipWithM_ Int -> IO () -> IO ThreadId
forkOn [Int
0..]
([IO ()] -> IO ()) -> [IO ()] -> IO ()
forall a b. (a -> b) -> a -> b
$ (Int -> MVar Req -> MVar () -> IO ())
-> [Int] -> [MVar Req] -> [MVar ()] -> [IO ()]
forall a b c d. (a -> b -> c -> d) -> [a] -> [b] -> [c] -> [d]
zipWith3 (\(I# Int#
i) -> Int# -> MVar Req -> MVar () -> IO ()
gangWorker Int#
i)
[Int
0 .. Int
n Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1] [MVar Req]
mvsRequest [MVar ()]
mvsDone
MVar Bool
busy <- Bool -> IO (MVar Bool)
forall a. a -> IO (MVar a)
newMVar Bool
False
Gang -> IO Gang
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return (Gang -> IO Gang) -> Gang -> IO Gang
forall a b. (a -> b) -> a -> b
$ Int# -> [MVar Req] -> [MVar ()] -> MVar Bool -> Gang
Gang Int#
n_ [MVar Req]
mvsRequest [MVar ()]
mvsDone MVar Bool
busy
{-# NOINLINE forkGang #-}
gangWorker :: Int# -> MVar Req -> MVar () -> IO ()
gangWorker :: Int# -> MVar Req -> MVar () -> IO ()
gangWorker Int#
threadId MVar Req
varRequest MVar ()
varDone
= do
Req
req <- MVar Req -> IO Req
forall a. MVar a -> IO a
takeMVar MVar Req
varRequest
case Req
req of
ReqDo Int# -> IO ()
action
-> do
Int# -> IO ()
action Int#
threadId
MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
varDone ()
Int# -> MVar Req -> MVar () -> IO ()
gangWorker Int#
threadId MVar Req
varRequest MVar ()
varDone
Req
ReqShutdown
-> MVar () -> () -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar ()
varDone ()
{-# NOINLINE gangWorker #-}
finaliseWorker :: MVar Req -> MVar () -> IO ()
finaliseWorker :: MVar Req -> MVar () -> IO ()
finaliseWorker MVar Req
varReq MVar ()
varDone
= do MVar Req -> Req -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Req
varReq Req
ReqShutdown
MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar MVar ()
varDone
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
{-# NOINLINE finaliseWorker #-}
gangIO :: Gang
-> (Int# -> IO ())
-> IO ()
gangIO :: Gang -> (Int# -> IO ()) -> IO ()
gangIO gang :: Gang
gang@(Gang Int#
_ [MVar Req]
_ [MVar ()]
_ MVar Bool
busy) Int# -> IO ()
action
= do Bool
b <- MVar Bool -> Bool -> IO Bool
forall a. MVar a -> a -> IO a
swapMVar MVar Bool
busy Bool
True
if Bool
b
then do
Gang -> (Int# -> IO ()) -> IO ()
seqIO Gang
gang Int# -> IO ()
action
else do
Gang -> (Int# -> IO ()) -> IO ()
parIO Gang
gang Int# -> IO ()
action
Bool
_ <- MVar Bool -> Bool -> IO Bool
forall a. MVar a -> a -> IO a
swapMVar MVar Bool
busy Bool
False
() -> IO ()
forall a. a -> IO a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
{-# NOINLINE gangIO #-}
seqIO :: Gang -> (Int# -> IO ()) -> IO ()
seqIO :: Gang -> (Int# -> IO ()) -> IO ()
seqIO (Gang Int#
n [MVar Req]
_ [MVar ()]
_ MVar Bool
_) Int# -> IO ()
action
= do Handle -> String -> IO ()
hPutStr Handle
stderr
(String -> IO ()) -> String -> IO ()
forall a b. (a -> b) -> a -> b
$ [String] -> String
unlines
[ String
"Data.Array.Repa.Bulk.Par: Performing nested parallel computation sequentially."
, String
" Something is trying to run a compuation on a gang that is already busy. "
, String
" You've probably used a Repa 'computeP', 'foldP' or similar function while "
, String
" another instance was already running. This can happen if you've passed a "
, String
" parallel worker function to a combinator like 'map', or some parallel "
, String
" compuation was suspended via lazy evaluation. Try using `seq` to ensure that"
, String
" each array is fully evaluated before computing the next one. "
, String
"" ]
(Int -> IO ()) -> [Int] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\(I# Int#
i) -> Int# -> IO ()
action Int#
i) [Int
0 .. (Int# -> Int
I# Int#
n) Int -> Int -> Int
forall a. Num a => a -> a -> a
- Int
1]
{-# NOINLINE seqIO #-}
parIO :: Gang -> (Int# -> IO ()) -> IO ()
parIO :: Gang -> (Int# -> IO ()) -> IO ()
parIO (Gang Int#
_ [MVar Req]
mvsRequest [MVar ()]
mvsResult MVar Bool
_) Int# -> IO ()
action
= do
(MVar Req -> IO ()) -> [MVar Req] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ (\MVar Req
v -> MVar Req -> Req -> IO ()
forall a. MVar a -> a -> IO ()
putMVar MVar Req
v ((Int# -> IO ()) -> Req
ReqDo Int# -> IO ()
action)) [MVar Req]
mvsRequest
(MVar () -> IO ()) -> [MVar ()] -> IO ()
forall (t :: * -> *) (m :: * -> *) a b.
(Foldable t, Monad m) =>
(a -> m b) -> t a -> m ()
mapM_ MVar () -> IO ()
forall a. MVar a -> IO a
takeMVar [MVar ()]
mvsResult
{-# NOINLINE parIO #-}
gangST :: Gang -> (Int# -> ST s ()) -> ST s ()
gangST :: forall s. Gang -> (Int# -> ST s ()) -> ST s ()
gangST Gang
g Int# -> ST s ()
p
= IO () -> ST s ()
forall a s. IO a -> ST s a
unsafeIOToST (IO () -> ST s ()) -> IO () -> ST s ()
forall a b. (a -> b) -> a -> b
$ Gang -> (Int# -> IO ()) -> IO ()
gangIO Gang
g (\Int#
i -> ST s () -> IO ()
forall s a. ST s a -> IO a
unsafeSTToIO (ST s () -> IO ()) -> ST s () -> IO ()
forall a b. (a -> b) -> a -> b
$ Int# -> ST s ()
p Int#
i)
{-# NOINLINE gangST #-}