{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MonoLocalBinds #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
-- |
-- Module      : Control.Scheduler
-- Copyright   : (c) Alexey Kuleshevich 2018-2021
-- License     : BSD3
-- Maintainer  : Alexey Kuleshevich <lehins@yandex.ru>
-- Stability   : experimental
-- Portability : non-portable
--
module Control.Scheduler
  ( -- * Scheduler
    Scheduler
  , SchedulerWS
  , Results(..)
    -- ** Regular
  , withScheduler
  , withScheduler_
  , withSchedulerR
    -- ** Stateful workers
  , withSchedulerWS
  , withSchedulerWS_
  , withSchedulerWSR
  , unwrapSchedulerWS
    -- ** Trivial (no parallelism)
  , trivialScheduler_
  , withTrivialScheduler
  , withTrivialSchedulerR
  -- * Scheduling computation
  , scheduleWork
  , scheduleWork_
  , scheduleWorkId
  , scheduleWorkId_
  , scheduleWorkState
  , scheduleWorkState_
  , replicateWork
  , replicateWork_
  -- * Batches
  , Batch
  , runBatch
  , runBatch_
  , runBatchR
  , cancelBatch
  , cancelBatch_
  , cancelBatchWith
  , hasBatchFinished
  , getCurrentBatch
  -- * Early termination
  , terminate
  , terminate_
  , terminateWith
  -- * Workers
  , WorkerId(..)
  , WorkerStates
  , numWorkers
  , workerStatesComp
  , initWorkerStates
  -- * Computation strategies
  , Comp(..)
  , getCompWorkers
  -- * Useful functions
  , replicateConcurrently
  , replicateConcurrently_
  , traverseConcurrently
  , traverseConcurrently_
  , traverse_
  -- * Exceptions
  -- $exceptions
  , MutexException(..)
  ) where

import Control.Monad
import Control.Monad.ST
import Control.Monad.IO.Unlift
import Control.Monad.Primitive
import Control.Scheduler.Computation
import Control.Scheduler.Internal
import Control.Scheduler.Types
import Control.Scheduler.Queue
import qualified Data.Foldable as F (traverse_, toList)
import Data.Primitive.SmallArray
import Data.Traversable


-- | Get the underlying `Scheduler`, which cannot access `WorkerStates`.
--
-- @since 1.4.0
unwrapSchedulerWS :: SchedulerWS ws a -> Scheduler RealWorld a
unwrapSchedulerWS = _getScheduler


-- | Get the computation strategy the states were initialized with.
--
-- @since 1.4.0
workerStatesComp :: WorkerStates ws -> Comp
workerStatesComp = _workerStatesComp


-- | Run a scheduler with stateful workers. Throws `MutexException` if an attempt is made
-- to concurrently use the same `WorkerStates` with another `SchedulerWS`.
--
-- ==== __Examples__
--
-- A good example of using stateful workers would be generation of random numbers
-- in parallel. A lot of times random number generators are not thread safe, so
-- we can work around this problem with a separate stateful generator for
-- each of the workers.
--
-- >>> import Control.Monad as M ((>=>), replicateM)
-- >>> import Control.Concurrent (yield, threadDelay)
-- >>> import Data.List (sort)
-- >>> -- ^ Above imports are used to make sure output is deterministic, which is needed for doctest
-- >>> import System.Random.MWC as MWC
-- >>> import Data.Vector.Unboxed as V (singleton)
-- >>> states <- initWorkerStates (ParN 4) (MWC.initialize . V.singleton . fromIntegral . getWorkerId)
-- >>> let scheduleGen scheduler = scheduleWorkState scheduler (MWC.uniform >=> \r -> yield >> threadDelay 200000 >> pure r)
-- >>> sort <$> withSchedulerWS states (M.replicateM 4 . scheduleGen) :: IO [Double]
-- [0.21734983682025255,0.5000843862105709,0.5759825622603018,0.8587171114177893]
-- >>> sort <$> withSchedulerWS states (M.replicateM 4 . scheduleGen) :: IO [Double]
-- [2.3598617298033475e-2,9.949679290089553e-2,0.38223134248645885,0.7408640677124702]
--
-- In the above example we use four different random number generators from
-- [`mwc-random`](https://www.stackage.org/package/mwc-random) in order to generate 4
-- numbers, all in separate threads. The subsequent call to the `withSchedulerWS` function
-- with the same @states@ is allowed to reuse the same generators, thus avoiding expensive
-- initialization.
--
-- /Side note/ - The example presented was crafted with slight trickery in order to
-- guarantee that the output is deterministic, so if you run instructions exactly the same
-- way in GHCI you will get the exact same output. Non-determinism comes from thread
-- scheduling, rather than from random number generator, because we use exactly the same
-- seed for each worker, but workers run concurrently. Exact output is not really needed,
-- except for the doctests to pass.
--
-- @since 1.4.0
withSchedulerWS ::
     MonadUnliftIO m => WorkerStates ws -> (SchedulerWS ws a -> m b) -> m [a]
withSchedulerWS = withSchedulerWSInternal withScheduler

-- | Run a scheduler with stateful workers, while discarding computation results.
--
-- @since 1.4.0
withSchedulerWS_ ::
     MonadUnliftIO m => WorkerStates ws -> (SchedulerWS ws () -> m b) -> m ()
withSchedulerWS_ = withSchedulerWSInternal withScheduler_

-- | Same as `withSchedulerWS`, except instead of a list it produces `Results`, which
-- allows for distinguishing between the ways computation was terminated.
--
-- @since 1.4.2
withSchedulerWSR ::
     MonadUnliftIO m
  => WorkerStates ws
  -> (SchedulerWS ws a -> m b)
  -> m (Results a)
withSchedulerWSR = withSchedulerWSInternal withSchedulerR


-- | Schedule a job that will get a worker state passed as an argument.
--
-- The state handed to the job is the element of the `WorkerStates` array found at the
-- index of the `WorkerId` that picks the job up.
--
-- @since 1.4.0
scheduleWorkState :: MonadPrimBase RealWorld m => SchedulerWS ws a -> (ws -> m a) -> m ()
scheduleWorkState schedulerS withState =
  scheduleWorkId (_getScheduler schedulerS) $ \(WorkerId i) ->
    withState (indexSmallArray (_workerStatesArray (_workerStates schedulerS)) i)

-- | Same as `scheduleWorkState`, but don't keep the result of computation.
--
-- @since 1.4.0
scheduleWorkState_ :: MonadPrimBase RealWorld m => SchedulerWS ws () -> (ws -> m ()) -> m ()
scheduleWorkState_ schedulerS withState =
  scheduleWorkId_ (_getScheduler schedulerS) $ \(WorkerId i) ->
    withState (indexSmallArray (_workerStatesArray (_workerStates schedulerS)) i)


-- | Get the number of workers. Will mainly depend on the computation strategy and/or number of
-- capabilities you have. Related function is `getCompWorkers`.
--
-- @since 1.0.0
numWorkers :: Scheduler s a -> Int
numWorkers = _numWorkers


-- | Schedule an action to be picked up and computed by a worker from a pool of
-- jobs. Argument supplied to the job will be the id of the worker doing the job. This is
-- useful for identification of a thread that will be doing the work, since there is
-- one-to-one mapping from `Control.Concurrent.ThreadId` to `WorkerId` for a particular
-- scheduler.
--
-- @since 1.2.0
scheduleWorkId :: MonadPrimBase s m => Scheduler s a -> (WorkerId -> m a) -> m ()
scheduleWorkId s f = stToPrim (_scheduleWorkId s (primToPrim . f))

-- | As soon as possible try to terminate any computation that is being performed by all
-- workers managed by this scheduler and collect whatever results have been computed, with
-- the supplied element guaranteed to be the last one. In case when `Results` type is
-- returned this function will cause the scheduler to produce `FinishedEarly`.
--
-- /Important/ - With `Seq` strategy this will not stop other scheduled tasks from being computed,
-- although it will make sure their results are discarded.
--
-- @since 1.1.0
terminate :: MonadPrim s m => Scheduler s a -> a -> m a
terminate scheduler a = stToPrim (_terminate scheduler (Early a))

-- | Same as `terminate`, but returning a single element list containing the supplied
-- argument. This can be very useful for parallel search algorithms. In case when
-- `Results` is the return type this function will cause the scheduler to produce
-- `FinishedEarlyWith`.
--
-- /Important/ - Same as with `terminate`, when `Seq` strategy is used, this will not prevent
-- computation from continuing, but the scheduler will return only the result supplied to this
-- function.
--
-- @since 1.1.0
terminateWith :: MonadPrim s m => Scheduler s a -> a -> m a
terminateWith scheduler a = stToPrim $ _terminate scheduler (EarlyWith a)

-- | Schedule an action to be picked up and computed by a worker from a pool of
-- jobs. Similar to `scheduleWorkId`, except the job doesn't get the worker id.
--
-- @since 1.0.0
scheduleWork :: MonadPrimBase s m => Scheduler s a -> m a -> m ()
scheduleWork scheduler f = stToPrim $ _scheduleWorkId scheduler (const (primToPrim f))


-- FIXME: get rid of scheduleJob and decide at `scheduleWork` level if we should use Job or Job_
-- Type here should be `scheduleWork_ :: Scheduler s a -> m () -> m ()`
-- | Same as `scheduleWork`, but only for a `Scheduler` that doesn't keep the results.
--
-- @since 1.1.0
scheduleWork_ :: MonadPrimBase s m => Scheduler s () -> m () -> m ()
scheduleWork_ s = stToPrim . scheduleWork s . primToPrim

-- | Same as `scheduleWorkId`, but only for a `Scheduler` that doesn't keep the results.
--
-- @since 1.2.0
scheduleWorkId_ :: MonadPrimBase s m => Scheduler s () -> (WorkerId -> m ()) -> m ()
scheduleWorkId_ scheduler f = stToPrim $ _scheduleWorkId scheduler (primToPrim . f)

-- | Schedule the same action to run @n@ times concurrently. This differs from
-- `replicateConcurrently` by allowing the caller to use the `Scheduler` freely,
-- or to allow early termination via `terminate` across all (identical) threads.
-- To be called within a `withScheduler` block.
--
-- Nothing is scheduled when @n@ is zero or negative.
--
-- @since 2.0.0
replicateWork :: MonadPrimBase s m => Scheduler s a -> Int -> m a -> m ()
replicateWork scheduler n f = go n
  where
    -- Counts down from @n@, scheduling one copy of the action per step.
    go !k
      | k <= 0 = pure ()
      | otherwise = stToPrim (scheduleWork scheduler (primToPrim f)) *> go (k - 1)


-- | Same as `replicateWork`, but it does not retain the results of scheduled jobs.
--
-- Nothing is scheduled when @n@ is zero or negative.
--
-- @since 2.0.0
replicateWork_ :: MonadPrimBase s m => Scheduler s () -> Int -> m a -> m ()
replicateWork_ scheduler n f = go n
  where
    -- Counts down from @n@; the result of every action is dropped with `void`
    -- before it is handed to the scheduler.
    go !k
      | k <= 0 = pure ()
      | otherwise = stToPrim (scheduleWork_ scheduler (primToPrim (void f))) *> go (k - 1)

-- | Similar to `terminate`, but for a `Scheduler` that does not keep any results of computation.
--
-- /Important/ - In case of `Seq` computation strategy this function has no effect.
--
-- @since 1.1.0
terminate_ :: MonadPrim s m => Scheduler s () -> m ()
terminate_ s = stToPrim $ _terminate s (Early ())


-- | This trivial scheduler will behave in the same way as `withScheduler` with `Seq`
-- computation strategy, except it is restricted to `PrimMonad`, instead of `MonadUnliftIO`.
--
-- Implemented by running `withTrivialSchedulerR` and flattening its `Results` into a list.
--
-- @since 1.4.2
withTrivialScheduler :: MonadPrim s m => (Scheduler s a -> m b) -> m [a]
withTrivialScheduler action = F.toList <$> withTrivialSchedulerR action



-- | Map an action over each element of the `Traversable` @t@ according to the supplied computation
-- strategy.
--
-- One job is scheduled per element; the collected results are then placed back into the
-- shape of the input structure.
--
-- @since 1.0.0
traverseConcurrently :: (MonadUnliftIO m, Traversable t) => Comp -> (a -> m b) -> t a -> m (t b)
traverseConcurrently comp f xs = withRunInIO $ \run -> do
  ys <- withScheduler comp $ \s -> traverse_ (scheduleWork s . run . f) xs
  pure $ transList ys xs

-- | Replace every element of a `Traversable` structure, in traversal order, with the
-- successive elements of the supplied list, keeping the shape of the structure.
--
-- Surplus list elements are ignored. If the list runs out before the structure does, an
-- error is raised; `traverseConcurrently` relies on both having the same size.
transList :: Traversable t => [a] -> t b -> t a
transList xs' = snd . mapAccumL withR xs'
  where
    -- Pop the head of the remaining list and use it in place of the current element.
    withR (x:xs) _ = (xs, x)
    withR _      _ = errorWithoutStackTrace "Impossible<traverseConcurrently> - Mismatched sizes"

-- | Just like `traverseConcurrently`, but restricted to `Foldable` and discards the results of
-- computation.
--
-- @since 1.0.0
traverseConcurrently_ :: (MonadUnliftIO m, Foldable t) => Comp -> (a -> m b) -> t a -> m ()
traverseConcurrently_ comp f xs =
  withRunInIO $ \run ->
    -- The traversal that submits one job per element is itself submitted as a job.
    withScheduler_ comp $ \s -> scheduleWork s $ F.traverse_ (scheduleWork s . void . run . f) xs

-- | Replicate an action @n@ times and schedule them according to the supplied computation
-- strategy.
--
-- @since 1.1.0
replicateConcurrently :: MonadUnliftIO m => Comp -> Int -> m a -> m [a]
replicateConcurrently comp n f =
  withRunInIO $ \run ->
    withScheduler comp $ \s -> replicateM_ n $ scheduleWork s (run f)

-- | Just like `replicateConcurrently`, but discards the results of computation.
--
-- @since 1.1.0
replicateConcurrently_ :: MonadUnliftIO m => Comp -> Int -> m a -> m ()
replicateConcurrently_ comp n f =
  withRunInIO $ \run -> do
    -- The loop that submits the @n@ jobs is itself submitted as a job.
    withScheduler_ comp $ \s -> scheduleWork s $ replicateM_ n (scheduleWork s $ void $ run f)




-- | Initialize a scheduler and submit jobs that will be computed sequentially or in parallel,
-- which is determined by the `Comp`utation strategy.
--
-- Here are some cool properties about the `withScheduler`:
--
-- * This function will block until all of the submitted jobs have finished or at least one of them
--   resulted in an exception, which will be re-thrown at the callsite.
--
-- * It is totally fine for nested jobs to submit more jobs for the same or other scheduler
--
-- * It is ok to initialize multiple schedulers at the same time, although that will likely result
--   in suboptimal performance, unless workers are pinned to different capabilities.
--
-- * __Warning__ It is pretty dangerous to schedule jobs that can block, because it might
--   lead to a potential deadlock, if you are not careful. Consider this example. First
--   execution works fine, since there are two scheduled workers, and one can unblock the
--   other, but the second scenario immediately results in a deadlock.
--
-- >>> withScheduler (ParOn [1,2]) $ \s -> newEmptyMVar >>= (\ mv -> scheduleWork s (readMVar mv) >> scheduleWork s (putMVar mv ()))
-- [(),()]
-- >>> import System.Timeout
-- >>> timeout 1000000 $ withScheduler (ParOn [1]) $ \s -> newEmptyMVar >>= (\ mv -> scheduleWork s (readMVar mv) >> scheduleWork s (putMVar mv ()))
-- Nothing
--
-- __Important__: In order to get work done truly in parallel, program needs to be compiled with
-- @-threaded@ GHC flag and executed with @+RTS -N -RTS@ to use all available cores.
--
-- @since 1.0.0
withScheduler ::
     MonadUnliftIO m
  => Comp -- ^ Computation strategy
  -> (Scheduler RealWorld a -> m b)
     -- ^ Action that will be scheduling all the work.
  -> m [a]
-- `Seq` is served by the trivial scheduler rather than the general one.
withScheduler Seq f =
  withRunInIO $ \run -> do
    reverse . resultsToList <$> withTrivialSchedulerRIO (run . f)
withScheduler comp f =
  withRunInIO $ \run -> do
    reverse . resultsToList <$> withSchedulerInternal comp scheduleJobs readResults (run . f)
{-# INLINE withScheduler #-}

-- | Same as `withScheduler`, except instead of a list it produces `Results`, which allows
-- for distinguishing between the ways computation was terminated.
--
-- @since 1.4.2
withSchedulerR ::
     MonadUnliftIO m
  => Comp -- ^ Computation strategy
  -> (Scheduler RealWorld a -> m b)
     -- ^ Action that will be scheduling all the work.
  -> m (Results a)
-- `Seq` is served by the trivial scheduler rather than the general one.
withSchedulerR Seq f =
  withRunInIO $ \run -> do
    reverseResults <$> withTrivialSchedulerRIO (run . f)
withSchedulerR comp f =
  withRunInIO $ \run -> do
    reverseResults <$> withSchedulerInternal comp scheduleJobs readResults (run . f)
{-# INLINE withSchedulerR #-}


-- | Same as `withScheduler`, but discards results of submitted jobs.
--
-- @since 1.0.0
withScheduler_ ::
     MonadUnliftIO m
  => Comp -- ^ Computation strategy
  -> (Scheduler RealWorld a -> m b)
     -- ^ Action that will be scheduling all the work.
  -> m ()
-- `Seq` is served by the trivial scheduler rather than the general one.
withScheduler_ Seq f =
  withRunInIO $ \run -> do
    void $ withTrivialSchedulerRIO (run . f)
withScheduler_ comp f =
  withRunInIO $ \run -> do
    -- Jobs are submitted with `scheduleJobs_` and the result reader always yields an
    -- empty list, so no results are gathered.
    void $ withSchedulerInternal comp scheduleJobs_ (const (pure [])) (run . f)
{-# INLINE withScheduler_ #-}



-- | Check if the supplied batch has already finished.
--
-- Runs the `batchHasFinished` action stored in the `Batch` and lifts it from `ST` into
-- the calling monad.
--
-- @since 1.5.0
hasBatchFinished :: MonadPrim s m => Batch s a -> m Bool
hasBatchFinished = stToPrim . batchHasFinished
{-# INLINE hasBatchFinished #-}


-- | Cancel batch with supplied identifier, which will lead to scheduler to return
-- `FinishedEarly` result. This is an idempotent operation and has no effect if currently
-- running batch does not match supplied identifier. Returns `False` when cancelling did
-- not succeed due to mismatched identifier or does not return at all since all jobs get
-- cancelled immediately. For trivial schedulers however there is no way to perform
-- concurrent cancellation and it will return `True`.
--
-- The supplied value is passed on to the `batchCancel` action stored in the `Batch`.
--
-- @since 1.5.0
cancelBatch :: MonadPrim s m => Batch s a -> a -> m Bool
cancelBatch b = stToPrim . batchCancel b
{-# INLINE cancelBatch #-}

-- | Same as `cancelBatch`, but only works with schedulers that don't care about results
--
-- The batch is cancelled with @()@ as the supplied value.
--
-- @since 1.5.0
cancelBatch_ :: MonadPrim s m => Batch s () -> m Bool
cancelBatch_ b = stToPrim $ batchCancel b ()
{-# INLINE cancelBatch_ #-}

-- | Same as `cancelBatch`, but the result of computation will be set to `FinishedEarlyWith`
--
-- The supplied value is passed on to the `batchCancelWith` action stored in the `Batch`.
--
-- @since 1.5.0
cancelBatchWith :: MonadPrim s m => Batch s a -> a -> m Bool
cancelBatchWith b = stToPrim . batchCancelWith b
{-# INLINE cancelBatchWith #-}


-- | This function gives a way to get access to the main batch that started implicitly.
--
-- The identifier of the current batch is read once, at the time of the call. Both cancel
-- actions of the returned `Batch` are tied to that identifier, and `batchHasFinished`
-- reports `True` as soon as the scheduler's current batch identifier differs from it.
--
-- @since 1.5.0
getCurrentBatch ::
     MonadPrim s m => Scheduler s a -> m (Batch s a)
getCurrentBatch scheduler = stToPrim $ do
  batchId <- _currentBatchId scheduler
  pure $ Batch
    { batchCancel = _cancelBatch scheduler batchId . Early
    , batchCancelWith = _cancelBatch scheduler batchId . EarlyWith
      -- Re-read the identifier on every check and compare it to the captured one.
    , batchHasFinished = (batchId /=) <$> _currentBatchId scheduler
    }
{-# INLINE getCurrentBatch #-}


-- | Run a single batch of jobs. Supplied action will not return until all jobs placed on
-- the queue are done or the whole batch is cancelled with one of these `cancelBatch`,
-- `cancelBatch_` or `cancelBatchWith`.
--
-- It waits for all scheduled jobs to finish and collects the computed results into a
-- list. It is a blocking operation, but if there are no jobs in progress it will return
-- immediately. It is safe to continue using the supplied scheduler after this function
-- returns. However, if any of the jobs resulted in an exception it will be rethrown by this
-- function, which, unless caught, will further put the scheduler in a terminated state.
--
-- It is important to note that any job that hasn't had its results collected from the
-- scheduler prior to starting the batch will end up on the batch result list.
--
-- @since 1.5.0
runBatch :: MonadPrimBase s m => Scheduler s a -> (Batch s a -> m c) -> m [a]
runBatch scheduler f = stToPrim $ do
  -- Run the supplied action on the current batch; its own result is discarded.
  _ <- primToPrim . f =<< getCurrentBatch scheduler
  -- The collected results are converted to a list and then reversed.
  reverse . resultsToList <$> _waitForCurrentBatch scheduler
{-# INLINE runBatch #-}

-- | Same as `runBatch`, except it ignores results of computation
--
-- It still waits for the current batch to complete before returning.
--
-- @since 1.5.0
runBatch_ ::
     MonadPrimBase s m => Scheduler s () -> (Batch s () -> m c) -> m ()
runBatch_ scheduler f = stToPrim $ do
  -- Run the supplied action on the current batch; its own result is discarded.
  _ <- primToPrim . f =<< getCurrentBatch scheduler
  void (_waitForCurrentBatch scheduler)
{-# INLINE runBatch_ #-}


-- | Same as `runBatch`, except it produces `Results` instead of a list.
--
-- The `Results` obtained from waiting on the current batch are passed through
-- `reverseResults` before being returned.
--
-- @since 1.5.0
runBatchR ::
     MonadPrimBase s m => Scheduler s a -> (Batch s a -> m c) -> m (Results a)
runBatchR scheduler f = stToPrim $ do
  -- Run the supplied action on the current batch; its own result is discarded.
  _ <- primToPrim . f =<< getCurrentBatch scheduler
  reverseResults <$> _waitForCurrentBatch scheduler
{-# INLINE runBatchR #-}

{- $setup

>>> import Control.Exception
>>> import Control.Concurrent
>>> import Control.Concurrent.MVar

-}


{- $exceptions

If any one of the workers dies with an exception, even if that exception is asynchronous, it will be
re-thrown in the scheduling thread.


>>> let didAWorkerDie = handleJust asyncExceptionFromException (return . (== ThreadKilled)) . fmap or
>>> :t didAWorkerDie
didAWorkerDie :: Foldable t => IO (t Bool) -> IO Bool
>>> didAWorkerDie $ withScheduler Par $ \ s -> scheduleWork s $ pure False
False
>>> didAWorkerDie $ withScheduler Par $ \ s -> scheduleWork s $ myThreadId >>= killThread >> pure False
True
>>> withScheduler Par $ \ s -> scheduleWork s $ myThreadId >>= killThread >> pure False
*** Exception: thread killed

-}