{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Control.Concurrent.Async.Pool.Internal where

import           Control.Applicative (Applicative((<*>), pure), (<$>))
import           Control.Arrow (first)
import           Control.Concurrent (ThreadId)
import qualified Control.Concurrent.Async as Async (withAsync)
import           Control.Concurrent.Async.Pool.Async
import           Control.Concurrent.STM
import           Control.Exception (SomeException, throwIO, finally, bracket_)
import           Control.Monad hiding (forM, forM_)
import           Control.Monad.Base
import           Control.Monad.IO.Class (MonadIO(..))
import           Control.Monad.Trans.Control
import           Data.Foldable (Foldable(foldMap), toList, forM_, all)
import           Data.Graph.Inductive.Graph as Gr (Graph(empty))
import           Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap
import           Data.List (delete)
import           Data.Monoid (Monoid(mempty), (<>))
import           Data.Traversable (Traversable(sequenceA), forM)
import           Prelude hiding (mapM_, mapM, foldr, all, any, concatMap, foldl1)

-- | Select the pending tasks of a group that may start right now.
--
--   The transaction retries (via 'check') until the group has at least one
--   free slot, its pending queue is non-empty, and at least one pending task
--   has every incoming edge in the graph labelled 'Completed'.  At most as
--   many tasks as there are free slots are taken, lowest key first; the taken
--   tasks are removed from the pending queue and the free-slot count is
--   reduced by the number taken.
getReadyNodes :: TaskGroup -> TaskGraph -> STM (IntMap (IO ThreadId, SomeTMVar))
getReadyNodes p g = do
    availSlots <- readTVar (avail p)
    check (availSlots > 0)
    taskQueue <- readTVar (pending p)
    check (not (IntMap.null taskQueue))
    let -- Pending tasks whose dependencies have all completed.
        candidates = IntMap.filterWithKey (\k _ -> isReady k) taskQueue
        -- Never hand out more tasks than there are free slots.
        readyNodes = IntMap.fromList
                   . take availSlots
                   . IntMap.toAscList
                   $ candidates
    check (not (IntMap.null readyNodes))
    writeTVar (avail p) (availSlots - IntMap.size readyNodes)
    writeTVar (pending p) (taskQueue IntMap.\\ readyNodes)
    return readyNodes
  where
    -- A node is ready when every edge pointing at it is 'Completed'
    -- (vacuously true for a node with no incoming edges).
    isReady :: Int -> Bool
    isReady = all isCompleted . inn g

    isCompleted :: (a, b, Status) -> Bool
    isCompleted (_, _, Completed) = True
    isCompleted (_, _, _)         = False

-- | Return the tasks that are ready to execute, each paired with the state
--   variable that 'getTaskVar' looks up for it in the pool's dependency
--   graph.  Selection (and the blocking behaviour that goes with it) is
--   delegated to 'getReadyNodes'.
getReadyTasks :: TaskGroup -> STM [(TVar State, (IO ThreadId, SomeTMVar))]
getReadyTasks p = do
    g <- readTVar (tasks (pool p))
    ready <- getReadyNodes p g
    -- Replace each task's key by its state variable from the graph.
    return $ map (first (getTaskVar g)) (IntMap.toList ready)

-- | Create a task pool for managing many-to-many acyclic dependencies among
--   tasks.  The pool starts with an empty dependency graph and an integer
--   counter initialised to zero.
createPool :: IO Pool
createPool = do
    graphVar   <- newTVarIO Gr.empty
    counterVar <- newTVarIO 0
    return (Pool graphVar counterVar)

-- | Use a task pool for a bounded region.  When the supplied function
--   returns normally, 'withPool' runs 'syncPool' on the pool before yielding
--   the function's result.
--
--   Note that the synchronisation step is only reached on normal return: if
--   the supplied function throws, the exception propagates immediately.
withPool :: (Pool -> IO a) -> IO a
withPool f = do
    p <- createPool
    result <- f p
    atomically (syncPool p)
    return result

-- | Create a task group for executing interdependent tasks concurrently.  The
--   number of available slots governs how many tasks may run at one time.
--
--   The group starts with @cnt@ free slots and an empty pending queue.
createTaskGroup :: Pool -> Int -> IO TaskGroup
createTaskGroup p cnt =
    TaskGroup p <$> newTVarIO cnt <*> newTVarIO IntMap.empty

-- | Execute tasks in a given task group.  The number of slots determines how
--   many threads may execute concurrently.
--
--   This action never returns: it loops forever, each iteration blocking
--   until 'getReadyTasks' yields a non-empty batch.  Every task in the batch
--   is marked 'Starting' inside the same transaction that selected it; the
--   task's launch action is then run outside the transaction and the
--   resulting 'ThreadId' is recorded in the task's state as 'Started'.
runTaskGroup :: TaskGroup -> IO ()
runTaskGroup p = forever $ do
    batch <- atomically $ do
        cnt <- readTVar (avail p)
        check (cnt > 0)
        ready <- getReadyTasks p
        check (not (null ready))
        forM_ ready $ \(tv, _) -> writeTVar tv Starting
        return ready
    forM_ batch $ \(tv, (go, var)) -> do
        tid <- go
        atomically $ writeTVar tv (Started tid var)

-- | Create a task group within the given pool having a specified number of
--   execution slots, but with a bounded lifetime.
--
--   'runTaskGroup' is run for the new group in a background thread (via
--   'Async.withAsync') for as long as the supplied function executes.  When
--   the function exits, normally or by exception, 'cancelAll' is run on the
--   group.
withTaskGroupIn :: Pool -> Int -> (TaskGroup -> IO b) -> IO b
withTaskGroupIn p n f = do
    g <- createTaskGroup p n
    Async.withAsync (runTaskGroup g) $ \_ ->
        f g `finally` cancelAll g

-- | Create both a pool and a task group with a given number of execution
--   slots, but with a bounded lifetime.  This is 'withTaskGroupIn' applied to
--   a pool freshly made by 'createPool'.
withTaskGroup :: Int -> (TaskGroup -> IO b) -> IO b
withTaskGroup n f = do
    p <- createPool
    withTaskGroupIn p n f

-- | Given parent and child tasks, link them so the child cannot execute until
--   the parent has finished.
--
--   If the parent is no longer in the graph nothing is recorded.  If a path
--   from the child to the parent already exists, adding the edge would create
--   a cycle, and the transaction fails with 'error'.
makeDependent :: Pool
              -> Handle    -- ^ Handle of task doing the waiting
              -> Handle    -- ^ Handle of task we must wait on (the parent)
              -> STM ()
makeDependent p child parent = do
    g <- readTVar (tasks p)
    -- If the parent is no longer in the graph, there is no need to establish
    -- a dependency.  The child can begin executing in the next free slot.
    when (gelem parent g) $
        -- Check whether the parent is in any way dependent on the child,
        -- which would introduce a cycle.
        case esp child parent g of
            [] -> modifyTVar' (tasks p) (insEdge (parent, child, Pending))
            _  -> error "makeDependent: Cycle in task graph"

-- | Given parent and child tasks, link them so the child cannot execute until
--   the parent has finished.  This function does not check for introduction of
--   cycles into the dependency graph, which would prevent the child from ever
--   running.
unsafeMakeDependent :: Pool
                    -> Handle    -- ^ Handle of task doing the waiting
                    -> Handle    -- ^ Handle of task we must wait on (the parent)
                    -> STM ()
unsafeMakeDependent p child parent = do
    g <- readTVar (tasks p)
    -- If the parent is no longer in the graph, there is no need to establish
    -- dependency.  The child can begin executing in the next free slot.
    when (gelem parent g) $
        modifyTVar' (tasks p) (insEdge (parent, child, Pending))

-- | Equivalent to 'async', but acts in STM so that 'makeDependent' may be
--   called after the task is created, but before it begins executing.
--   Implemented as 'asyncUsing' with 'rawForkIO' as the forking primitive.
asyncSTM :: TaskGroup -> IO a -> STM (Async a)
asyncSTM p action = asyncUsing p rawForkIO action

-- | Submit a task which begins execution after all its parents have completed.
--   The task is created and linked to every parent with 'makeDependent' in a
--   single transaction, so it cannot be scheduled before the links exist.
asyncAfterAll :: TaskGroup -> [Handle] -> IO a -> IO (Async a)
asyncAfterAll p parents t = atomically $ do
    child <- asyncUsing p rawForkIO t
    let childHandle = taskHandle child
    forM_ parents $ \parent ->
        makeDependent (pool p) childHandle parent
    return child

-- | Submit a task that begins execution only after its parent has completed.
--   This is 'asyncAfterAll' with a single parent.
asyncAfter :: TaskGroup -> Async b -> IO a -> IO (Async a)
asyncAfter p parent action = asyncAfterAll p [taskHandle parent] action

-- | Run an action while the task group temporarily has one additional free
--   slot.  The slot count is incremented before the action starts and
--   decremented again when it finishes; 'bracket_' guarantees the decrement
--   also happens if the action throws.
extraWorkerWhileBlocked :: TaskGroup -> IO a -> IO a
extraWorkerWhileBlocked p = bracket_ grantSlot revokeSlot
  where
    grantSlot  = atomically $ modifyTVar' (avail p) (+ 1)
    -- 'subtract 1' is @\\x -> x - 1@.  The section @((-) 1)@ would instead
    -- compute @1 - x@ and corrupt the slot count.
    revokeSlot = atomically $ modifyTVar' (avail p) (subtract 1)

-- | Helper function used by several of the variants of 'mapTasks' below.
--
--   Each action is submitted to the group with 'asyncUsingLazy', one
--   transaction per action.  The collector @g@ is then applied to every
--   resulting handle, in the order of the input structure, and the combined
--   collection step is passed through @f@.  Collection runs inside
--   'extraWorkerWhileBlocked', so the group has one extra slot while the
--   caller is waiting.
mapTasksWorker :: Traversable t
               => TaskGroup
               -> t (IO a)
               -> (IO (t b) -> IO (t c))
               -> (Async a -> IO b)
               -> IO (t c)
mapTasksWorker p fs f g = do
    hs <- forM fs $ \action ->
        atomically (asyncUsingLazy p rawForkIO action)
    extraWorkerWhileBlocked p (f (forM hs g))

-- | Execute a group of tasks within the given task group, returning the
--   results in order.  The order of execution is unspecified, but the results
--   are collected with 'wait' in the order of the input structure.
mapTasks :: Traversable t => TaskGroup -> t (IO a) -> IO (t a)
mapTasks p fs = mapTasksWorker p fs id wait

-- | Execute a group of tasks within the given task group, returning the
--   results in order as an 'Either' to represent exceptions from actions.
--   The order of execution is unspecified, but the results are collected
--   with 'waitCatch' in the order of the input structure.
mapTasksE :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Either SomeException a))
mapTasksE p fs = mapTasksWorker p fs id waitCatch

-- | Submit a group of tasks to the given task group, ignoring results.
--
--   Each action is submitted in its own transaction and its handle is
--   discarded; this function does not wait for the tasks itself.
mapTasks_ :: Foldable t => TaskGroup -> t (IO a) -> IO ()
mapTasks_ p fs = forM_ fs $ \action ->
    atomically (asyncUsing p rawForkIO action)

-- | Execute a group of tasks within the given task group, ignoring results
--   but reporting failures.  The returned structure has the same shape as
--   the input: @Just e@ where the corresponding task ended with exception
--   @e@, and @Nothing@ where it produced a value.
mapTasksE_ :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Maybe SomeException))
mapTasksE_ p fs = mapTasksWorker p fs (fmap (fmap leftToMaybe)) waitCatch
  where
    -- Keep only the failure side of an 'Either'.
    leftToMaybe :: Either a b -> Maybe a
    leftToMaybe (Left e)  = Just e
    leftToMaybe (Right _) = Nothing

-- | Execute a group of tasks, but return the first result or failure and
--   cancel the remaining tasks.
--
--   All tasks are submitted in a single transaction, and the resulting
--   handles are then passed to 'waitAnyCatchCancel'.
mapRace :: Foldable t
        => TaskGroup -> t (IO a) -> IO (Async a, Either SomeException a)
mapRace p fs = do
    hs <- atomically $ forM (toList fs) (asyncUsing p rawForkIO)
    waitAnyCatchCancel hs

-- | Given a list of actions yielding 'Monoid' results, execute the actions
--   concurrently (up to N at a time, based on available slots), and 'mappend'
--   each pair of results concurrently as they become ready.  The immediate
--   result of this function is an 'Async' representing the final value.
--
--   This is similar to the following: @mconcat \<$\> mapTasks n actions@,
--   except that intermediate results can be garbage collected as soon as
--   they've been merged.  Also, the value returned from this function is an
--   'Async' which may be polled for the final result.
--
--   Lastly, if an 'Exception' occurs in any subtask, the final result will
--   also yield an exception -- but not necessarily the first or last that was
--   caught.
--
--   For an empty input the result is a task that yields 'mempty'.
mapReduce :: (Foldable t, Monoid a)
          => TaskGroup     -- ^ Task group to execute the tasks within
          -> t (IO a)      -- ^ Set of Monoid-yielding IO actions
          -> STM (Async a) -- ^ Returns the final result task
mapReduce p fs = do
    -- Submit all the tasks right away, and jobs to combine all those results.
    -- Since we're working with a Monoid, it doesn't matter what order they
    -- complete in, or what order we combine the results in, just as long as
    -- each combination waits on the results it intends to combine.
    hs <- forM (toList fs) (asyncUsing p rawForkIO)
    loopM hs
  where
    -- Keep halving the list of handles until a single one remains.
    loopM hs = do
        hs' <- squeeze hs
        case hs' of
            []  -> error "mapReduce: impossible"
            [x] -> return x
            xs  -> loopM xs

    -- One reduction pass: replace each adjacent pair of handles by a task
    -- that combines their results; an odd trailing handle is kept as is.
    squeeze []  = (:[]) <$> asyncUsing p rawForkIO (return mempty)
    squeeze [x] = return [x]
    squeeze (x:y:xs) = do
        t <- asyncUsing p rawForkIO $ do
            meres <- atomically $ do
                -- These polls should by definition always succeed, since this
                -- task should not start until results are available.
                eres1 <- pollSTM x
                eres2 <- pollSTM y
                case liftM2 (<>) <$> eres1 <*> eres2 of
                    Nothing -> retry
                    Just a  -> return a
            -- Re-raise a failure of either input as this task's failure.
            case meres of
                Left e  -> throwIO e
                Right a -> return a
        -- The combining task must not start before both of its inputs.
        forM_ [x, y] $ \parent ->
            unsafeMakeDependent (pool p) (taskHandle t) (taskHandle parent)
        case xs of
            -- Do not recurse on an empty tail: @squeeze []@ would add a
            -- spurious 'mempty' task.
            [] -> return [t]
            _  -> (t :) <$> squeeze xs

-- | Execute a group of tasks concurrently (using up to N active threads,
--   depending on the task group), and feed results to a continuation as soon
--   as they become available.  The continuation function may return a monoid
--   value which is accumulated to yield a final result.  If no such value is
--   needed, simply provide `()`.
--
--   All tasks are submitted in one transaction.  The function then repeatedly
--   blocks until some remaining task has finished, passes that task's result
--   (or exception) to the continuation, appends the continuation's value to
--   the accumulator with '<>', and drops the task from the remaining set.
--   The accumulator starts at 'mempty'.
scatterFoldMapM :: (Foldable t, Monoid b, MonadBaseControl IO m)
                => TaskGroup -> t (IO a) -> (Either SomeException a -> m b) -> m b
scatterFoldMapM p fs f = do
    hs <- liftBase $ atomically $ forM (toList fs) (asyncUsing p rawForkIO)
    control $ \(run :: m b -> IO (StM m b)) ->
        loop run (run $ return mempty) hs
  where
    -- @loop run acc remaining@: @acc@ produces the captured monadic state
    -- holding the value accumulated so far.
    loop _ z [] = z
    loop run z hs = do
        (h, eres) <- atomically $ do
            mres <- foldM go Nothing hs
            maybe retry return mres
        prev <- z
        next <- run $ do
            s <- restoreM prev
            r <- f eres
            return $ s <> r
        loop run (return next) (delete h hs)

    -- Find the first handle, in list order, that already has a result.
    go acc@(Just _) _ = return acc
    go acc h = do
        eres <- pollSTM h
        return $ case eres of
            Nothing        -> acc
            Just (Left e)  -> Just (h, Left e)
            Just (Right x) -> Just (h, Right x)

-- | Maps an @IO@-performing function over any @Traversable@ data
-- type, performing the @IO@ actions as tasks in the given task group, and
-- returning the original data structure with the arguments replaced by the
-- results.
--
-- For example, @mapConcurrently@ works with lists:
--
-- > pages <- mapConcurrently tg getURL ["url1", "url2", "url3"]
--
mapConcurrently :: Traversable t => TaskGroup -> (a -> IO b) -> t a -> IO (t b)
mapConcurrently tg f xs = mapTasks tg (fmap f xs)

-- | The 'Task' Applicative and Monad allow for task dependencies to be built
--   using Applicative and do notation.  Monadic evaluation is sequenced,
--   while applicative evaluation is concurrent for each argument.  In this
--   way, mixing the two builds a dependency graph via ordinary Haskell code.
--
--   A 'Task' is a function from a task group to a two-stage action: the
--   outer 'IO' sets the computation up, and the inner 'IO' yields its result.
newtype Task a = Task { runTask' :: TaskGroup -> IO (IO a) }

-- | Run a value in the 'Task' monad and block until the final result is
--   computed.  The task's setup stage is run first, followed immediately by
--   the action it returns.
runTask :: TaskGroup -> Task a -> IO a
runTask group ts = do
    collect <- runTask' ts group
    collect

-- | Lift any 'IO' action into a 'Task'.  This is a synonym for 'liftIO'.
--   The task group is ignored and the action itself becomes the task's
--   result stage; nothing is run during setup.
task :: IO a -> Task a
task action = Task (const (return action))

-- | Mapping a function over a 'Task' applies it to the value produced by the
--   task's result stage; the setup stage is left untouched.
instance Functor Task where
    fmap f (Task k) = Task $ \tg -> fmap (liftM f) (k tg)

-- | 'pure' yields a task that does no work.  For '<*>', the argument task is
--   set up first and its result stage is submitted to the task group with
--   'async'; the function task is then set up, and the combined result stage
--   runs the function side and applies it to the 'wait'ed argument.
instance Applicative Task where
    pure x = Task $ \_ -> return (return x)
    Task f <*> Task x = Task $ \tg -> do
        xa <- x tg
        -- Start the argument in the group now; only wait for it when the
        -- combined result stage is eventually run.
        xh <- async tg xa
        fa <- f tg
        return $ fa <*> wait xh

-- | Monadic bind is sequential: the first task is set up and run to
--   completion during the setup stage of the combined task, and its value
--   is used to choose the task that is set up next in the same group.
instance Monad Task where
    return = pure
    Task m >>= f = Task $ \tg -> do
        a <- join (m tg)
        runTask' (f a) tg

-- | 'liftIO' is 'task': the 'IO' action becomes the task's result stage.
instance MonadIO Task where
    liftIO = task