{-# LANGUAGE RankNTypes, NamedFieldPuns, BangPatterns,
ExistentialQuantification, CPP, DeriveDataTypeable #-}
{-# OPTIONS_GHC -Wall -fno-warn-name-shadowing -fno-warn-unused-do-bind #-}
module Control.Monad.Par.Scheds.TraceInternal (
Trace(..), Sched(..), Par(..),
IVar(..), IVarContents(..),
sched,
runPar, runParIO, runParAsync,
new, newFull, newFull_, get, put_, put,
pollIVar, yield, fixPar, FixParException (..)
) where
#if MIN_VERSION_base(4,6,0)
import Prelude hiding (mapM, sequence, head,tail)
#else
import Prelude hiding (mapM, sequence, head,tail,catch)
#endif
import Control.Monad as M hiding (mapM, sequence, join)
import Data.IORef
import System.IO.Unsafe
#if MIN_VERSION_base(4,9,0)
import GHC.IO.Unsafe (unsafeDupableInterleaveIO)
#else
import System.IO.Unsafe (unsafeInterleaveIO)
#endif
import Control.Concurrent hiding (yield)
import GHC.Conc (numCapabilities)
import Control.DeepSeq
import Control.Monad.Fix (MonadFix (mfix))
import Control.Exception (Exception, throwIO, BlockedIndefinitelyOnMVar (..),
catch)
import Data.Typeable (Typeable)
#if !MIN_VERSION_base(4,8,0)
import Control.Applicative
#endif
#if __GLASGOW_HASKELL__ <= 700
import GHC.Conc (forkOnIO)
forkOn = forkOnIO
#endif
-- | One reified step of a 'Par' computation.  The scheduler ('sched')
-- interprets a 'Trace' constructor by constructor.
--
-- The alternatives are indented so that the layout rule treats them as
-- part of this declaration rather than as new top-level declarations.
data Trace
  = forall a . Get (IVar a) (a -> Trace)
    -- ^ Read an 'IVar' and feed its value to the continuation.
  | forall a . Put (IVar a) a Trace
    -- ^ Write a value into an 'IVar', then continue with the given trace.
  | forall a . New (IVarContents a) (IVar a -> Trace)
    -- ^ Allocate an 'IVar' with the given initial contents.
  | Fork Trace Trace
    -- ^ The first trace is pushed onto the work pool; the second is run
    -- immediately by the current worker.
  | Done
    -- ^ This thread of the computation has finished.
  | Yield Trace
    -- ^ Put the trace at the back of the work pool and pick other work.
  | forall a . LiftIO (IO a) (a -> Trace)
    -- ^ Run an 'IO' action and feed its result to the continuation.
-- | The main scheduler loop: interpret a 'Trace' on the worker that owns
-- @queue@ until that trace finishes or blocks.
--
-- The 'Bool' (@_doSync@) only matters when the trace reaches 'Done': if it
-- is 'True' the current thread keeps working via 'reschedule'; otherwise a
-- replacement thread is forked to run 'reschedule' and this call returns.
sched :: Bool -> Sched -> Trace -> IO ()
sched _doSync queue t = loop t
  where
    loop :: Trace -> IO ()
    loop t = case t of
      New a f -> do
        r <- newIORef a
        loop (f (IVar r))
      Get (IVar v) c -> do
        -- Cheap non-atomic read first; only fall back to the atomic
        -- update when the value is not yet available.
        e <- readIORef v
        case e of
          Full a -> loop (c a)
          _other -> do
            -- The atomic update yields the IO action to run next, so the
            -- decision and the state change happen in a single step.
            r <- atomicModifyIORef v $ \e -> case e of
                   Empty      -> (Blocked [c], reschedule queue)
                   Full a     -> (Full a, loop (c a))
                   Blocked cs -> (Blocked (c:cs), reschedule queue)
            r
      Put (IVar v) a t -> do
        cs <- atomicModifyIORef v $ \e -> case e of
                Empty      -> (Full a, [])
                Full _     -> error "multiple put"
                Blocked cs -> (Full a, cs)
        -- Every continuation that was blocked on this IVar becomes work.
        mapM_ (pushWork queue . ($ a)) cs
        loop t
      Fork child parent -> do
        pushWork queue child
        loop parent
      Done ->
        if _doSync
          then reschedule queue
          else do
            putStrLn " [par] Forking replacement thread..\n"
            forkIO (reschedule queue)
            return ()
      Yield parent -> do
        -- Move the current trace to the end of our own work pool.
        let Sched { workpool } = queue
        atomicModifyIORef workpool $ \ts -> (ts ++ [parent], ())
        reschedule queue
      LiftIO io c -> do
        r <- io
        loop (c r)
-- | Exception thrown by 'fixPar' when reading the knot-tying 'MVar' fails
-- with 'BlockedIndefinitelyOnMVar'.
data FixParException = FixParException
  deriving (Show, Typeable)

instance Exception FixParException
-- | Take the next trace from the front of this worker's pool and run it
-- with 'sched'; if the pool is empty, switch to 'steal'.
reschedule :: Sched -> IO ()
reschedule queue@Sched{ workpool } = do
  e <- atomicModifyIORef workpool $ \ts ->
         case ts of
           []      -> ([], Nothing)
           (t:ts') -> (ts', Just t)
  case e of
    Nothing -> steal queue
    Just t  -> sched True queue t
-- | Try to take work from the other workers' pools, in the order they
-- appear in 'scheds'.  When none of them has work, register as idle and
-- wait to be woken up.
steal :: Sched -> IO ()
steal q@Sched{ idle, scheds, no=my_no } = do
  go scheds
  where
    go :: [Sched] -> IO ()
    -- Every other pool was empty: register as idle.
    go [] = do
      m <- newEmptyMVar
      -- @r@ is the idle list as it was before this worker was added.
      r <- atomicModifyIORef idle $ \is -> (m:is, is)
      if length r == numCapabilities - 1
        then do
          -- All the other workers were already idle: send each of them
          -- 'True' ("done") and return without waiting ourselves.
          mapM_ (\m -> putMVar m True) r
        else do
          done <- takeMVar m
          if done
            then do
              return ()
            else do
              -- Woken with 'False' (see 'pushWork'): look for work again.
              go scheds
    go (x:xs)
      | no x == my_no = go xs   -- never steal from ourselves
      | otherwise     = do
          r <- atomicModifyIORef (workpool x) $ \ ts ->
                 case ts of
                   []     -> ([], Nothing)
                   (x:xs) -> (xs, Just x)
          case r of
            Just t  -> do
              sched True q t
            Nothing -> go xs
-- | Push a trace onto the front of this worker's pool and, if any worker
-- is currently registered as idle, wake one of them up.
pushWork :: Sched -> Trace -> IO ()
pushWork Sched { workpool, idle } t = do
  atomicModifyIORef workpool $ \ts -> (t:ts, ())
  -- Non-atomic peek first, so the atomic update on the shared idle list is
  -- skipped when nobody is waiting.
  idles <- readIORef idle
  unless (null idles) $ do
    -- The list may have changed since the peek, hence the empty case.
    r <- atomicModifyIORef idle (\is -> case is of
                                          []     -> ([], return ())
                                          (i:is) -> (is, putMVar i False))
    r -- wake one up; 'False' tells it to look for work rather than stop
-- | Per-worker scheduler state.
data Sched = Sched
    { no       :: {-# UNPACK #-} !Int,
      -- ^ Index of this worker; 'steal' uses it to skip its own pool.
      workpool :: IORef [Trace],
      -- ^ Traces waiting to be run by this worker.
      idle     :: IORef [MVar Bool],
      -- ^ Wake-up variables of the workers currently waiting for work.
      -- 'runPar_internal' gives every worker the same reference.
      scheds   :: [Sched]
      -- ^ The list of all workers, this one included.
    }
-- | The 'Par' monad in continuation-passing form: given a continuation
-- that turns the result into a 'Trace', produce the 'Trace' of the whole
-- computation.
newtype Par a = Par {
    runCont :: (a -> Trace) -> Trace
}
-- | Mapping a function over a 'Par' pre-composes it with the continuation.
instance Functor Par where
    fmap f m = Par $ \c -> runCont m (c . f)
-- | Bind runs the first computation with a continuation that runs the
-- second one, passing along the outer continuation.
instance Monad Par where
    return = pure
    m >>= k  = Par $ \c -> runCont m $ \a -> runCont (k a) c
-- | 'pure' hands the value straight to the continuation; '<*>' is defined
-- through the 'Monad' instance.
instance Applicative Par where
   (<*>) = ap
   pure a = Par ($ a)
-- | 'mfix' is implemented by 'fixPar'.
instance MonadFix Par where
   mfix = fixPar
-- | Fixed point of a 'Par'-valued function, used for 'mfix'.
--
-- The result is passed to @f@ as a lazy value that reads an 'MVar' when it
-- is forced; the 'MVar' is filled once the computation built by @f@
-- delivers its result.  If that read fails with
-- 'BlockedIndefinitelyOnMVar', 'FixParException' is thrown instead.
fixPar :: (a -> Par a) -> Par a
fixPar f = Par $ \ c ->
  LiftIO (do
    mv <- newEmptyMVar
    -- Deferred read: nothing touches the MVar until @ans@ is demanded.
    ans <- unsafeDupableInterleaveIO (readMVar mv `catch`
        \ ~BlockedIndefinitelyOnMVar -> throwIO FixParException)
    case f ans of
      -- Build the trace of @f ans@; its continuation first stores the
      -- result in the MVar and then resumes the caller's continuation.
      Par q -> pure $ q $ \a -> LiftIO (putMVar mv a) (\ ~() -> c a)) id
#if !MIN_VERSION_base(4,9,0)
-- | Fallback used when the base library is older than 4.9 (see the
-- surrounding CPP guard): 'unsafeInterleaveIO' stands in under this name.
unsafeDupableInterleaveIO :: IO a -> IO a
unsafeDupableInterleaveIO = unsafeInterleaveIO
#endif
-- | A write-once variable: a mutable reference to its 'IVarContents'.
-- 'sched' raises an error (@"multiple put"@) on a second write.
newtype IVar a = IVar (IORef (IVarContents a))
-- | Two 'IVar's are equal when their underlying 'IORef's are equal.
instance Eq (IVar a) where
  (IVar r1) == (IVar r2) = r1 == r2
-- | The bang pattern evaluates the 'IVar' itself; its contents are not
-- inspected.
instance NFData (IVar a) where
  rnf !_ = ()
-- | Read an 'IVar' without blocking: 'Just' the value if it is 'Full',
-- 'Nothing' otherwise.
pollIVar :: IVar a -> IO (Maybe a)
pollIVar (IVar ref) =
  do contents <- readIORef ref
     case contents of
       Full x -> return (Just x)
       _      -> return Nothing
-- | State of an 'IVar': 'Full' holds the written value, 'Empty' means no
-- value and no readers yet, and 'Blocked' holds the continuations of the
-- readers that 'sched' parked while waiting for the value.
data IVarContents a = Full a | Empty | Blocked [a -> Trace]
{-# INLINE runPar_internal #-}
-- | Shared implementation of 'runPar', 'runParIO' and 'runParAsync'.
--
-- Creates one 'Sched' per capability, forks one worker thread per
-- capability with 'forkOn', and runs the computation on the worker whose
-- index matches the calling thread's capability.  The 'Bool' is passed to
-- 'sched' as its @_doSync@ flag.  Calls 'error' with @"no result"@ if the
-- result 'IVar' is not 'Full' when the main worker returns.
runPar_internal :: Bool -> Par a -> IO a
runPar_internal _doSync x = do
   workpools <- replicateM numCapabilities $ newIORef []
   idle <- newIORef []
   -- Recursive binding: every Sched refers to the complete list of workers.
   let states :: [Sched]
       states = [ Sched { no=x, workpool=wp, idle, scheds=states }
                | (x,wp) <- zip [0..] workpools ]

#if __GLASGOW_HASKELL__ >= 701 /* 20110301 */
   (main_cpu, _) <- threadCapability =<< myThreadId
#else
   let main_cpu = 0
#endif

   m <- newEmptyMVar
   forM_ (zip [0..] states) $ \(cpu,state) ->
        forkOn cpu $
          if (cpu /= main_cpu)
             then reschedule state
             else do
                  -- The final result is written into a private IVar and
                  -- handed back to the caller through @m@.
                  rref <- newIORef Empty
                  sched _doSync state $ runCont (x >>= put_ (IVar rref)) (const Done)
                  readIORef rref >>= putMVar m

   r <- takeMVar m
   case r of
     Full a -> return a
     _ -> error "no result"
-- | Run a 'Par' computation and return its result as a pure value.
-- Wraps 'runPar_internal' (with the synchronous flag set to 'True') in
-- 'unsafePerformIO'.
runPar :: Par a -> a
runPar = unsafePerformIO . runPar_internal True
-- | Run a 'Par' computation in 'IO'; the same as 'runPar' but without the
-- 'unsafePerformIO' wrapper.
runParIO :: Par a -> IO a
runParIO = runPar_internal True
-- | Like 'runPar', but passes 'False' as the synchronous flag: when the
-- main trace reaches 'Done', 'sched' forks a replacement worker thread and
-- returns instead of continuing to schedule on the calling worker.
runParAsync :: Par a -> a
runParAsync = unsafePerformIO . runPar_internal False
-- | Create a new, empty 'IVar'.
new :: Par (IVar a)
new  = Par $ New Empty
-- | Create an 'IVar' that already holds a value.  The value is fully
-- evaluated with 'deepseq' before the 'IVar' is created.
newFull :: NFData a => a -> Par (IVar a)
newFull x = Par $ \c -> x `deepseq` New (Full x) c
-- | Like 'newFull', but the value is only evaluated to weak head normal
-- form (by the bang pattern), so no 'NFData' constraint is needed.
newFull_ :: a -> Par (IVar a)
newFull_ !x = Par $ New (Full x)
-- | Read the value of an 'IVar'.  If it is not 'Full' yet, 'sched' parks
-- the continuation on the 'IVar' until a value is written.
get :: IVar a -> Par a
get v = Par $ \c -> Get v c
-- | Write a value into an 'IVar', evaluating it only to weak head normal
-- form (by the bang pattern).  A second write to the same 'IVar' makes
-- 'sched' raise an error (@"multiple put"@).
put_ :: IVar a -> a -> Par ()
put_ v !a = Par $ \c -> Put v a (c ())
-- | Write a value into an 'IVar' after fully evaluating it with
-- 'deepseq'.  A second write to the same 'IVar' makes 'sched' raise an
-- error (@"multiple put"@).
put :: NFData a => IVar a -> a -> Par ()
put v a = Par $ \c -> a `deepseq` Put v a (c ())
-- | Let other work run: 'sched' moves the rest of this computation to the
-- end of the current worker's pool and then picks its next task.
yield :: Par ()
yield = Par $ \c -> Yield (c ())