{-# LANGUAGE CPP #-}
module SlaveThread
(
fork,
forkWithUnmask,
forkFinally,
forkFinallyWithUnmask,
SlaveThreadCrashed(..)
)
where
import SlaveThread.Prelude
import SlaveThread.Util.LowLevelForking
import qualified DeferredFolds.UnfoldlM as UnfoldlM
import qualified StmContainers.Multimap as Multimap
import qualified Control.Foldl as Foldl
import qualified Focus
{-# NOINLINE slaveRegistry #-}
-- | Process-wide registry of slave threads.
--
-- Entries are stored with the master thread as the key and the slave thread
-- as the value, so looking up a thread yields the slaves it has forked.
--
-- The map is allocated with 'unsafePerformIO' so that it can be a plain
-- top-level value. The @NOINLINE@ pragma is required for this to be sound:
-- it prevents the allocation from being duplicated at use sites, which would
-- silently give different callers different registries.
slaveRegistry :: Multimap.Multimap ThreadId ThreadId
slaveRegistry =
  unsafePerformIO Multimap.newIO
{-# INLINABLE fork #-}
-- | Fork a slave thread that runs the given computation.
--
-- This is 'forkFinally' with a finalizer that does nothing.
-- Returns the 'ThreadId' of the newly forked thread.
fork :: IO a -> IO ThreadId
fork =
  forkFinally (return ())
{-# INLINABLE forkWithUnmask #-}
-- | Fork a slave thread, handing the computation a function that can be
-- used to unmask parts of it.
--
-- This is 'forkFinallyWithUnmask' with a finalizer that does nothing.
-- Returns the 'ThreadId' of the newly forked thread.
forkWithUnmask :: ((forall x. IO x -> IO x) -> IO a) -> IO ThreadId
forkWithUnmask =
  forkFinallyWithUnmask (return ())
{-# INLINABLE forkFinally #-}
-- | Fork a slave thread with a finalizer.
--
-- The first argument is the finalizer, the second is the computation.
-- The whole computation is run inside the unmasking function supplied by
-- 'forkFinallyWithUnmask'; the finalizer is passed through unchanged.
-- Returns the 'ThreadId' of the newly forked thread.
forkFinally :: IO a -> IO b -> IO ThreadId
forkFinally finalizer computation =
  forkFinallyWithUnmask finalizer (\unmask -> unmask computation)
{-# INLINABLE forkFinallyWithUnmask #-}
-- | Fork a slave thread with a finalizer, giving the computation explicit
-- control over where asynchronous exceptions are unmasked.
--
-- The first argument is the finalizer. The second is the computation, which
-- receives the unmasking function obtained from 'uninterruptibleMask' in the
-- calling thread.
--
-- The forked thread goes through these stages, in order:
--
-- 1. Runs the computation, capturing any exception it throws.
-- 2. Kills the slaves registered under its own id and waits until none are
--    left, restarting this step whenever it is interrupted by an exception
--    and remembering each such exception.
-- 3. Runs the finalizer (not wrapped in the unmasking function), capturing
--    any exception it throws.
-- 4. Rethrows every captured exception except 'ThreadKilled' to the master
--    thread, wrapped in 'SlaveThreadCrashed'.
-- 5. Removes its own entry from the registry.
--
-- Returns the 'ThreadId' of the forked thread once it has been registered
-- under the calling (master) thread.
forkFinallyWithUnmask :: IO a -> ((forall x. IO x -> IO x) -> IO b) -> IO ThreadId
forkFinallyWithUnmask finalizer computation =
  uninterruptibleMask $ \unmask -> do
    masterThread <- myThreadId
    -- NOTE(review): forkIOWithoutHandler is defined in
    -- SlaveThread.Util.LowLevelForking; presumably it forks without the
    -- default top-level exception handler -- confirm against that module.
    slaveThread <- forkIOWithoutHandler $ do
      slaveThread <- myThreadId

      -- Stage 1: the computation. Nothing means it finished normally.
      computationExceptions <-
        catch (computation unmask $> Nothing) (return . Just)

      -- Stage 2: tear down our own slaves. The kill-and-wait runs unmasked,
      -- so it can be interrupted; each interruption is recorded (most recent
      -- first) and the teardown is started over.
      slavesDyingExceptions <-
        let
          loop :: [SomeException] -> IO [SomeException]
          loop !exceptions =
            catch
              (unmask $ do
                killSlaves slaveThread
                waitForSlavesToDie slaveThread
                return exceptions)
              (\ !exception -> loop (exception : exceptions))
        in loop []

      -- Stage 3: the finalizer.
      finalizerExceptions <-
        catch (finalizer $> Nothing) (return . Just)

      -- Stage 4: report to the master. ThreadKilled is treated as a normal
      -- way for a slave to end and is therefore not propagated.
      let
        handler :: SomeException -> IO ()
        handler e =
          case fromException e of
            Just ThreadKilled -> return ()
            _ -> throwTo masterThread (SlaveThreadCrashed slaveThread e)
      forM_ computationExceptions handler
      forM_ slavesDyingExceptions handler
      forM_ finalizerExceptions handler

      -- Stage 5: unregister. The master inserts our entry only after the
      -- fork returns, so this thread may get here first; retry blocks until
      -- the entry is present and can be deleted.
      atomically $ do
        result <- Multimap.focus Focus.lookupAndDelete slaveThread masterThread slaveRegistry
        case result of
          Just _ -> return ()
          _ -> retry

    -- Register the new slave under the master.
    atomically $ Multimap.insert slaveThread masterThread slaveRegistry
    return slaveThread
-- | Kill every thread currently registered as a slave of the given thread.
--
-- The list of slaves is collected in a single STM transaction and
-- 'killThread' is then applied to each collected thread in turn. This
-- function does not remove anything from the registry itself.
killSlaves :: ThreadId -> IO ()
killSlaves thread = do
  -- The two branches differ only in the name of the by-key unfold, which
  -- depends on the stm-containers version being compiled against.
#if MIN_VERSION_stm_containers(1,2,0)
  threads <- atomically (UnfoldlM.foldM (Foldl.generalize Foldl.revList) (Multimap.unfoldlMByKey thread slaveRegistry))
#else
  threads <- atomically (UnfoldlM.foldM (Foldl.generalize Foldl.revList) (Multimap.unfoldMByKey thread slaveRegistry))
#endif
  traverse_ killThread threads
-- | Block until no slaves are registered under the given thread.
--
-- Implemented as one STM transaction that retries for as long as the
-- registry still holds at least one entry for the thread.
waitForSlavesToDie :: ThreadId -> IO ()
waitForSlavesToDie thread =
  atomically $ do
    -- The two branches differ only in the name of the by-key unfold, which
    -- depends on the stm-containers version being compiled against.
#if MIN_VERSION_stm_containers(1,2,0)
    noSlavesLeft <- UnfoldlM.null $ Multimap.unfoldlMByKey thread slaveRegistry
#else
    noSlavesLeft <- UnfoldlM.null $ Multimap.unfoldMByKey thread slaveRegistry
#endif
    unless noSlavesLeft retry
-- | Describes a slave thread that ended with an exception.
--
-- Carries the id of the slave thread and the exception it ended with.
-- Both fields are strict.
data SlaveThreadCrashed
  = SlaveThreadCrashed !ThreadId !SomeException
  deriving (Show)
-- | 'SlaveThreadCrashed' is placed in the asynchronous exception hierarchy:
-- conversion to and from 'SomeException' goes through
-- 'asyncExceptionToException' and 'asyncExceptionFromException' rather than
-- the class defaults.
instance Exception SlaveThreadCrashed where
  toException = asyncExceptionToException
  fromException = asyncExceptionFromException