{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RecordWildCards #-}
module Control.Scheduler.Global
( GlobalScheduler
, globalScheduler
, newGlobalScheduler
, withGlobalScheduler_
) where
import Data.Maybe
import Control.Concurrent (ThreadId)
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Scheduler
import Control.Scheduler.Internal
import Control.Scheduler.Types
import Data.IORef
import System.IO.Unsafe (unsafePerformIO)
-- | A shared 'GlobalScheduler' in 'IO', created with the 'Par' computation
-- strategy.
--
-- The scheduler is built by running 'newGlobalScheduler' under
-- 'unsafePerformIO', i.e. it is created the first time this value is forced.
-- The @NOINLINE@ pragma below is required for that to be sound: without it GHC
-- may inline the definition and run the initialisation more than once.
globalScheduler :: GlobalScheduler IO
globalScheduler = unsafePerformIO (newGlobalScheduler Par)
{-# NOINLINE globalScheduler #-}
-- | Initialise the scheduler state for the given computation strategy, spawn
-- its workers and pass both the resulting 'Scheduler' and the workers'
-- 'ThreadId's to the supplied continuation.
--
-- The workers are acquired through 'safeBracketOnError' with
-- 'terminateWorkers' as the release action. NOTE(review): judging by its name
-- the release only runs when the continuation fails, so on a normal return the
-- workers stay alive and the caller owns them -- confirm against
-- "Control.Scheduler.Internal".
initGlobalScheduler ::
     MonadUnliftIO m => Comp -> (Scheduler m a -> [ThreadId] -> m b) -> m b
initGlobalScheduler comp action = do
  -- The result collector is @const (pure [])@: it ignores the queue and always
  -- yields an empty list.
  (jobs, mkScheduler) <- initScheduler comp scheduleJobs_ (const (pure []))
  safeBracketOnError (spawnWorkers jobs comp) (liftIO . terminateWorkers) $ \tids ->
    action (mkScheduler tids) tids
-- | Create a new 'GlobalScheduler' for the given computation strategy.
--
-- The freshly initialised scheduler is stored in an 'MVar' and the worker
-- 'ThreadId's in an 'IORef'. A finalizer is attached to the 'MVar' with
-- 'mkWeakMVar'; when it runs it reads the thread ids currently held in the
-- 'IORef' and passes them to 'terminateWorkers'.
newGlobalScheduler :: MonadUnliftIO m => Comp -> m (GlobalScheduler m)
newGlobalScheduler comp =
  initGlobalScheduler comp $ \scheduler tids ->
    liftIO $ do
      mvar <- newMVar scheduler
      tidsRef <- newIORef tids
      -- The finalizer reads the IORef at finalisation time, so it terminates
      -- whichever workers are current then rather than the initial ones.
      _ <- mkWeakMVar mvar (readIORef tidsRef >>= terminateWorkers)
      pure $
        GlobalScheduler
          { globalSchedulerComp = comp
          , globalSchedulerMVar = mvar
          , globalSchedulerThreadIdsRef = tidsRef
          }
-- | Run an action with the scheduler held by a 'GlobalScheduler', discarding
-- the action's result.
--
-- The scheduler is claimed with 'tryTakeMVar', so this function does not block
-- waiting for it:
--
-- * If the 'MVar' is empty (the scheduler is already taken), the action is run
--   with a separate scheduler obtained from 'withScheduler_' using the same
--   computation strategy.
--
-- * Otherwise the action is run with the global scheduler. Afterwards
--   '_earlyResults' is consulted: on 'Nothing' the current batch is awaited and
--   the same scheduler is put back; on 'Just' a new scheduler is initialised
--   and put into the 'MVar' instead. NOTE(review): presumably 'Just' means the
--   scheduler was terminated early and is no longer usable -- confirm against
--   "Control.Scheduler.Internal".
--
-- If running the action throws, a new scheduler is initialised as well, so the
-- 'MVar' is refilled before the exception propagates.
withGlobalScheduler_ :: MonadUnliftIO m => GlobalScheduler m -> (Scheduler m () -> m a) -> m ()
withGlobalScheduler_ GlobalScheduler {..} action =
  withRunInIO $ \run -> do
    let -- Replace the global scheduler: record the new worker ids, terminate
        -- the previous workers, then publish the new scheduler in the MVar.
        initializeNewScheduler =
          initGlobalScheduler globalSchedulerComp $ \scheduler tids ->
            liftIO $ do
              oldTids <-
                atomicModifyIORef' globalSchedulerThreadIdsRef $ \old -> (tids, old)
              terminateWorkers oldTids
              putMVar globalSchedulerMVar scheduler
    -- Asynchronous exceptions are masked around taking and returning the
    -- scheduler; only the user action itself runs under 'restore'.
    mask $ \restore ->
      tryTakeMVar globalSchedulerMVar >>= \case
        Nothing -> restore $ run $ withScheduler_ globalSchedulerComp action
        Just scheduler -> do
          let runScheduler =
                run $ do
                  _ <- action scheduler
                  mEarly <- _earlyResults scheduler
                  -- Wait for the batch only when there are no early results.
                  mEarly <$ when (isNothing mEarly) (void (_waitForCurrentBatch scheduler))
          mEarly <- restore runScheduler `onException` run initializeNewScheduler
          case mEarly of
            Nothing -> putMVar globalSchedulerMVar scheduler
            Just _ -> run initializeNewScheduler