| Copyright | (c) Alexey Kuleshevich 2018-2019 | 
|---|---|
| License | BSD3 | 
| Maintainer | Alexey Kuleshevich <lehins@yandex.ru> | 
| Stability | experimental | 
| Portability | non-portable | 
| Safe Haskell | None | 
| Language | Haskell2010 | 
Control.Scheduler
Description
Synopsis
- data Scheduler m a
- data SchedulerWS s m a
- trivialScheduler_ :: Applicative f => Scheduler f ()
- withScheduler :: MonadUnliftIO m => Comp -> (Scheduler m a -> m b) -> m [a]
- withScheduler_ :: MonadUnliftIO m => Comp -> (Scheduler m a -> m b) -> m ()
- withSchedulerWS :: MonadUnliftIO m => WorkerStates s -> (SchedulerWS s m a -> m b) -> m [a]
- withSchedulerWS_ :: MonadUnliftIO m => WorkerStates s -> (SchedulerWS s m () -> m b) -> m ()
- unwrapSchedulerWS :: SchedulerWS s m a -> Scheduler m a
- scheduleWork :: Scheduler m a -> m a -> m ()
- scheduleWork_ :: Scheduler m () -> m () -> m ()
- scheduleWorkId :: Scheduler m a -> (WorkerId -> m a) -> m ()
- scheduleWorkId_ :: Scheduler m () -> (WorkerId -> m ()) -> m ()
- scheduleWorkState :: SchedulerWS s m a -> (s -> m a) -> m ()
- scheduleWorkState_ :: SchedulerWS s m () -> (s -> m ()) -> m ()
- terminate :: Scheduler m a -> a -> m a
- terminate_ :: Scheduler m () -> m ()
- terminateWith :: Scheduler m a -> a -> m a
- newtype WorkerId = WorkerId { getWorkerId :: Int }
- data WorkerStates s
- numWorkers :: Scheduler m a -> Int
- workerStatesComp :: WorkerStates s -> Comp
- initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m s) -> m (WorkerStates s)
- data Comp where
- getCompWorkers :: MonadIO m => Comp -> m Int
- replicateConcurrently :: MonadUnliftIO m => Comp -> Int -> m a -> m [a]
- replicateConcurrently_ :: MonadUnliftIO m => Comp -> Int -> m a -> m ()
- traverseConcurrently :: (MonadUnliftIO m, Traversable t) => Comp -> (a -> m b) -> t a -> m (t b)
- traverseConcurrently_ :: (MonadUnliftIO m, Foldable t) => Comp -> (a -> m b) -> t a -> m ()
- traverse_ :: (Applicative f, Foldable t) => (a -> f ()) -> t a -> f ()
- data MutexException = MutexException
Scheduler
Main type for scheduling work. See withScheduler or
 withScheduler_ for ways to construct and use this data type.
Since: 1.0.0
data SchedulerWS s m a Source #
This is a wrapper around Scheduler, but it also keeps a separate state for each
 individual worker. See withSchedulerWS or
 withSchedulerWS_ for ways to construct and use this data type.
Since: 1.4.0
trivialScheduler_ :: Applicative f => Scheduler f () Source #
The most basic scheduler that simply runs the task instead of scheduling it. Early termination requests are bluntly ignored.
Since: 1.1.0
withScheduler
Arguments
| :: MonadUnliftIO m | |
| => Comp | Computation strategy | 
| -> (Scheduler m a -> m b) | Action that will be scheduling all the work. | 
| -> m [a] | 
Initialize a scheduler and submit jobs that will be computed sequentially or in parallel,
 as determined by the computation strategy.
Here are some notable properties of withScheduler:
- This function will block until all of the submitted jobs have finished or until at least one of them results in an exception, which will be re-thrown at the call site.
- It is fine for nested jobs to submit more jobs to the same or another scheduler.
- It is ok to initialize multiple schedulers at the same time, although that will likely result in suboptimal performance, unless workers are pinned to different capabilities.
- Warning - It is dangerous to schedule jobs that do blocking IO, since that can easily lead to a deadlock if you are not careful. Consider this example: the first execution works fine, since there are two scheduled workers and one can unblock the other, but the second scenario immediately results in a deadlock.
>>> withScheduler (ParOn [1,2]) $ \s -> newEmptyMVar >>= (\ mv -> scheduleWork s (readMVar mv) >> scheduleWork s (putMVar mv ()))
[(),()]
>>> import System.Timeout
>>> timeout 1000000 $ withScheduler (ParOn [1]) $ \s -> newEmptyMVar >>= (\ mv -> scheduleWork s (readMVar mv) >> scheduleWork s (putMVar mv ()))
Nothing
Important: In order to get work done truly in parallel, the program needs to be compiled with
 the -threaded GHC flag and executed with +RTS -N -RTS to use all available cores.
Since: 1.0.0
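A minimal sketch of typical usage, assuming the `scheduler` package is installed. The name `sumOfSquares` is a hypothetical helper, not part of the library. Each element becomes one job, and the results are combined with `sum`, so the sketch does not depend on the order in which results come back.

```haskell
import Control.Monad (forM_)
import Control.Scheduler (Comp (..), scheduleWork, withScheduler)

-- | Hypothetical helper: square every element as a separate job and add
-- up the collected results.
sumOfSquares :: [Int] -> IO Int
sumOfSquares xs = do
  -- The scheduling action only submits jobs; withScheduler blocks until
  -- all of them are done and then returns their results.
  squares <- withScheduler Par $ \scheduler ->
    forM_ xs $ \x -> scheduleWork scheduler (pure (x * x))
  pure (sum squares)
```

Calling `sumOfSquares [1 .. 10]` adds up the ten squares. Swapping `Par` for `Seq` should give the same answer without any parallelism, which can be handy when debugging.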
withScheduler_
Arguments
| :: MonadUnliftIO m | |
| => Comp | Computation strategy | 
| -> (Scheduler m a -> m b) | Action that will be scheduling all the work. | 
| -> m () | 
Same as withScheduler, but discards results of submitted jobs.
Since: 1.0.0
withSchedulerWS :: MonadUnliftIO m => WorkerStates s -> (SchedulerWS s m a -> m b) -> m [a] Source #
Run a scheduler with stateful workers. Throws MutexException if an attempt is made
 to concurrently use the same WorkerStates with another SchedulerWS.
Examples
A good example of using stateful workers is the generation of random numbers in parallel. Random number generators are often not thread safe, so we can work around this problem by using a separate stateful generator for each of the workers.
>>> import Control.Monad as M ((>=>), replicateM)
>>> import Control.Concurrent (yield, threadDelay)
>>> import Data.List (sort)
>>> -- ^ Above imports are used to make sure output is deterministic, which is needed for doctest
>>> import System.Random.MWC as MWC
>>> import Data.Vector.Unboxed as V (singleton)
>>> states <- initWorkerStates (ParN 4) (MWC.initialize . V.singleton . fromIntegral . getWorkerId)
>>> let scheduleGen scheduler = scheduleWorkState scheduler (MWC.uniform >=> \r -> yield >> threadDelay 200000 >> pure r)
>>> sort <$> withSchedulerWS states (M.replicateM 4 . scheduleGen) :: IO [Double]
[0.21734983682025255,0.5000843862105709,0.5759825622603018,0.8587171114177893]
>>> sort <$> withSchedulerWS states (M.replicateM 4 . scheduleGen) :: IO [Double]
[2.3598617298033475e-2,9.949679290089553e-2,0.38223134248645885,0.7408640677124702]
In the above example we use four different random number generators from
 `mwc-random` in order to generate 4
 numbers, all in separate threads. The subsequent call to the withSchedulerWS function
 with the same states is allowed to reuse the same generators, thus avoiding expensive
 initialization.
Side note - The example above was crafted with slight trickery in order to guarantee that the output is deterministic, so if you run the instructions in exactly the same way in GHCi you will get exactly the same output. Non-determinism comes from thread scheduling rather than from the random number generator, because we use exactly the same seed for each worker, but workers run concurrently. The exact output is not really needed, except for the doctests to pass.
Since: 1.4.0
withSchedulerWS_ :: MonadUnliftIO m => WorkerStates s -> (SchedulerWS s m () -> m b) -> m () Source #
Run a scheduler with stateful workers, while discarding computation results.
Since: 1.4.0
unwrapSchedulerWS :: SchedulerWS s m a -> Scheduler m a Source #
Get the underlying Scheduler, which cannot access WorkerStates.
Since: 1.4.0
Scheduling computation
scheduleWork :: Scheduler m a -> m a -> m () Source #
Schedule an action to be picked up and computed by a worker from a pool of
 jobs. Similar to scheduleWorkId, except the job doesn't get the worker id.
Since: 1.0.0
scheduleWork_ :: Scheduler m () -> m () -> m () Source #
Same as scheduleWork, but only for a Scheduler that doesn't keep the results.
Since: 1.1.0
scheduleWorkId :: Scheduler m a -> (WorkerId -> m a) -> m () Source #
Schedule an action to be picked up and computed by a worker from a pool of jobs. Argument supplied to the job will be the id of the worker doing the job.
Since: 1.2.0
scheduleWorkId_ :: Scheduler m () -> (WorkerId -> m ()) -> m () Source #
Same as scheduleWorkId, but only for a Scheduler that doesn't keep the results.
Since: 1.2.0
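As a small illustration of worker ids, the sketch below pairs each input with the id of the worker that processed it. The helper name `tagWithWorker` and the choice of `ParN 3` are assumptions made for this example only.

```haskell
import Control.Monad (forM_)
import Control.Scheduler (Comp (..), WorkerId (..), scheduleWorkId, withScheduler)

-- | Hypothetical helper: pair each input with the id of the worker that
-- processed it. Which worker picks up which job is not deterministic.
tagWithWorker :: [Int] -> IO [(Int, Int)]
tagWithWorker xs =
  withScheduler (ParN 3) $ \scheduler ->
    forM_ xs $ \x ->
      scheduleWorkId scheduler $ \workerId ->
        pure (getWorkerId workerId, x)
```

With three workers, every id in the result should fall in the range 0 to 2, while the assignment of jobs to workers may differ from run to run.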
scheduleWorkState :: SchedulerWS s m a -> (s -> m a) -> m () Source #
Schedule a job that will get a worker state passed as an argument.
Since: 1.4.0
scheduleWorkState_ :: SchedulerWS s m () -> (s -> m ()) -> m () Source #
Same as scheduleWorkState, but doesn't keep the result of the computation.
Since: 1.4.0
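The following sketch shows one way per-worker mutable state might be used: every worker owns a counter and each job bumps the counter of whichever worker runs it. The helper `countJobs` and the decision to create the counters up front, so that they can be read back afterwards, are assumptions of this example rather than anything the library prescribes.

```haskell
import Control.Monad (replicateM, replicateM_)
import Control.Scheduler
  ( Comp (..)
  , WorkerId (..)
  , initWorkerStates
  , scheduleWorkState_
  , withSchedulerWS_
  )
import Data.IORef (modifyIORef', newIORef, readIORef)

-- | Hypothetical helper: run the given number of jobs on four workers, each
-- job bumping the counter that belongs to the worker executing it, and
-- return the sum of all counters.
countJobs :: Int -> IO Int
countJobs jobs = do
  counters <- replicateM 4 (newIORef (0 :: Int))
  -- Worker ids start at 0 and stay below the number of workers, so with
  -- ParN 4 the index is always within the list of four counters.
  states <-
    initWorkerStates (ParN 4) (\workerId -> pure (counters !! getWorkerId workerId))
  withSchedulerWS_ states $ \scheduler ->
    replicateM_ jobs $
      -- A counter is only touched by the worker that owns it, so a plain
      -- non-atomic update is used here.
      scheduleWorkState_ scheduler (\counter -> modifyIORef' counter (+ 1))
  sum <$> mapM readIORef counters
```

The total equals the number of scheduled jobs regardless of how the jobs were distributed among the workers.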
terminate :: Scheduler m a -> a -> m a Source #
Try to terminate, as soon as possible, any computation that is being performed by the workers managed by this scheduler and collect whatever results have been computed, with the supplied element guaranteed to be the last one.
Important - With the Seq strategy this will not stop other scheduled tasks from being computed,
 although it will make sure their results are discarded.
Since: 1.1.0
terminate_ :: Scheduler m () -> m () Source #
terminateWith :: Scheduler m a -> a -> m a Source #
Same as terminate, but the scheduler will return a single-element list containing only the
 supplied argument. This can be very useful for parallel search algorithms.
Important - Same as with terminate, when the Seq strategy is used, this will not prevent
 computation from continuing, but the scheduler will return only the result supplied to this
 function.
Since: 1.1.0
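The parallel search use case mentioned above could look roughly like the sketch below. The helper `findAny` is hypothetical. It collects results with `catMaybes`, so it does not rely on the exact shape of the list returned after early termination. When several elements match, which one is reported is not deterministic.

```haskell
import Control.Monad (forM_)
import Control.Scheduler (Comp (..), scheduleWork, terminateWith, withScheduler)
import Data.Maybe (catMaybes, listToMaybe)

-- | Hypothetical helper: look for any element that satisfies the predicate,
-- checking candidates in parallel and requesting termination on a hit.
findAny :: (a -> Bool) -> [a] -> IO (Maybe a)
findAny p xs = do
  results <- withScheduler Par $ \scheduler ->
    forM_ xs $ \x ->
      scheduleWork scheduler $
        if p x
          then terminateWith scheduler (Just x) -- ask the scheduler to stop
          else pure Nothing
  -- Keep the first hit, if there is one; misses show up as Nothing.
  pure (listToMaybe (catMaybes results))
```

For instance, `findAny (> 5) [1 .. 10]` yields some element greater than 5, and `findAny (> 100) [1 .. 10]` yields `Nothing`.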
Workers
A unique id for the worker in the Scheduler context. It will
 always be a number from 0 up to, but not including, the number of workers a scheduler
 has, which in turn can always be determined with numWorkers function.
Since: 1.4.0
Constructors
| WorkerId | |
| Fields: getWorkerId :: Int | |
Instances
| Enum WorkerId Source # | |
| Defined in Control.Scheduler.Queue | |
| Eq WorkerId Source # | |
| Num WorkerId Source # | |
| Ord WorkerId Source # | |
| Defined in Control.Scheduler.Queue | |
| Show WorkerId Source # | |
data WorkerStates s Source #
Each worker is capable of keeping its own state, which can be shared between different
 schedulers, but not at the same time. In other words, using the same WorkerStates with
 withSchedulerWS concurrently will result in an error. Can be initialized with
 initWorkerStates.
Since: 1.4.0
numWorkers :: Scheduler m a -> Int Source #
Get the number of workers. Will mainly depend on the computation strategy and/or number of
 capabilities you have. Related function is getCompWorkers.
Since: 1.0.0
workerStatesComp :: WorkerStates s -> Comp Source #
Get the computation strategy the states were initialized with.
Since: 1.4.0
initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m s) -> m (WorkerStates s) Source #
Initialize a separate state for each worker.
Since: 1.4.0
Computation strategies
Computation strategy to use when scheduling work.
Constructors
| Seq | Sequential computation | 
| ParOn ![Int] | Schedule workers to run on specific capabilities. Specifying an empty list  | 
| ParN !Word16 | Specify the number of workers that will be handling all the jobs. Difference from  | 
Bundled Patterns
| pattern Par :: Comp | Parallel computation using all available cores. Same as  Since: 1.0.0 | 
| pattern Par' :: Comp | Parallel computation using all available cores. Same as  Since: 1.1.0 | 
getCompWorkers :: MonadIO m => Comp -> m Int Source #
Figure out how many workers this computation strategy will create.
Note - If the global number of capabilities is changed with setNumCapabilities at any point
 during program execution, it will have no effect on this
 function, unless the function hasn't yet been called with Par or Par' arguments.
Since: 1.1.0
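A quick way to see how strategies map to worker counts is to query a few of them. The helper `workerCounts` and its labels are made up for this sketch. The count for `Par` depends on the runtime options the program was started with, so it is reported rather than assumed.

```haskell
import Control.Scheduler (Comp (..), getCompWorkers)

-- | Hypothetical helper: report the number of workers for a few strategies.
workerCounts :: IO [(String, Int)]
workerCounts = do
  seqN <- getCompWorkers Seq
  fixedN <- getCompWorkers (ParN 3)
  parN <- getCompWorkers Par -- depends on the available capabilities
  pure [("Seq", seqN), ("ParN 3", fixedN), ("Par", parN)]
```

`Seq` should report a single worker and `ParN 3` three workers, whereas the number for `Par` varies with the `+RTS -N` setting.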
Useful functions
replicateConcurrently :: MonadUnliftIO m => Comp -> Int -> m a -> m [a] Source #
Replicate an action n times and schedule the copies according to the supplied computation
 strategy.
Since: 1.1.0
replicateConcurrently_ :: MonadUnliftIO m => Comp -> Int -> m a -> m () Source #
Just like replicateConcurrently, but discards the results of computation.
Since: 1.1.0
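As a sketch of replicating an effectful action, the example below draws tickets from a shared counter on two workers. The helper `drawTickets` is hypothetical, and the atomic update is there because the counter is shared between workers.

```haskell
import Control.Scheduler (Comp (..), replicateConcurrently)
import Data.IORef (atomicModifyIORef', newIORef)

-- | Hypothetical helper: hand out n distinct tickets from a shared counter,
-- with the requests spread over two workers.
drawTickets :: Int -> IO [Int]
drawTickets n = do
  counter <- newIORef (0 :: Int)
  replicateConcurrently (ParN 2) n $
    -- Return the current value and store its successor in one atomic step.
    atomicModifyIORef' counter (\i -> (i + 1, i))
```

Every ticket from 0 to n - 1 is handed out exactly once, although the order in which workers obtain them is up to the scheduler.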
traverseConcurrently :: (MonadUnliftIO m, Traversable t) => Comp -> (a -> m b) -> t a -> m (t b) Source #
Map an action over each element of the Traversable t according to the supplied computation
 strategy.
Since: 1.0.0
traverseConcurrently_ :: (MonadUnliftIO m, Foldable t) => Comp -> (a -> m b) -> t a -> m () Source #
Just like traverseConcurrently, but restricted to Foldable and discards the results of
 computation.
Since: 1.0.0
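A minimal sketch of a concurrent traversal over a list, with `wordLengths` as a hypothetical helper name. Because the result has the same shape as the input, each output lines up with the element it was computed from.

```haskell
import Control.Scheduler (Comp (..), traverseConcurrently)

-- | Hypothetical helper: compute the length of every word, one job per word.
wordLengths :: [String] -> IO [Int]
wordLengths = traverseConcurrently Par (pure . length)
```

For example, `wordLengths ["a", "bc", "def"]` produces `[1,2,3]`.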
traverse_ :: (Applicative f, Foldable t) => (a -> f ()) -> t a -> f () Source #
This is generally a faster way to traverse while ignoring the result than using mapM_.
Since: 1.0.0
Exceptions
If any one of the workers dies with an exception, even if that exception is asynchronous, it will be re-thrown in the scheduling thread.
>>> let didAWorkerDie = handleJust asyncExceptionFromException (return . (== ThreadKilled)) . fmap or
>>> :t didAWorkerDie
didAWorkerDie :: Foldable t => IO (t Bool) -> IO Bool
>>> didAWorkerDie $ withScheduler Par $ \ s -> scheduleWork s $ pure False
False
>>> didAWorkerDie $ withScheduler Par $ \ s -> scheduleWork s $ myThreadId >>= killThread >> pure False
True
>>> withScheduler Par $ \ s -> scheduleWork s $ myThreadId >>= killThread >> pure False
*** Exception: thread killed
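The same behaviour can be observed for an ordinary synchronous exception. In the sketch below the helper `failureIsRethrown` is hypothetical, and the exception is caught as `SomeException` so that the example makes no assumption about how the scheduler delivers it.

```haskell
import Control.Exception (SomeException, throwIO, try)
import Control.Scheduler (Comp (..), scheduleWork_, withScheduler_)

-- | Hypothetical helper: run one failing job and report whether its
-- exception reached the thread that called withScheduler_.
failureIsRethrown :: IO Bool
failureIsRethrown = do
  result <-
    try $
      withScheduler_ Par $ \scheduler ->
        scheduleWork_ scheduler (throwIO (userError "job failed"))
  pure $ case (result :: Either SomeException ()) of
    Left _ -> True
    Right () -> False
```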
data MutexException Source #
Exception that gets thrown whenever concurrent access to the WorkerStates is attempted.
Since: 1.4.0
Constructors
| MutexException | 
Instances
| Eq MutexException Source # | |
| Defined in Control.Scheduler.Internal Methods (==) :: MutexException -> MutexException -> Bool # (/=) :: MutexException -> MutexException -> Bool # | |
| Show MutexException Source # | |
| Defined in Control.Scheduler.Internal Methods showsPrec :: Int -> MutexException -> ShowS # show :: MutexException -> String # showList :: [MutexException] -> ShowS # | |
| Exception MutexException Source # | |
| Defined in Control.Scheduler.Internal Methods toException :: MutexException -> SomeException # | |
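One way to provoke this exception, sketched under the assumption that the states stay locked for as long as the outer scheduling action is running, is to reuse the same WorkerStates from inside a scheduler that already holds them. The helper `nestedUse` is hypothetical.

```haskell
import Control.Exception (try)
import Control.Scheduler
  ( Comp (..)
  , MutexException (..)
  , initWorkerStates
  , withSchedulerWS_
  )

-- | Hypothetical helper: attempt to use the same WorkerStates from within a
-- scheduler that is already using them.
nestedUse :: IO (Either MutexException ())
nestedUse = do
  states <- initWorkerStates Seq (\_ -> pure ())
  try $
    withSchedulerWS_ states $ \_ ->
      -- The outer scheduler is still using the states at this point.
      withSchedulerWS_ states $ \_ -> pure ()
```

If that assumption holds, `nestedUse` returns `Left MutexException`, while using the two schedulers one after the other would succeed.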