{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}
-- |
-- Module      : Control.Scheduler
-- Copyright   : (c) Alexey Kuleshevich 2018-2020
-- License     : BSD3
-- Maintainer  : Alexey Kuleshevich <lehins@yandex.ru>
-- Stability   : experimental
-- Portability : non-portable
--
module Control.Scheduler
  ( -- * Scheduler
    Scheduler
  , SchedulerWS
  , Results(..)
    -- ** Regular
  , withScheduler
  , withScheduler_
  , withSchedulerR
    -- ** Stateful workers
  , withSchedulerWS
  , withSchedulerWS_
  , withSchedulerWSR
  , unwrapSchedulerWS
    -- ** Trivial (no parallelism)
  , trivialScheduler_
  , withTrivialScheduler
  , withTrivialSchedulerR
  -- * Scheduling computation
  , scheduleWork
  , scheduleWork_
  , scheduleWorkId
  , scheduleWorkId_
  , scheduleWorkState
  , scheduleWorkState_
  , replicateWork
  -- * Batches
  , Batch
  , runBatch
  , runBatch_
  , runBatchR
  , cancelBatch
  , cancelBatch_
  , cancelBatchWith
  , hasBatchFinished
  , getCurrentBatch
  -- * Early termination
  , terminate
  , terminate_
  , terminateWith
  -- * Workers
  , WorkerId(..)
  , WorkerStates
  , numWorkers
  , workerStatesComp
  , initWorkerStates
  -- * Computation strategies
  , Comp(..)
  , getCompWorkers
  -- * Useful functions
  , replicateConcurrently
  , replicateConcurrently_
  , traverseConcurrently
  , traverseConcurrently_
  , traverse_
  -- * Exceptions
  -- $exceptions
  , MutexException(..)
  ) where

import Control.Monad
import Control.Monad.IO.Unlift
import Control.Monad.Primitive (PrimMonad)
import Control.Scheduler.Computation
import Control.Scheduler.Internal
import Control.Scheduler.Types
import Control.Scheduler.Queue
import qualified Data.Foldable as F (traverse_, toList)
import Data.Primitive.SmallArray
import Data.Traversable


-- | Get the underlying `Scheduler`, which cannot access `WorkerStates`.
--
-- @since 1.4.0
unwrapSchedulerWS :: SchedulerWS s m a -> Scheduler m a
unwrapSchedulerWS = _getScheduler


-- | Get the computation strategy the states were initialized with.
--
-- @since 1.4.0
workerStatesComp :: WorkerStates s -> Comp
workerStatesComp = _workerStatesComp


-- | Run a scheduler with stateful workers. Throws `MutexException` if an attempt is made
-- to concurrently use the same `WorkerStates` with another `SchedulerWS`.
--
-- ==== __Examples__
--
-- A good example of using stateful workers would be generation of random numbers in
-- parallel. A lot of times random number generators are not going to be thread safe, so we
-- can work around this problem by using a separate stateful generator for each of the
-- workers.
--
-- >>> import Control.Monad as M ((>=>), replicateM)
-- >>> import Control.Concurrent (yield, threadDelay)
-- >>> import Data.List (sort)
-- >>> -- ^ Above imports are used to make sure output is deterministic, which is needed for doctest
-- >>> import System.Random.MWC as MWC
-- >>> import Data.Vector.Unboxed as V (singleton)
-- >>> states <- initWorkerStates (ParN 4) (MWC.initialize . V.singleton . fromIntegral . getWorkerId)
-- >>> let scheduleGen scheduler = scheduleWorkState scheduler (MWC.uniform >=> \r -> yield >> threadDelay 200000 >> pure r)
-- >>> sort <$> withSchedulerWS states (M.replicateM 4 . scheduleGen) :: IO [Double]
-- [0.21734983682025255,0.5000843862105709,0.5759825622603018,0.8587171114177893]
-- >>> sort <$> withSchedulerWS states (M.replicateM 4 . scheduleGen) :: IO [Double]
-- [2.3598617298033475e-2,9.949679290089553e-2,0.38223134248645885,0.7408640677124702]
--
-- In the above example we use four different random number generators from
-- [`mwc-random`](https://www.stackage.org/package/mwc-random) in order to generate 4
-- numbers, all in separate threads. The subsequent call to the `withSchedulerWS` function
-- with the same @states@ is allowed to reuse the same generators, thus avoiding expensive
-- initialization.
--
-- /Side note/ - The example presented was crafted with slight trickery in order to
-- guarantee that the output is deterministic, so if you run instructions exactly the same
-- way in GHCI you will get the exact same output. Non-determinism comes from thread
-- scheduling, rather than from random number generator, because we use exactly the same
-- seed for each worker, but workers run concurrently. Exact output is not really needed,
-- except for the doctests to pass.
--
-- @since 1.4.0
withSchedulerWS :: MonadUnliftIO m => WorkerStates s -> (SchedulerWS s m a -> m b) -> m [a]
withSchedulerWS = withSchedulerWSInternal withScheduler

-- | Run a scheduler with stateful workers, while discarding computation results.
--
-- @since 1.4.0
withSchedulerWS_ :: MonadUnliftIO m => WorkerStates s -> (SchedulerWS s m () -> m b) -> m ()
withSchedulerWS_ = withSchedulerWSInternal withScheduler_

-- | Same as `withSchedulerWS`, except instead of a list it produces `Results`, which
-- allows for distinguishing between the ways computation was terminated.
--
-- @since 1.4.2
withSchedulerWSR :: MonadUnliftIO m => WorkerStates s -> (SchedulerWS s m a -> m b) -> m (Results a)
withSchedulerWSR = withSchedulerWSInternal withSchedulerR


-- | Schedule a job that will get a worker state passed as an argument. The state handed
-- to the job is the one stored at the index of the worker that picks the job up.
--
-- @since 1.4.0
scheduleWorkState :: SchedulerWS s m a -> (s -> m a) -> m ()
scheduleWorkState schedulerS withState =
  scheduleWorkId (_getScheduler schedulerS) $ \(WorkerId i) ->
    -- The worker id doubles as the index into the array of per-worker states.
    withState (indexSmallArray (_workerStatesArray (_workerStates schedulerS)) i)

-- | Same as `scheduleWorkState`, but don't keep the result of computation.
--
-- @since 1.4.0
scheduleWorkState_ :: SchedulerWS s m () -> (s -> m ()) -> m ()
scheduleWorkState_ schedulerS withState =
  scheduleWorkId_ (_getScheduler schedulerS) $ \(WorkerId i) ->
    -- The worker id doubles as the index into the array of per-worker states.
    withState (indexSmallArray (_workerStatesArray (_workerStates schedulerS)) i)


-- | Get the number of workers. Will mainly depend on the computation strategy and/or number of
-- capabilities you have. Related function is `getCompWorkers`.
--
-- @since 1.0.0
numWorkers :: Scheduler m a -> Int
numWorkers = _numWorkers


-- | Schedule an action to be picked up and computed by a worker from a pool of
-- jobs. Argument supplied to the job will be the id of the worker doing the job. This is
-- useful for identification of a thread that will be doing the work, since there is
-- one-to-one mapping from `Control.Concurrent.ThreadId` to `WorkerId` for a particular
-- scheduler.
--
-- @since 1.2.0
scheduleWorkId :: Scheduler m a -> (WorkerId -> m a) -> m ()
scheduleWorkId = _scheduleWorkId

-- | As soon as possible try to terminate any computation that is being performed by all
-- workers managed by this scheduler and collect whatever results have been computed, with
-- supplied element guaranteed to be the last one. In case when `Results` type is
-- returned this function will cause the scheduler to produce `FinishedEarly`.
--
-- /Important/ - With `Seq` strategy this will not stop other scheduled tasks from being computed,
-- although it will make sure their results are discarded.
--
-- @since 1.1.0
terminate :: Scheduler m a -> a -> m a
terminate scheduler a = _terminate scheduler (Early a)

-- | Same as `terminate`, but returning a single element list containing the supplied
-- argument. This can be very useful for parallel search algorithms. In case when
-- `Results` is the return type this function will cause the scheduler to produce
-- `FinishedEarlyWith`.
--
-- /Important/ - Same as with `terminate`, when `Seq` strategy is used, this will not prevent
-- computation from continuing, but the scheduler will return only the result supplied to this
-- function.
--
-- @since 1.1.0
terminateWith :: Scheduler m a -> a -> m a
terminateWith scheduler a = _terminate scheduler (EarlyWith a)

-- | Schedule an action to be picked up and computed by a worker from a pool of
-- jobs. Similar to `scheduleWorkId`, except the job doesn't get the worker id.
--
-- @since 1.0.0
scheduleWork :: Scheduler m a -> m a -> m ()
scheduleWork scheduler f = _scheduleWorkId scheduler (const f)


-- FIXME: get rid of scheduleJob and decide at `scheduleWork` level if we should use Job or Job_
-- Type here should be `scheduleWork_ :: Scheduler m a -> m () -> m ()`
-- | Same as `scheduleWork`, but only for a `Scheduler` that doesn't keep the results.
--
-- @since 1.1.0
scheduleWork_ :: Scheduler m () -> m () -> m ()
scheduleWork_ = scheduleWork

-- | Same as `scheduleWorkId`, but only for a `Scheduler` that doesn't keep the results.
--
-- @since 1.2.0
scheduleWorkId_ :: Scheduler m () -> (WorkerId -> m ()) -> m ()
scheduleWorkId_ = _scheduleWorkId

-- | Schedule the same action to run @n@ times concurrently. This differs from
-- `replicateConcurrently` by allowing the caller to use the `Scheduler` freely,
-- or to allow early termination via `terminate` across all (identical) threads.
-- To be called within a `withScheduler` block.
--
-- Nothing is scheduled when @n@ is zero or negative.
--
-- @since 1.4.1
replicateWork :: Applicative m => Int -> Scheduler m a -> m a -> m ()
replicateWork !n scheduler f = go n
  where
    -- Count down from @n@, scheduling one copy of the action per step.
    go !k
      | k <= 0 = pure ()
      | otherwise = scheduleWork scheduler f *> go (k - 1)

-- | Similar to `terminate`, but for a `Scheduler` that does not keep any results of computation.
--
-- /Important/ - In case of `Seq` computation strategy this function has no effect.
--
-- @since 1.1.0
terminate_ :: Scheduler m () -> m ()
terminate_ = (`_terminate` Early ())


-- | This trivial scheduler will behave in the same way as `withScheduler` with `Seq`
-- computation strategy, except it is restricted to `PrimMonad`, instead of `MonadUnliftIO`.
--
-- @since 1.4.2
withTrivialScheduler :: PrimMonad m => (Scheduler m a -> m b) -> m [a]
withTrivialScheduler action = F.toList <$> withTrivialSchedulerR action



-- | Map an action over each element of the `Traversable` @t@ according to the supplied computation
-- strategy.
--
-- @since 1.0.0
traverseConcurrently :: (MonadUnliftIO m, Traversable t) => Comp -> (a -> m b) -> t a -> m (t b)
traverseConcurrently comp f xs = do
  ys <- withScheduler comp $ \s -> traverse_ (scheduleWork s . f) xs
  -- Results come back as a flat list; put them back into the shape of the input.
  pure $ transList ys xs

-- Replace every element of the traversable structure, in left-to-right traversal order,
-- with the consecutive elements of the supplied list, keeping the shape of the structure.
-- Surplus list elements are ignored. Raises an error (without a stack trace) if the list
-- runs out before the structure has been fully traversed.
transList :: Traversable t => [a] -> t b -> t a
transList xs' = snd . mapAccumL withR xs'
  where
    -- The accumulator is the list of replacements that have not been used yet.
    withR (x:xs) _ = (xs, x)
    withR _      _ = errorWithoutStackTrace "Impossible<traverseConcurrently> - Mismatched sizes"

-- | Just like `traverseConcurrently`, but restricted to `Foldable` and discards the results of
-- computation.
--
-- @since 1.0.0
traverseConcurrently_ :: (MonadUnliftIO m, Foldable t) => Comp -> (a -> m b) -> t a -> m ()
traverseConcurrently_ comp f xs =
  -- The traversal that submits the individual jobs is itself scheduled as a job.
  withScheduler_ comp $ \s -> scheduleWork s $ F.traverse_ (scheduleWork s . void . f) xs

-- | Replicate an action @n@ times and schedule them according to the supplied computation
-- strategy.
--
-- @since 1.1.0
replicateConcurrently :: MonadUnliftIO m => Comp -> Int -> m a -> m [a]
replicateConcurrently comp n f =
  withScheduler comp $ \s -> replicateM_ n $ scheduleWork s f

-- | Just like `replicateConcurrently`, but discards the results of computation.
--
-- @since 1.1.0
replicateConcurrently_ :: MonadUnliftIO m => Comp -> Int -> m a -> m ()
replicateConcurrently_ comp n f =
  -- The loop that submits the @n@ jobs is itself scheduled as a job.
  withScheduler_ comp $ \s -> scheduleWork s $ replicateM_ n (scheduleWork s $ void f)




-- | Initialize a scheduler and submit jobs that will be computed sequentially or in parallel,
-- which is determined by the `Comp`utation strategy.
--
-- Here are some cool properties about the `withScheduler`:
--
-- * This function will block until all of the submitted jobs have finished or at least one of them
--   resulted in an exception, which will be re-thrown at the callsite.
--
-- * It is totally fine for nested jobs to submit more jobs for the same or other scheduler
--
-- * It is ok to initialize multiple schedulers at the same time, although that will likely result
--   in suboptimal performance, unless workers are pinned to different capabilities.
--
-- * __Warning__ It is pretty dangerous to schedule jobs that can block, because it might
--   lead to a potential deadlock, if you are not careful. Consider this example. First
--   execution works fine, since there are two scheduled workers, and one can unblock the
--   other, but the second scenario immediately results in a deadlock.
--
-- >>> withScheduler (ParOn [1,2]) $ \s -> newEmptyMVar >>= (\ mv -> scheduleWork s (readMVar mv) >> scheduleWork s (putMVar mv ()))
-- [(),()]
-- >>> import System.Timeout
-- >>> timeout 1000000 $ withScheduler (ParOn [1]) $ \s -> newEmptyMVar >>= (\ mv -> scheduleWork s (readMVar mv) >> scheduleWork s (putMVar mv ()))
-- Nothing
--
-- __Important__: In order to get work done truly in parallel, program needs to be compiled with
-- @-threaded@ GHC flag and executed with @+RTS -N -RTS@ to use all available cores.
--
-- @since 1.0.0
withScheduler ::
     MonadUnliftIO m
  => Comp -- ^ Computation strategy
  -> (Scheduler m a -> m b)
     -- ^ Action that will be scheduling all the work.
  -> m [a]
withScheduler Seq = fmap (reverse . resultsToList) . withTrivialSchedulerRIO
withScheduler comp =
  fmap (reverse . resultsToList) . withSchedulerInternal comp scheduleJobs readResults
{-# INLINE withScheduler #-}

-- | Same as `withScheduler`, except instead of a list it produces `Results`, which allows
-- for distinguishing between the ways computation was terminated.
--
-- @since 1.4.2
withSchedulerR ::
     MonadUnliftIO m
  => Comp -- ^ Computation strategy
  -> (Scheduler m a -> m b)
     -- ^ Action that will be scheduling all the work.
  -> m (Results a)
withSchedulerR Seq = fmap reverseResults . withTrivialSchedulerRIO
withSchedulerR comp =
  fmap reverseResults . withSchedulerInternal comp scheduleJobs readResults
{-# INLINE withSchedulerR #-}


-- | Same as `withScheduler`, but discards results of submitted jobs.
--
-- @since 1.0.0
withScheduler_ ::
     MonadUnliftIO m
  => Comp -- ^ Computation strategy
  -> (Scheduler m a -> m b)
     -- ^ Action that will be scheduling all the work.
  -> m ()
withScheduler_ Seq = void . withTrivialSchedulerRIO
withScheduler_ comp =
  -- Results are never read back from the queue: the collector always yields an empty list.
  void . withSchedulerInternal comp scheduleJobs_ (const (pure []))
{-# INLINE withScheduler_ #-}



-- | Check if the supplied batch has already finished.
--
-- @since 1.5.0
hasBatchFinished :: Functor m => Batch m a -> m Bool
hasBatchFinished = batchHasFinished
{-# INLINE hasBatchFinished #-}


-- | Cancel batch with supplied identifier, which will lead the scheduler to return
-- `FinishedEarly` result. This is an idempotent operation and has no effect if currently
-- running batch does not match supplied identifier. Returns `False` when cancelling did
-- not succeed due to mismatched identifier or does not return at all since all jobs get
-- cancelled immediately. For trivial schedulers however there is no way to perform
-- concurrent cancelation and it will return `True`.
--
-- @since 1.5.0
cancelBatch :: Batch m a -> a -> m Bool
cancelBatch = batchCancel
{-# INLINE cancelBatch #-}

-- | Same as `cancelBatch`, but only works with schedulers that don't care about results.
--
-- @since 1.5.0
cancelBatch_ :: Batch m () -> m Bool
cancelBatch_ b = batchCancel b ()
{-# INLINE cancelBatch_ #-}

-- | Same as `cancelBatch`, but the result of computation will be set to `FinishedEarlyWith`.
--
-- @since 1.5.0
cancelBatchWith :: Batch m a -> a -> m Bool
cancelBatchWith = batchCancelWith
{-# INLINE cancelBatchWith #-}


-- | This function gives a way to get access to the main batch that started implicitly.
--
-- @since 1.5.0
getCurrentBatch ::
     Monad m => Scheduler m a -> m (Batch m a)
getCurrentBatch scheduler = do
  batchId <- _currentBatchId scheduler
  pure $ Batch
    { batchCancel = _cancelBatch scheduler batchId . Early
    , batchCancelWith = _cancelBatch scheduler batchId . EarlyWith
      -- The batch counts as finished once the scheduler reports a different current batch id.
    , batchHasFinished = (batchId /=) <$> _currentBatchId scheduler
    }
{-# INLINE getCurrentBatch #-}


-- | Run a single batch of jobs. Supplied action will not return until all jobs placed on
-- the queue are done or the whole batch is cancelled with one of these `cancelBatch`,
-- `cancelBatch_` or `cancelBatchWith`.
--
-- It waits for all scheduled jobs to finish and collects the computed results into a
-- list. It is a blocking operation, but if there are no jobs in progress it will return
-- immediately. It is safe to continue using the supplied scheduler after this function
-- returns. However, if any of the jobs resulted in an exception it will be rethrown by this
-- function, which, unless caught, will further put the scheduler in a terminated state.
--
-- It is important to note that any job that hasn't had its results collected from the
-- scheduler prior to starting the batch will end up on the batch result list.
--
-- @since 1.5.0
runBatch ::
     Monad m => Scheduler m a -> (Batch m a -> m c) -> m [a]
runBatch scheduler f = do
  _ <- f =<< getCurrentBatch scheduler
  reverse . resultsToList <$> _waitForCurrentBatch scheduler
{-# INLINE runBatch #-}

-- | Same as `runBatch`, except it ignores results of computation.
--
-- @since 1.5.0
runBatch_ ::
     Monad m => Scheduler m () -> (Batch m () -> m c) -> m ()
runBatch_ scheduler f = do
  _ <- f =<< getCurrentBatch scheduler
  void (_waitForCurrentBatch scheduler)
{-# INLINE runBatch_ #-}


-- | Same as `runBatch`, except it produces `Results` instead of a list.
--
-- @since 1.5.0
runBatchR ::
     Monad m => Scheduler m a -> (Batch m a -> m c) -> m (Results a)
runBatchR scheduler f = do
  _ <- f =<< getCurrentBatch scheduler
  reverseResults <$> _waitForCurrentBatch scheduler
{-# INLINE runBatchR #-}

{- $setup

>>> import Control.Exception
>>> import Control.Concurrent
>>> import Control.Concurrent.MVar

-}


{- $exceptions

If any one of the workers dies with an exception, even if that exception is asynchronous, it will be
re-thrown in the scheduling thread.


>>> let didAWorkerDie = handleJust asyncExceptionFromException (return . (== ThreadKilled)) . fmap or
>>> :t didAWorkerDie
didAWorkerDie :: Foldable t => IO (t Bool) -> IO Bool
>>> didAWorkerDie $ withScheduler Par $ \ s -> scheduleWork s $ pure False
False
>>> didAWorkerDie $ withScheduler Par $ \ s -> scheduleWork s $ myThreadId >>= killThread >> pure False
True
>>> withScheduler Par $ \ s -> scheduleWork s $ myThreadId >>= killThread >> pure False
*** Exception: thread killed

-}