{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE Unsafe #-}
{-# OPTIONS_HADDOCK hide, not-home #-}
-- |
-- Module      : Control.Scheduler.Internal
-- Copyright   : (c) Alexey Kuleshevich 2018-2020
-- License     : BSD3
-- Maintainer  : Alexey Kuleshevich <lehins@yandex.ru>
-- Stability   : experimental
-- Portability : non-portable
--
module Control.Scheduler.Internal
  ( withSchedulerInternal
  , initWorkerStates
  , withSchedulerWSInternal
  , trivialScheduler_
  , withTrivialSchedulerR
  , withTrivialSchedulerRIO
  , initScheduler
  , spawnWorkers
  , terminateWorkers
  , scheduleJobs
  , scheduleJobs_
  , scheduleJobsWith
  , reverseResults
  , resultsToList
  , traverse_
  , safeBracketOnError
  ) where

import Data.Coerce
import Control.Concurrent
import Control.Exception
import Control.Monad
import Control.Monad.IO.Unlift
import Control.Scheduler.Computation
import Control.Scheduler.Types
import Control.Scheduler.Queue
import Data.Atomics (atomicModifyIORefCAS, atomicModifyIORefCAS_)
import qualified Data.Foldable as F (foldl')
import Data.IORef
import Data.Primitive.SmallArray
import Data.Primitive.MutVar
import Data.Primitive.PVar



-- | Initialize a separate state for each worker.
--
-- The number of workers is obtained by running 'getCompWorkers' on the supplied
-- computation strategy. The initializing action is then invoked once for every worker
-- index, in increasing order starting at @0@, and each result is stored in an array at
-- the matching index. The returned 'WorkerStates' also carries the computation strategy
-- and a freshly allocated mutex flag that starts out as 'False'.
--
-- @since 1.4.0
initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m s) -> m (WorkerStates s)
initWorkerStates comp initState = do
  nWorkers <- getCompWorkers comp
  -- The placeholder element is overwritten for every index in @[0 .. nWorkers - 1]@ by
  -- the loop below before the array is frozen.
  arr <- liftIO $ newSmallArray nWorkers (error "Uninitialized")
  let go i =
        when (i < nWorkers) $ do
          state <- initState (WorkerId i)
          liftIO $ writeSmallArray arr i state
          go (i + 1)
  go 0
  -- No writes to the mutable array happen after this point, which is what makes the
  -- copy-free freeze acceptable here.
  workerStates <- liftIO $ unsafeFreezeSmallArray arr
  mutex <- liftIO $ newIORef False
  pure
    WorkerStates
      { _workerStatesComp = comp
      , _workerStatesArray = workerStates
      , _workerStatesMutex = mutex
      }

-- | Run a scheduler that has access to per-worker states, guarding those states with
-- the mutex flag stored inside 'WorkerStates'.
--
-- The flag is atomically set to 'True' and its previous value is inspected:
--
-- * If it was already 'True', 'MutexException' is thrown and the flag is left untouched
--   on the way out.
--
-- * Otherwise the supplied scheduler runner is invoked with the computation strategy
--   stored in the states, and the flag is reset to 'False' afterwards. The reset is
--   registered with 'bracket', so it also happens when the action throws.
withSchedulerWSInternal ::
     MonadUnliftIO m
  => (Comp -> (Scheduler m a -> t) -> m b)
  -> WorkerStates s
  -> (SchedulerWS s m a -> t)
  -> m b
withSchedulerWSInternal withScheduler' states action =
  withRunInIO $ \run -> bracket lockState unlockState (run . runSchedulerWS)
  where
    mutex = _workerStatesMutex states
    -- Sets the flag and returns the value it had before.
    lockState = atomicModifyIORefCAS mutex ((,) True)
    -- Only the caller that flipped the flag from 'False' to 'True' may release it.
    unlockState wasLocked
      | wasLocked = pure ()
      | otherwise = atomicWriteIORef mutex False
    runSchedulerWS isLocked
      | isLocked = liftIO $ throwIO MutexException
      | otherwise =
        withScheduler' (_workerStatesComp states) $ \scheduler ->
          action (SchedulerWS states scheduler)


-- | The most basic scheduler that simply runs the task instead of scheduling it. Early
-- termination requests are bluntly ignored.
--
-- It reports a single worker and always passes @'WorkerId' 0@ to the task. Terminating
-- does nothing, waiting for the current batch yields @'Finished' []@, the batch id is
-- always @'BatchId' 0@, batch cancellation always returns 'False' and no early results
-- are ever reported.
--
-- @since 1.1.0
trivialScheduler_ :: Applicative f => Scheduler f ()
trivialScheduler_ =
  Scheduler
    { _numWorkers = 1
    , _scheduleWorkId = \f -> f (WorkerId 0)
    , _terminate = const $ pure ()
    , _waitForCurrentBatch = pure $ Finished []
    , _earlyResults = pure Nothing
    , _currentBatchId = pure $ BatchId 0
    , _cancelBatch = \_ _ -> pure False
    , _batchEarly = pure Nothing
    }


-- | This trivial scheduler will behave in a similar way as
-- `Control.Scheduler.withSchedulerR` with `Seq` computation strategy, except it is
-- restricted to `PrimMonad`, instead of `MonadUnliftIO` and the work isn't scheduled, but
-- rather computed immediately.
--
-- Each scheduled task is run right away with @'WorkerId' 0@ and its result, forced to
-- weak head normal form, is prepended to an internal list. Once the supplied action
-- returns (its own result is discarded), the outcome is either the value recorded by a
-- call to terminate or, if there was none, whatever is collected from the pending
-- results; in both cases 'reverseResults' is applied before returning.
--
-- @since 1.4.2
withTrivialSchedulerR :: PrimMonad m => (Scheduler m a -> m b) -> m (Results a)
withTrivialSchedulerR action = do
  resVar <- newMutVar []
  batchVar <- newMutVar $ BatchId 0
  finResVar <- newMutVar Nothing
  batchEarlyVar <- newMutVar Nothing
  let -- Unconditionally advance the batch id by one.
      bumpCurrentBatchId =
        atomicModifyMutVar' batchVar (\(BatchId x) -> (BatchId (x + 1), ()))
      -- Advance the batch id only when it still equals the supplied one; report
      -- whether the bump took place.
      bumpBatchId (BatchId c) =
        atomicModifyMutVar' batchVar $ \b@(BatchId x) ->
          if x == c
            then (BatchId (x + 1), True)
            else (b, False)
      -- Read the pending early value of the batch and clear it.
      takeBatchEarly = atomicModifyMutVar' batchEarlyVar $ \mEarly -> (Nothing, mEarly)
      -- Read the results accumulated so far and reset the accumulator.
      takeResults = atomicModifyMutVar' resVar $ \res -> ([], res)
  _ <-
    action $
    Scheduler
      { _numWorkers = 1
      , _scheduleWorkId =
          \f -> do
            r <- f (WorkerId 0)
            r `seq` atomicModifyMutVar' resVar (\rs -> (r : rs, ()))
      , _terminate =
          -- Nothing is interrupted here: the final results are recorded and the early
          -- value is handed straight back to the caller.
          \early -> do
            bumpCurrentBatchId
            finishEarly <- collectResults (Just early) takeResults
            unEarly early <$ writeMutVar finResVar (Just finishEarly)
      , _waitForCurrentBatch =
          do mEarly <- takeBatchEarly
             bumpCurrentBatchId
             collectResults mEarly . pure =<< takeResults
      , _earlyResults = readMutVar finResVar
      , _currentBatchId = readMutVar batchVar
      , _batchEarly = takeBatchEarly
      , _cancelBatch =
          \batchId early -> do
            b <- bumpBatchId batchId
            when b $ writeMutVar batchEarlyVar (Just early)
            pure b
      }
  readMutVar finResVar >>= \case
    Just rs -> pure $ reverseResults rs
    Nothing -> do
      mEarly <- takeBatchEarly
      reverseResults <$> collectResults mEarly takeResults



-- | Same as `Control.Scheduler.withTrivialScheduler`, but works in `MonadUnliftIO` and
-- returns results in an original LIFO order.
--
-- Each scheduled task is run right away with @'WorkerId' 0@ and its result, forced to
-- weak head normal form, is prepended to an internal list. Terminating records the
-- final results and then throws 'TerminateEarlyException', which is caught around the
-- supplied action; any other exception propagates to the caller. The result of the
-- supplied action itself is discarded.
--
-- @since 1.4.2
withTrivialSchedulerRIO :: MonadUnliftIO m => (Scheduler m a -> m b) -> m (Results a)
withTrivialSchedulerRIO action = do
  resRef <- liftIO $ newIORef []
  batchRef <- liftIO $ newIORef $ BatchId 0
  finResRef <- liftIO $ newIORef Nothing
  batchEarlyRef <- liftIO $ newIORef Nothing
  let -- Unconditionally advance the batch id by one.
      bumpCurrentBatchId = atomicModifyIORefCAS_ (coerce batchRef) (+ (1 :: Int))
      -- Advance the batch id only when it still equals the supplied one; report
      -- whether the bump took place.
      bumpBatchId (BatchId c) =
        atomicModifyIORefCAS batchRef $ \b@(BatchId x) ->
          if x == c
            then (BatchId (x + 1), True)
            else (b, False)
      -- Read the pending early value of the batch and clear it.
      takeBatchEarly = atomicModifyIORefCAS batchEarlyRef $ \mEarly -> (Nothing, mEarly)
      -- Read the results accumulated so far and reset the accumulator.
      takeResults = atomicModifyIORefCAS resRef $ \res -> ([], res)
      scheduler =
        Scheduler
          { _numWorkers = 1
          , _scheduleWorkId =
              \f -> do
                r <- f (WorkerId 0)
                r `seq` liftIO (atomicModifyIORefCAS_ resRef (r :))
          , _terminate =
              \ !early ->
                liftIO $ do
                  bumpCurrentBatchId
                  finishEarly <- collectResults (Just early) takeResults
                  atomicWriteIORef finResRef (Just finishEarly)
                  -- Abort the rest of the user action; caught by the 'try' below.
                  throwIO TerminateEarlyException
          , _waitForCurrentBatch =
              liftIO $ do
                bumpCurrentBatchId
                mEarly <- takeBatchEarly
                collectResults mEarly . pure =<< takeResults
          , _earlyResults = liftIO (readIORef finResRef)
          , _currentBatchId = liftIO (readIORef batchRef)
          , _batchEarly = liftIO takeBatchEarly
          , _cancelBatch =
              \batchId early ->
                liftIO $ do
                  b <- bumpBatchId batchId
                  when b $ atomicWriteIORef batchEarlyRef (Just early)
                  pure b
          }
  -- The pattern signature fixes the exception type that 'try' intercepts.
  _ :: Either TerminateEarlyException b <- withRunInIO $ \run -> try $ run $ action scheduler
  liftIO (readIORef finResRef) >>= \case
    Just rs -> pure rs
    Nothing ->
      liftIO $ do
        mEarly <- takeBatchEarly
        collectResults mEarly takeResults
{-# INLINEABLE withTrivialSchedulerRIO #-}


-- | This is generally a faster way to traverse while ignoring the result rather than using `mapM_`.
--
-- Implemented as a strict left fold that sequences the actions with '*>', so effects run
-- in the container's left-to-right order; an empty container yields @'pure' ()@.
--
-- @since 1.0.0
traverse_ :: (Applicative f, Foldable t) => (a -> f ()) -> t a -> f ()
traverse_ f = F.foldl' (\acc x -> acc *> f x) (pure ())
{-# INLINE traverse_ #-}

-- | Schedule an action on the supplied jobs queue. This is 'scheduleJobsWith'
-- specialized to building the job with 'mkJob'.
scheduleJobs :: MonadIO m => Jobs m a -> (WorkerId -> m a) -> m ()
scheduleJobs = scheduleJobsWith mkJob
{-# INLINEABLE scheduleJobs #-}

-- | Ignores the result of computation, thus avoiding some overhead.
--
-- The job is wrapped in the 'Job_' constructor and the callback that would normally
-- store the result is replaced with one that does nothing.
scheduleJobs_ :: MonadIO m => Jobs m a -> (WorkerId -> m b) -> m ()
scheduleJobs_ = scheduleJobsWith (\job -> pure (Job_ (void . job (\_ -> pure ()))))
{-# INLINEABLE scheduleJobs_ #-}

-- | Build a job out of the supplied action and place it on the jobs queue.
--
-- The first argument decides how the job is constructed. It receives a function that,
-- given a way to store a result and a 'WorkerId', runs the action for that worker,
-- forces the result to weak head normal form and then stores it. The queue counter is
-- atomically incremented by one before the job is pushed onto the queue.
scheduleJobsWith ::
     MonadIO m
  => (((b -> m ()) -> WorkerId -> m ()) -> m (Job m a))
  -> Jobs m a
  -> (WorkerId -> m b)
  -> m ()
scheduleJobsWith mkJob' Jobs {jobsQueue, jobsQueueCount} action = do
  job <-
    mkJob' $ \storeResult wid -> do
      res <- action wid
      res `seq` storeResult res
  liftIO $ void $ atomicAddIntPVar jobsQueueCount 1
  pushJQueue jobsQueue job
{-# INLINEABLE scheduleJobsWith #-}


-- | Runs the worker until it is terminated with a `WorkerTerminateException` or is killed
-- by some other asynchronous exception, which will propagate to the user calling thread.
--
-- Arguments, in order: a way to run @m@ in `IO`, a way to unmask asynchronous exceptions
-- (applied only around the execution of a job), the id that is passed to every job run by
-- this worker, and the shared scheduler state.
--
-- Each iteration pops one job, runs it and decrements the queue counter. Outcomes:
--
-- * The decrement yields @1@: `SchedulerIdle` is put into the status `MVar`.
--
-- * `WorkerTerminateException` is received: the worker returns.
--
-- * `CancelBatchException` is received: the worker goes back to popping jobs.
--
-- * Any other exception: it is put into the status `MVar` wrapped in
--   `SchedulerWorkerException`, and rethrown in the worker if it is asynchronous.
runWorker ::
     MonadUnliftIO m
  => (forall b. m b -> IO b)
  -> (forall c. IO c -> IO c)
  -> WorkerId
  -> Jobs m a
  -> IO ()
runWorker run unmask wId Jobs {jobsQueue, jobsQueueCount, jobsSchedulerStatus} = go
  where
    -- Continue after an attempt to fill the status MVar. The attempt itself may have
    -- been interrupted by an exception, which is handled the same way as in the loop.
    onBlockedMVar :: Either SomeException () -> IO ()
    onBlockedMVar = \case
      Right () -> go
      Left uExc
        | Just WorkerTerminateException <- asyncExceptionFromException uExc -> return ()
        | Just CancelBatchException <- asyncExceptionFromException uExc -> go
        | otherwise -> throwIO uExc
    go :: IO ()
    go = do
      eRes <-
        try $ do
          job <- run (popJQueue jobsQueue)
          unmask (run (job wId) >> atomicSubIntPVar jobsQueueCount 1)
      -- \ popJQueue can block, but it is still interruptible
      case eRes of
        -- NOTE(review): presumably `atomicSubIntPVar` returns the value held before the
        -- subtraction, in which case 1 means the counter has just reached zero - confirm
        -- against the pvar documentation.
        Right 1 -> try (putMVar jobsSchedulerStatus SchedulerIdle) >>= onBlockedMVar
        Right _ -> go
        Left exc
          | Just WorkerTerminateException <- asyncExceptionFromException exc -> return ()
          | Just CancelBatchException <- asyncExceptionFromException exc -> go
          | otherwise -> do
            eUnblocked <-
              try $ putMVar jobsSchedulerStatus (SchedulerWorkerException (WorkerException exc))
            -- \ without blocking with putMVar here we would not be able to report an
            -- exception in the main thread, especially if `exc` is asynchronous.
            unless (isSyncException exc) $ throwIO exc
            onBlockedMVar eUnblocked
{-# INLINEABLE runWorker #-}


-- | Allocate the state shared between the scheduler and its workers.
--
-- Arguments, in order: the computation strategy (used to determine the number of
-- workers), how to submit a unit of work, and how to collect results from the queue.
--
-- Returns the `Jobs` record together with a function that builds a `Scheduler` once the
-- thread ids of the workers are known. No threads are started here.
--
-- The queue counter starts at @1@. That extra unit is removed by the job that
-- `_waitForCurrentBatch` schedules, and the counter is written back to @1@ when the wait
-- is over.
initScheduler ::
     MonadIO m
  => Comp
  -> (Jobs m a -> (WorkerId -> m a) -> m ())
  -> (JQueue m a -> m [a])
  -> m (Jobs m a, [ThreadId] -> Scheduler m a)
initScheduler comp submitWork collect = do
  jobsNumWorkers <- getCompWorkers comp
  jobsQueue <- newJQueue
  jobsQueueCount <- liftIO $ newPVar 1
  jobsSchedulerStatus <- liftIO newEmptyMVar
  earlyTerminationResultRef <- liftIO $ newIORef Nothing
  batchIdRef <- liftIO $ newIORef $ BatchId 0
  batchEarlyRef <- liftIO $ newIORef Nothing
  let jobs = Jobs {jobsNumWorkers, jobsQueue, jobsQueueCount, jobsSchedulerStatus}
      -- Unconditionally advance the batch id by one.
      bumpCurrentBatchId = atomicModifyIORefCAS_ (coerce batchIdRef) (+ (1 :: Int))
      -- Advance the batch id only if it still equals the supplied one. The result tells
      -- whether the id was advanced.
      bumpBatchId (BatchId c) =
        atomicModifyIORefCAS batchIdRef $ \b@(BatchId x) ->
          if x == c
            then (BatchId (x + 1), True)
            else (b, False)
      mkScheduler tids =
        Scheduler
          { _numWorkers = jobsNumWorkers
          , _scheduleWorkId = submitWork jobs
          , _terminate =
              \early -> do
                -- Same case analysis as for a cancelled batch: results are collected
                -- from the queue for `Early`, but not for `EarlyWith`.
                finishEarly <- collectResults (Just early) (collect jobsQueue)
                liftIO $ do
                  bumpCurrentBatchId
                  atomicWriteIORef earlyTerminationResultRef $ Just finishEarly
                  throwIO TerminateEarlyException
          , _waitForCurrentBatch =
              do -- Schedule one more job whose only effect is to take the extra unit off
                 -- the queue counter.
                 scheduleJobs_ jobs (\_ -> liftIO $ void $ atomicSubIntPVar jobsQueueCount 1)
                 unblockPopJQueue jobsQueue
                 -- Blocks until a worker reports either idleness or an exception.
                 status <- liftIO $ takeMVar jobsSchedulerStatus
                 -- Read the early value recorded by `_cancelBatch` and clear it.
                 mEarly <- liftIO $ atomicModifyIORefCAS batchEarlyRef $ \prev -> (Nothing, prev)
                 rs <-
                   case status of
                     SchedulerWorkerException (WorkerException exc) ->
                       case fromException exc of
                         Just CancelBatchException -> do
                           _ <- clearPendingJQueue jobsQueue
                           -- Deliver the cancellation to every worker thread.
                           liftIO $
                             traverse_ (`throwTo` SomeAsyncException CancelBatchException) tids
                           collectResults mEarly . pure =<< collect jobsQueue
                         -- Any other worker exception is rethrown in the waiting thread.
                         Nothing -> liftIO $ throwIO exc
                     SchedulerIdle -> do
                       blockPopJQueue jobsQueue
                       liftIO bumpCurrentBatchId
                       res <- collect jobsQueue
                       res `seq` collectResults mEarly (pure res)
                 -- Put the extra unit back on the counter for the next batch.
                 rs <$ liftIO (atomicWriteIntPVar jobsQueueCount 1)
          , _earlyResults = liftIO (readIORef earlyTerminationResultRef)
          , _currentBatchId = liftIO (readIORef batchIdRef)
          , _batchEarly = liftIO (readIORef batchEarlyRef)
          , _cancelBatch =
              \batchId early -> do
                b <- liftIO $ bumpBatchId batchId
                -- Only the caller that managed to advance the batch id performs the
                -- cancellation; it records the early value and then throws.
                when b $ do
                  blockPopJQueue jobsQueue
                  liftIO $ do
                    atomicWriteIORef batchEarlyRef $ Just early
                    throwIO CancelBatchException
                pure b
          }
  pure (jobs, mkScheduler)
{-# INLINEABLE initScheduler #-}

-- | Initialize a scheduler, spawn its workers, run the supplied action with it and wait
-- for the outcome of the scheduled work.
--
-- Workers are spawned and terminated with `bracket`, so `terminateWorkers` runs whether
-- the action finishes or throws. A `TerminateEarlyException` raised by the action itself
-- or reported by a worker yields the stored early termination result. Any other
-- exception reported by a worker, except for `CancelBatchException`, is rethrown in the
-- calling thread.
withSchedulerInternal ::
     MonadUnliftIO m
  => Comp -- ^ Computation strategy
  -> (Jobs m a -> (WorkerId -> m a) -> m ()) -- ^ How to schedule work
  -> (JQueue m a -> m [a]) -- ^ How to collect results
  -> (Scheduler m a -> m b)
     -- ^ Action that will be scheduling all the work.
  -> m (Results a)
withSchedulerInternal comp submitWork collect onScheduler = do
  (jobs@Jobs {jobsQueue, jobsQueueCount, jobsSchedulerStatus}, mkScheduler) <-
    initScheduler comp submitWork collect
  -- / Wait for the initial jobs to get scheduled before spawning off the workers, otherwise it
  -- would be trickier to identify the beginning and the end of a job pool.
  withRunInIO $ \run -> do
    bracket (run (spawnWorkers jobs comp)) terminateWorkers $ \tids ->
      let scheduler = mkScheduler tids
          readEarlyTermination =
            _earlyResults scheduler >>= \case
              Nothing -> error "Impossible: uninitialized early termination value"
              Just rs -> pure rs
          -- Results of the batch, shaped by the early value if one was recorded.
          collectBatchResults = do
            mEarly <- _batchEarly scheduler
            collectResults mEarly (collect jobsQueue)
       in try (run (onScheduler scheduler)) >>= \case
            Left TerminateEarlyException -> run readEarlyTermination
            Right _ -> do
              -- Schedule one more job whose only effect is to decrement the queue counter.
              run $ scheduleJobs_ jobs (\_ -> liftIO $ void $ atomicSubIntPVar jobsQueueCount 1)
              run $ unblockPopJQueue jobsQueue
              status <- takeMVar jobsSchedulerStatus
                -- \ wait for all workers to finish. If any one of the workers had a problem, then
                -- this MVar will contain an exception
              case status of
                SchedulerWorkerException (WorkerException exc)
                  | Just TerminateEarlyException <- fromException exc -> run readEarlyTermination
                  | Just CancelBatchException <- fromException exc -> run collectBatchResults
                  | otherwise -> throwIO exc
                  -- \ Here we need to unwrap the legit worker exception and rethrow it, so
                  -- the main thread will think like it's his own
                SchedulerIdle -> run collectBatchResults
                  -- \ Now we are sure all workers have done their job we can safely read
                  -- all of the IORefs with results
{-# INLINEABLE withSchedulerInternal #-}


-- | Build the final `Results` from an optional early value and an action that collects
-- the results.
--
-- * `Nothing`: the collected results wrapped in `Finished`.
--
-- * @`Just` (`Early` r)@: the collected results together with @r@, as `FinishedEarly`.
--
-- * @`Just` (`EarlyWith` r)@: only @r@, as `FinishedEarlyWith`; the collecting action is
--   not run at all.
collectResults :: Applicative f => Maybe (Early a) -> f [a] -> f (Results a)
collectResults mEarly collect =
  case mEarly of
    Nothing -> Finished <$> collect
    Just (Early r) -> FinishedEarly <$> collect <*> pure r
    Just (EarlyWith r) -> pure $ FinishedEarlyWith r
{-# INLINEABLE collectResults #-}


-- | Fork one worker thread per slot demanded by the computation strategy and return the
-- thread ids in the order the workers were created.
--
-- * `Par`: `jobsNumWorkers` workers, forked with `forkOnWithUnmask` on capabilities
--   @1 .. jobsNumWorkers@.
--
-- * `ParOn`: one worker per listed capability, forked with `forkOnWithUnmask`.
--
-- * `ParN`: `jobsNumWorkers` workers, forked with `forkIOWithUnmask`.
--
-- * `Seq`: a single worker, forked with `forkIOWithUnmask`.
--
-- Workers receive consecutive ids starting from @0@ and each of them executes `runWorker`.
spawnWorkers :: forall m a. MonadUnliftIO m => Jobs m a -> Comp -> m [ThreadId]
spawnWorkers jobs@Jobs {jobsNumWorkers} =
  \case
    Par -> spawnWorkersWith forkOnWithUnmask [1 .. jobsNumWorkers]
    ParOn ws -> spawnWorkersWith forkOnWithUnmask ws
    ParN _ -> spawnWorkersWith (\_ -> forkIOWithUnmask) [1 .. jobsNumWorkers]
    Seq -> spawnWorkersWith (\_ -> forkIOWithUnmask) [1 :: Int]
    -- \ sequential computation is suboptimal when used in this way.
  where
    -- Fork a worker for every element of the list. The element itself is handed to the
    -- forking function, which is free to ignore it.
    spawnWorkersWith ::
         MonadUnliftIO m
      => (Int -> ((forall c. IO c -> IO c) -> IO ()) -> IO ThreadId)
      -> [Int]
      -> m [ThreadId]
    spawnWorkersWith fork ws =
      withRunInIO $ \run ->
        forM (zip [0 ..] ws) $ \(wId, on) ->
          fork on $ \unmask -> runWorker run unmask wId jobs
{-# INLINEABLE spawnWorkers #-}

-- | Throw a `WorkerTerminateException`, wrapped in `SomeAsyncException`, to every thread
-- in the list, in order.
terminateWorkers :: [ThreadId] -> IO ()
terminateWorkers = traverse_ (`throwTo` SomeAsyncException WorkerTerminateException)

-- | Conversion to a list. Elements are expected to be in the original LIFO order, so
-- calling `reverse` is still necessary for getting the results in FIFO order.
--
-- For `FinishedEarly` the early value is placed in front of the other results, while for
-- `FinishedEarlyWith` it is the only element.
resultsToList :: Results a -> [a]
resultsToList = \case
  Finished rs -> rs
  FinishedEarly rs r -> r : rs
  FinishedEarlyWith r -> [r]
{-# INLINEABLE resultsToList #-}


-- | Reverse the list of accumulated results carried by 'Finished' and
-- 'FinishedEarly'; the early result of 'FinishedEarly' is kept as is. Any other
-- value is returned untouched.
reverseResults :: Results a -> Results a
reverseResults results =
  case results of
    Finished collected -> Finished (reverse collected)
    FinishedEarly collected early -> FinishedEarly (reverse collected) early
    _ -> results
{-# INLINEABLE reverseResults #-}



-- Copies from unliftio

-- | Check whether an exception is synchronous: 'True' unless the exception,
-- once converted with 'toException', can be recovered as a 'SomeAsyncException'.
isSyncException :: Exception e => e -> Bool
isSyncException exc = not (isAsync (fromException (toException exc)))
  where
    -- An exception counts as asynchronous exactly when it is (or is wrapped
    -- in) 'SomeAsyncException'.
    isAsync :: Maybe SomeAsyncException -> Bool
    isAsync (Just (SomeAsyncException _)) = True
    isAsync Nothing = False

-- | Acquire a resource, use it, and run the cleanup action only if the usage
-- fails with an exception.
--
-- * @before@ (acquisition) runs inside 'mask'.
-- * @thing@ (usage) runs with the masking state that was in effect on entry.
-- * @after@ (cleanup) runs inside 'uninterruptibleMask_', and only when
--   @thing@ threw. Any exception raised by the cleanup itself is discarded and
--   the original exception is rethrown.
--
-- When @thing@ succeeds its result is returned and @after@ is not run.
safeBracketOnError :: MonadUnliftIO m => m a -> (a -> m b) -> (a -> m c) -> m c
safeBracketOnError before after thing =
  withRunInIO $ \runInIO ->
    mask $ \unmask -> do
      resource <- runInIO before
      outcome <- try (unmask (runInIO (thing resource)))
      case outcome of
        Right result -> return result
        Left (failure :: SomeException) -> do
          -- Exceptions from the cleanup are deliberately ignored so that the
          -- original failure is the one that propagates.
          _ :: Either SomeException b <-
            try (uninterruptibleMask_ (runInIO (after resource)))
          throwIO failure