{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE Unsafe #-}
{-# OPTIONS_HADDOCK hide, not-home #-}
module Control.Scheduler.Internal
( withSchedulerInternal
, initWorkerStates
, withSchedulerWSInternal
, trivialScheduler_
, withTrivialSchedulerR
, withTrivialSchedulerRIO
, initScheduler
, spawnWorkers
, terminateWorkers
, scheduleJobs
, scheduleJobs_
, scheduleJobsWith
, reverseResults
, resultsToList
, traverse_
, safeBracketOnError
) where
import Data.Coerce
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Scheduler.Computation
import Control.Scheduler.Types
import Control.Scheduler.Queue
import Data.Atomics (atomicModifyIORefCAS, atomicModifyIORefCAS_)
import qualified Data.Foldable as F (foldl')
import Data.IORef
import Data.Primitive.SmallArray
import Data.Primitive.MutVar
import Data.Primitive.PVar
-- | Create one state value per worker and bundle them into a 'WorkerStates'.
--
-- The number of workers is obtained from the supplied 'Comp' with
-- 'getCompWorkers'. The initializer is invoked once for every worker index,
-- in increasing order starting at @0@, with that index wrapped in 'WorkerId'.
--
-- The resulting 'WorkerStates' also carries a fresh mutex ('IORef' 'Bool')
-- that starts out as 'False'.
initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m s) -> m (WorkerStates s)
initWorkerStates comp initState = do
  nWorkers <- getCompWorkers comp
  -- Every slot holds a placeholder error until the loop below overwrites it.
  arr <- liftIO $ newSmallArray nWorkers (error "Uninitialized")
  let go i =
        when (i < nWorkers) $ do
          state <- initState (WorkerId i)
          liftIO $ writeSmallArray arr i state
          go (i + 1)
  go 0
  -- The mutable array is never written to again after this point, so the
  -- copy-free freeze is not observable.
  workerStates <- liftIO $ unsafeFreezeSmallArray arr
  mutex <- liftIO $ newIORef False
  pure
    WorkerStates
      { _workerStatesComp = comp
      , _workerStatesArray = workerStates
      , _workerStatesMutex = mutex
      }
-- | Run a scheduler that has access to per-worker states.
--
-- The first argument is the underlying scheduler runner; it receives the
-- 'Comp' stored in the 'WorkerStates' and a continuation that wraps the plain
-- 'Scheduler' into a 'SchedulerWS' before handing it to the user action.
--
-- The mutex inside 'WorkerStates' guards against the same states being used
-- by more than one scheduler at a time:
--
-- * it is atomically set to 'True', and the previous value is inspected;
-- * if it was already 'True', 'MutexException' is thrown and the mutex is
--   left as is, because this call never owned it;
-- * otherwise it is reset to 'False' once the action finishes, whether it
--   returns normally or with an exception ('bracket').
withSchedulerWSInternal ::
     MonadUnliftIO m
  => (Comp -> (Scheduler m a -> t) -> m b)
  -> WorkerStates s
  -> (SchedulerWS s m a -> t)
  -> m b
withSchedulerWSInternal withScheduler' states action =
  withRunInIO $ \run -> bracket lockState unlockState (run . runSchedulerWS)
  where
    mutex = _workerStatesMutex states
    -- Returns the previous value of the mutex, i.e. 'True' when somebody else
    -- already holds it.
    lockState = atomicModifyIORefCAS mutex $ (,) True
    unlockState wasLocked
      | wasLocked = pure ()
      | otherwise = atomicWriteIORef mutex False
    runSchedulerWS isLocked
      | isLocked = liftIO $ throwIO MutexException
      | otherwise =
        withScheduler' (_workerStatesComp states) $ \scheduler ->
          action (SchedulerWS states scheduler)
-- | A scheduler with a single worker that keeps no state and no results.
--
-- Scheduling a job simply runs it in place with @'WorkerId' 0@. Termination
-- is a no-op, waiting for a batch reports an empty 'Finished' result, the
-- batch id is always @0@ and cancelling a batch always reports 'False'.
trivialScheduler_ :: Applicative f => Scheduler f ()
trivialScheduler_ =
  Scheduler
    { _numWorkers = 1
    , _scheduleWorkId = \f -> f (WorkerId 0)
    , _terminate = const $ pure ()
    , _waitForCurrentBatch = pure $ Finished []
    , _earlyResults = pure Nothing
    , _currentBatchId = pure $ BatchId 0
    , _cancelBatch = \_ _ -> pure False
    , _batchEarly = pure Nothing
    }
-- | Run an action with a single-worker scheduler that executes every job
-- in place, in any 'PrimMonad'.
--
-- Each scheduled job runs immediately with @'WorkerId' 0@; its result is
-- forced to WHNF and prepended to an accumulator. The value returned by the
-- supplied action itself is discarded.
--
-- This variant has no way of aborting the action: '_terminate' only records
-- the results collected so far together with the early value and returns the
-- early value to the caller. When such a record exists at the end, it is what
-- gets returned; otherwise the remaining accumulated results are collected.
-- In both cases 'reverseResults' is applied before returning.
withTrivialSchedulerR :: PrimMonad m => (Scheduler m a -> m b) -> m (Results a)
withTrivialSchedulerR action = do
  resVar <- newMutVar []
  batchVar <- newMutVar $ BatchId 0
  finResVar <- newMutVar Nothing
  batchEarlyVar <- newMutVar Nothing
  let -- Unconditionally advance the batch id.
      bumpCurrentBatchId =
        atomicModifyMutVar' batchVar (\(BatchId x) -> (BatchId (x + 1), ()))
      -- Advance the batch id only if it still equals the supplied one;
      -- reports whether the bump happened.
      bumpBatchId (BatchId c) =
        atomicModifyMutVar' batchVar $ \b@(BatchId x) ->
          if x == c
            then (BatchId (x + 1), True)
            else (b, False)
      -- Read and clear the early value of the current batch.
      takeBatchEarly = atomicModifyMutVar' batchEarlyVar $ \mEarly -> (Nothing, mEarly)
      -- Read and clear the accumulated results.
      takeResults = atomicModifyMutVar' resVar $ \res -> ([], res)
  _ <-
    action $
    Scheduler
      { _numWorkers = 1
      , _scheduleWorkId =
          \f -> do
            r <- f (WorkerId 0)
            r `seq` atomicModifyMutVar' resVar (\rs -> (r : rs, ()))
      , _terminate =
          \early -> do
            bumpCurrentBatchId
            finishEarly <- collectResults (Just early) takeResults
            unEarly early <$ writeMutVar finResVar (Just finishEarly)
      , _waitForCurrentBatch =
          do mEarly <- takeBatchEarly
             bumpCurrentBatchId
             collectResults mEarly . pure =<< takeResults
      , _earlyResults = readMutVar finResVar
      , _currentBatchId = readMutVar batchVar
      , _batchEarly = takeBatchEarly
      , _cancelBatch =
          \batchId early -> do
            b <- bumpBatchId batchId
            when b $ writeMutVar batchEarlyVar (Just early)
            pure b
      }
  readMutVar finResVar >>= \case
    Just rs -> pure $ reverseResults rs
    Nothing -> do
      mEarly <- takeBatchEarly
      reverseResults <$> collectResults mEarly takeResults
-- | Run an action with a single-worker scheduler that executes every job
-- in place, for monads that can be unlifted to 'IO'.
--
-- Each scheduled job runs immediately with @'WorkerId' 0@; its result is
-- forced to WHNF and prepended to an accumulator. The value returned by the
-- supplied action itself is discarded.
--
-- '_terminate' records the results collected so far together with the early
-- value and then throws 'TerminateEarlyException', which is caught around the
-- supplied action. When such a record exists at the end, it is returned;
-- otherwise the remaining accumulated results are collected.
--
-- Unlike 'withTrivialSchedulerR', no 'reverseResults' is applied here, so
-- results come back in the order they were accumulated (most recent first).
withTrivialSchedulerRIO :: MonadUnliftIO m => (Scheduler m a -> m b) -> m (Results a)
withTrivialSchedulerRIO action = do
  resRef <- liftIO $ newIORef []
  batchRef <- liftIO $ newIORef $ BatchId 0
  finResRef <- liftIO $ newIORef Nothing
  batchEarlyRef <- liftIO $ newIORef Nothing
  let -- Unconditionally advance the batch id; the reference is viewed as a
      -- plain 'Int' reference through 'coerce'.
      bumpCurrentBatchId = atomicModifyIORefCAS_ (coerce batchRef) (+ (1 :: Int))
      -- Advance the batch id only if it still equals the supplied one;
      -- reports whether the bump happened.
      bumpBatchId (BatchId c) =
        atomicModifyIORefCAS batchRef $ \b@(BatchId x) ->
          if x == c
            then (BatchId (x + 1), True)
            else (b, False)
      -- Read and clear the early value of the current batch.
      takeBatchEarly = atomicModifyIORefCAS batchEarlyRef $ \mEarly -> (Nothing, mEarly)
      -- Read and clear the accumulated results.
      takeResults = atomicModifyIORefCAS resRef $ \res -> ([], res)
      scheduler =
        Scheduler
          { _numWorkers = 1
          , _scheduleWorkId =
              \f -> do
                r <- f (WorkerId 0)
                r `seq` liftIO (atomicModifyIORefCAS_ resRef (r :))
          , _terminate =
              \ !early ->
                liftIO $ do
                  bumpCurrentBatchId
                  finishEarly <- collectResults (Just early) takeResults
                  atomicWriteIORef finResRef (Just finishEarly)
                  throwIO TerminateEarlyException
          , _waitForCurrentBatch =
              liftIO $ do
                bumpCurrentBatchId
                mEarly <- takeBatchEarly
                collectResults mEarly . pure =<< takeResults
          , _earlyResults = liftIO (readIORef finResRef)
          , _currentBatchId = liftIO (readIORef batchRef)
          , _batchEarly = liftIO takeBatchEarly
          , _cancelBatch =
              \batchId early ->
                liftIO $ do
                  b <- bumpBatchId batchId
                  when b $ atomicWriteIORef batchEarlyRef (Just early)
                  pure b
          }
  -- Only 'TerminateEarlyException' is intercepted; any other exception
  -- propagates to the caller.
  _ :: Either TerminateEarlyException b <- withRunInIO $ \run -> try $ run $ action scheduler
  liftIO (readIORef finResRef) >>= \case
    Just rs -> pure rs
    Nothing ->
      liftIO $ do
        mEarly <- takeBatchEarly
        collectResults mEarly takeResults
{-# INLINEABLE withTrivialSchedulerRIO #-}
-- | Run an action on every element of a container, left to right, for its
-- effects only.
--
-- Implemented as a strict left fold that chains the actions with '*>',
-- starting from @'pure' ()@.
traverse_ :: (Applicative f, Foldable t) => (a -> f ()) -> t a -> f ()
traverse_ f = F.foldl' (\c a -> c *> f a) (pure ())
{-# INLINE traverse_ #-}
-- | Schedule a job whose result is kept: the job is constructed with 'mkJob'
-- and enqueued through 'scheduleJobsWith'.
scheduleJobs :: MonadIO m => Jobs m a -> (WorkerId -> m a) -> m ()
scheduleJobs = scheduleJobsWith mkJob
{-# INLINEABLE scheduleJobs #-}
-- | Schedule a job whose result is discarded: the job is wrapped in 'Job_'
-- with a result-storing callback that ignores its argument, and enqueued
-- through 'scheduleJobsWith'.
scheduleJobs_ :: MonadIO m => Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobs_ = scheduleJobsWith (\job -> pure (Job_ (void . job (\_ -> pure ()))))
{-# INLINEABLE scheduleJobs_ #-}
-- | Build a job with the supplied constructor and place it on the queue.
--
-- The constructor receives a function that, given a way to store a result
-- and a 'WorkerId', runs the user action for that worker, forces its result
-- to WHNF and passes it to the storing callback.
--
-- The queue counter ('jobsQueueCount') is incremented by one before the job
-- is pushed onto 'jobsQueue'.
scheduleJobsWith ::
     MonadIO m
  => (((b -> m ()) -> WorkerId -> m ()) -> m (Job m a))
  -> Jobs m a
  -> (WorkerId -> m b)
  -> m ()
scheduleJobsWith mkJob' Jobs {..} action = do
  job <-
    mkJob' $ \storeResult wid -> do
      res <- action wid
      res `seq` storeResult res
  liftIO $ void $ atomicAddIntPVar jobsQueueCount 1
  pushJQueue jobsQueue job
{-# INLINEABLE scheduleJobsWith #-}
-- | Worker loop: repeatedly pop a job off the queue and run it.
--
-- Arguments, in order: a function for running @m@ actions in 'IO', a function
-- that is applied around the execution of each job (callers in this module
-- pass the @unmask@ they get from @fork*WithUnmask@), the id of this worker
-- and the shared 'Jobs' state.
--
-- The loop stops quietly when a 'WorkerTerminateException' is received as an
-- asynchronous exception, and starts over when a 'CancelBatchException' is
-- received that way. Any other exception is reported through
-- 'jobsSchedulerStatus'; if it is not synchronous it is also rethrown in the
-- worker thread.
runWorker ::
     MonadUnliftIO m
  => (forall b. m b -> IO b)
  -> (forall c. IO c -> IO c)
  -> WorkerId
  -> Jobs m a
  -> IO ()
runWorker run unmask wId Jobs {jobsQueue, jobsQueueCount, jobsSchedulerStatus} = go
  where
    -- Decide what to do after an attempt to report a status with 'putMVar',
    -- which may itself have been interrupted by an exception.
    onBlockedMVar :: Either SomeException () -> IO ()
    onBlockedMVar eUnblocked =
      case eUnblocked of
        Right () -> go
        Left uExc
          | Just WorkerTerminateException <- asyncExceptionFromException uExc -> return ()
        Left uExc
          | Just CancelBatchException <- asyncExceptionFromException uExc -> go
        Left uExc -> throwIO uExc
    go :: IO ()
    go = do
      eRes <-
        try $ do
          job <- run (popJQueue jobsQueue)
          -- Only the job and the counter decrement run under @unmask@.
          unmask (run (job wId) >> atomicSubIntPVar jobsQueueCount 1)
      case eRes of
        -- NOTE(review): 'atomicSubIntPVar' presumably returns the counter value
        -- from before the subtraction, so @1@ would mean this was the last
        -- outstanding job -- confirm against the PVar documentation.
        Right 1 -> try (putMVar jobsSchedulerStatus SchedulerIdle) >>= onBlockedMVar
        Right _ -> go
        Left exc
          | Just WorkerTerminateException <- asyncExceptionFromException exc -> return ()
        Left exc
          | Just CancelBatchException <- asyncExceptionFromException exc -> go
        Left exc -> do
          -- Report the failure first; the 'putMVar' is wrapped in 'try' so that
          -- an exception arriving while it blocks is handled below instead of
          -- escaping immediately.
          eUnblocked <-
            try $ putMVar jobsSchedulerStatus (SchedulerWorkerException (WorkerException exc))
          -- Exceptions that are not synchronous are rethrown after reporting.
          unless (isSyncException exc) $ throwIO exc
          onBlockedMVar eUnblocked
{-# INLINEABLE runWorker #-}
-- | Allocate all of the shared scheduler state and return it together with a
-- function that builds a 'Scheduler' once the worker thread ids are known.
--
-- Arguments, in order: the computation strategy (used to find out the number
-- of workers), the function used by the scheduler to submit work, and the
-- function used to collect results from the job queue.
initScheduler ::
     MonadIO m
  => Comp
  -> (Jobs m a -> (WorkerId -> m a) -> m ())
  -> (JQueue m a -> m [a])
  -> m (Jobs m a, [ThreadId] -> Scheduler m a)
initScheduler comp submitWork collect = do
  jobsNumWorkers <- getCompWorkers comp
  jobsQueue <- newJQueue
  -- The counter starts at 1 and is reset to 1 after every batch. The extra
  -- count appears to be consumed by the sentinel job scheduled when waiting
  -- for a batch, which subtracts 1 in its own body.
  jobsQueueCount <- liftIO $ newPVar 1
  jobsSchedulerStatus <- liftIO newEmptyMVar
  earlyTerminationResultRef <- liftIO $ newIORef Nothing
  batchIdRef <- liftIO $ newIORef $ BatchId 0
  batchEarlyRef <- liftIO $ newIORef Nothing
  let jobs =
        Jobs
          { jobsNumWorkers = jobsNumWorkers
          , jobsQueue = jobsQueue
          , jobsQueueCount = jobsQueueCount
          , jobsSchedulerStatus = jobsSchedulerStatus
          }
      -- Unconditionally advance the current batch id by one.
      bumpCurrentBatchId = atomicModifyIORefCAS_ (coerce batchIdRef) (+ (1 :: Int))
      -- Advance the batch id only if it still equals the supplied one; the
      -- result tells whether the bump happened.
      bumpBatchId (BatchId c) =
        atomicModifyIORefCAS batchIdRef $ \b@(BatchId x) ->
          if x == c
            then (BatchId (x + 1), True)
            else (b, False)
      mkScheduler tids =
        Scheduler
          { _numWorkers = jobsNumWorkers
          , _scheduleWorkId = submitWork jobs
          , _terminate =
              \early -> do
                finishEarly <-
                  case early of
                    Early r -> FinishedEarly <$> collect jobsQueue <*> pure r
                    EarlyWith r -> pure $ FinishedEarlyWith r
                liftIO $ do
                  bumpCurrentBatchId
                  -- Store the result before throwing, so that whoever handles
                  -- the exception can read it back through '_earlyResults'.
                  atomicWriteIORef earlyTerminationResultRef $ Just finishEarly
                  throwIO TerminateEarlyException
          , _waitForCurrentBatch =
              do -- Sentinel job: its only effect is to decrement the counter.
                 scheduleJobs_ jobs (\_ -> liftIO $ void $ atomicSubIntPVar jobsQueueCount 1)
                 unblockPopJQueue jobsQueue
                 -- Blocks until a worker reports either idleness or a failure.
                 status <- liftIO $ takeMVar jobsSchedulerStatus
                 -- Read and clear the early value recorded for this batch.
                 mEarly <-
                   liftIO $ atomicModifyIORefCAS batchEarlyRef $ \prevEarly -> (Nothing, prevEarly)
                 rs <-
                   case status of
                     SchedulerWorkerException (WorkerException exc) ->
                       case fromException exc of
                         Just CancelBatchException -> do
                           _ <- clearPendingJQueue jobsQueue
                           -- Interrupt every worker, then gather what is there.
                           liftIO $
                             traverse_ (`throwTo` SomeAsyncException CancelBatchException) tids
                           collectResults mEarly . pure =<< collect jobsQueue
                         Nothing -> liftIO $ throwIO exc
                     SchedulerIdle -> do
                       blockPopJQueue jobsQueue
                       liftIO bumpCurrentBatchId
                       res <- collect jobsQueue
                       res `seq` collectResults mEarly (pure res)
                 -- Reset the counter for the next batch and return the results.
                 rs <$ liftIO (atomicWriteIntPVar jobsQueueCount 1)
          , _earlyResults = liftIO (readIORef earlyTerminationResultRef)
          , _currentBatchId = liftIO (readIORef batchIdRef)
          , _batchEarly = liftIO (readIORef batchEarlyRef)
          , _cancelBatch =
              \batchId early -> do
                b <- liftIO $ bumpBatchId batchId
                -- Only the caller that actually bumped the batch id cancels.
                when b $ do
                  blockPopJQueue jobsQueue
                  liftIO $ do
                    atomicWriteIORef batchEarlyRef $ Just early
                    throwIO CancelBatchException
                pure b
          }
  pure (jobs, mkScheduler)
{-# INLINEABLE initScheduler #-}
-- | Initialize a scheduler, spawn the workers, run the supplied action with
-- the scheduler, wait for the scheduled work and produce the 'Results'.
--
-- Arguments, in order: the computation strategy, the function used to submit
-- work, the function used to collect results from the queue, and the action
-- that receives the 'Scheduler'.
--
-- Workers are spawned and terminated with 'bracket', so 'terminateWorkers'
-- runs whether or not the body finishes normally. An exception reported by a
-- worker that is neither 'TerminateEarlyException' nor 'CancelBatchException'
-- is rethrown in the calling thread.
withSchedulerInternal ::
     MonadUnliftIO m
  => Comp
  -> (Jobs m a -> (WorkerId -> m a) -> m ())
  -> (JQueue m a -> m [a])
  -> (Scheduler m a -> m b)
  -> m (Results a)
withSchedulerInternal comp submitWork collect onScheduler = do
  (jobs@Jobs {..}, mkScheduler) <- initScheduler comp submitWork collect
  withRunInIO $ \run -> do
    bracket (run (spawnWorkers jobs comp)) terminateWorkers $ \tids ->
      let scheduler = mkScheduler tids
          -- Read back the result stored by an early termination.
          readEarlyTermination =
            _earlyResults scheduler >>= \case
              Nothing -> error "Impossible: uninitialized early termination value"
              Just rs -> pure rs
       in try (run (onScheduler scheduler)) >>= \case
            -- Early termination was requested from the scheduling action itself.
            Left TerminateEarlyException -> run readEarlyTermination
            Right _ -> do
              -- Sentinel job: its only effect is to decrement the counter.
              run $ scheduleJobs_ jobs (\_ -> liftIO $ void $ atomicSubIntPVar jobsQueueCount 1)
              run $ unblockPopJQueue jobsQueue
              -- Blocks until a worker reports either idleness or a failure.
              status <- takeMVar jobsSchedulerStatus
              case status of
                SchedulerWorkerException (WorkerException exc)
                  | Just TerminateEarlyException <- fromException exc -> run readEarlyTermination
                  | Just CancelBatchException <- fromException exc ->
                    run $ do
                      mEarly <- _batchEarly scheduler
                      collectResults mEarly (collect jobsQueue)
                  -- A genuine worker failure: unwrap and rethrow it here.
                  | otherwise -> throwIO exc
                SchedulerIdle ->
                  run $ do
                    mEarly <- _batchEarly scheduler
                    collectResults mEarly (collect jobsQueue)
{-# INLINEABLE withSchedulerInternal #-}
-- | Build the final 'Results' from an optional early value and an action that
-- collects the accumulated results.
--
-- * 'Nothing' - run the collecting action and wrap its outcome in 'Finished'.
-- * @'Just' ('Early' r)@ - run the collecting action and pair its outcome with
--   @r@ in 'FinishedEarly'.
-- * @'Just' ('EarlyWith' r)@ - the collecting action is not run at all; only
--   @r@ is returned, in 'FinishedEarlyWith'.
collectResults :: Applicative f => Maybe (Early a) -> f [a] -> f (Results a)
collectResults mEarly collect =
  case mEarly of
    Nothing -> Finished <$> collect
    Just (Early r) -> FinishedEarly <$> collect <*> pure r
    Just (EarlyWith r) -> pure $ FinishedEarlyWith r
{-# INLINEABLE collectResults #-}
-- | Fork one worker thread per worker of the computation strategy and return
-- their thread ids.
--
-- * 'Par' - workers are forked with 'forkOnWithUnmask' on capabilities
--   @1 .. jobsNumWorkers@.
-- * 'ParOn' - workers are forked with 'forkOnWithUnmask' on the listed
--   capabilities.
-- * 'ParN' - @jobsNumWorkers@ workers are forked with 'forkIOWithUnmask'.
-- * 'Seq' - a single worker is forked with 'forkIOWithUnmask'.
--
-- Worker ids are assigned consecutively, starting from @0@.
spawnWorkers :: forall m a. MonadUnliftIO m => Jobs m a -> Comp -> m [ThreadId]
spawnWorkers jobs@Jobs {jobsNumWorkers} =
  \case
    Par -> spawnWorkersWith forkOnWithUnmask [1 .. jobsNumWorkers]
    ParOn ws -> spawnWorkersWith forkOnWithUnmask ws
    -- The list elements are ignored here; only the length of the list matters.
    ParN _ -> spawnWorkersWith (\_ -> forkIOWithUnmask) [1 .. jobsNumWorkers]
    Seq -> spawnWorkersWith (\_ -> forkIOWithUnmask) [1 :: Int]
  where
    -- Fork a worker for each element of the list, passing the element to the
    -- supplied forking function.
    spawnWorkersWith ::
         MonadUnliftIO m
      => (Int -> ((forall c. IO c -> IO c) -> IO ()) -> IO ThreadId)
      -> [Int]
      -> m [ThreadId]
    spawnWorkersWith fork ws =
      withRunInIO $ \run ->
        forM (zip [0 ..] ws) $ \(wId, on) ->
          fork on $ \unmask -> runWorker run unmask wId jobs
{-# INLINEABLE spawnWorkers #-}
-- | Throw a 'WorkerTerminateException', wrapped in 'SomeAsyncException', to
-- every thread in the list.
terminateWorkers :: [ThreadId] -> IO ()
terminateWorkers = traverse_ (`throwTo` SomeAsyncException WorkerTerminateException)
-- | Flatten 'Results' into a list.
--
-- For 'FinishedEarly' the early value is placed in front of the collected
-- results; for 'FinishedEarlyWith' the list contains only the early value.
resultsToList :: Results a -> [a]
resultsToList = \case
  Finished rs -> rs
  FinishedEarly rs r -> r : rs
  FinishedEarlyWith r -> [r]
{-# INLINEABLE resultsToList #-}
-- | Reverse the list of collected results inside 'Finished' and
-- 'FinishedEarly'. The early value of 'FinishedEarly' stays as it is, and any
-- other value is returned unchanged.
reverseResults :: Results a -> Results a
reverseResults = \case
  Finished rs -> Finished (reverse rs)
  FinishedEarly rs r -> FinishedEarly (reverse rs) r
  res -> res
{-# INLINEABLE reverseResults #-}
-- | Check whether an exception is synchronous.
--
-- The exception is converted with 'toException' and is considered
-- asynchronous exactly when the outcome can be converted back to a
-- 'SomeAsyncException'; everything else is reported as synchronous.
isSyncException :: Exception e => e -> Bool
isSyncException exc =
  case fromException (toException exc) of
    Just (SomeAsyncException _) -> False
    Nothing -> True
-- | Acquire a resource, use it, and run the cleanup action only if the use
-- fails with an exception.
--
-- Arguments, in order: the acquiring action, the cleanup action and the action
-- that uses the resource.
--
-- The whole sequence runs under 'mask'; only the using action is run with the
-- previous masking state restored. When it throws, the cleanup action is run
-- under 'uninterruptibleMask_', any exception raised by the cleanup itself is
-- discarded, and the original exception is rethrown. When it succeeds the
-- cleanup action is not run and the result is returned.
safeBracketOnError :: MonadUnliftIO m => m a -> (a -> m b) -> (a -> m c) -> m c
safeBracketOnError before after thing = withRunInIO $ \run -> mask $ \restore -> do
  x <- run before
  res1 <- try $ restore $ run $ thing x
  case res1 of
    Left (e1 :: SomeException) -> do
      -- A failure of the cleanup is deliberately ignored, so that the
      -- exception from the using action is the one that propagates.
      _ :: Either SomeException b <-
        try $ uninterruptibleMask_ $ run $ after x
      throwIO e1
    Right y -> return y