{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Control.Concurrent.Async.Pool.Internal where
import Control.Applicative (Applicative((<*>), pure), (<$>))
import Control.Arrow (first)
import Control.Concurrent (ThreadId)
import qualified Control.Concurrent.Async as Async (withAsync)
import Control.Concurrent.Async.Pool.Async
import Control.Concurrent.STM
import Control.Exception (SomeException, throwIO, finally, bracket_)
import Control.Monad hiding (forM, forM_)
import Control.Monad.Base
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control
import Data.Foldable (Foldable(foldMap), toList, forM_, all)
import Data.Graph.Inductive.Graph as Gr (Graph(empty))
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap
import Data.List (delete)
import Data.Monoid (Monoid(mempty), (<>))
import Data.Traversable (Traversable(sequenceA), forM)
import Prelude hiding (mapM_, mapM, foldr, all, any, concatMap, foldl1)
-- | Select the pending actions of a task group that may be started now.
--
-- The transaction blocks (through 'check') until the group has at least one
-- free slot, its pending queue is non-empty, and at least one pending node
-- has every incoming edge in the task graph marked 'Completed'.  At most
-- as many nodes as there are free slots are taken, lowest node id first.
-- The chosen nodes are removed from the pending queue and the free-slot
-- counter is lowered by the number of nodes chosen.
getReadyNodes :: TaskGroup -> TaskGraph -> STM (IntMap (IO ThreadId, SomeTMVar))
getReadyNodes p g = do
    slots <- readTVar slotVar
    check (slots > 0)
    queued <- readTVar queueVar
    check (not (IntMap.null queued))
    let runnable = IntMap.filterWithKey (\node _ -> depsDone node) queued
        chosen   = IntMap.fromList (take slots (IntMap.toAscList runnable))
    check (not (IntMap.null chosen))
    writeTVar slotVar (slots - IntMap.size chosen)
    writeTVar queueVar (queued IntMap.\\ chosen)
    return chosen
  where
    slotVar  = avail p
    queueVar = pending p

    -- A node may run once every edge pointing into it is 'Completed'.
    depsDone node = all edgeDone (inn g node)

    edgeDone (_, _, status) = case status of
        Completed -> True
        _         -> False
-- | Like 'getReadyNodes', but reads the task graph from the group's pool
-- and pairs each selected action with the state variable that
-- 'getTaskVar' yields for its node, instead of with the node id.
getReadyTasks :: TaskGroup -> STM [(TVar State, (IO ThreadId, SomeTMVar))]
getReadyTasks p = do
    graph <- readTVar (tasks (pool p))
    ready <- getReadyNodes p graph
    return [ (getTaskVar graph node, job) | (node, job) <- IntMap.toList ready ]
-- | Allocate a fresh 'Pool': an empty task graph plus an 'Int' counter
-- starting at zero (presumably the source of task handles -- confirm
-- against the 'Pool' declaration).
createPool :: IO Pool
createPool = do
    graphVar <- newTVarIO Gr.empty
    counter  <- newTVarIO 0
    return (Pool graphVar counter)
-- | Create a pool, hand it to the given action, then run 'syncPool' on it
-- in its own transaction before returning the action's result.
--
-- No exception handler is installed: if the action throws, 'syncPool' is
-- not run.
withPool :: (Pool -> IO a) -> IO a
withPool f = createPool >>= \p -> do
    result <- f p
    atomically (syncPool p)
    return result
-- | Build a 'TaskGroup' inside the given pool.
--
-- The second argument is the initial value of the group's free-slot
-- counter; the group starts with an empty pending queue.
createTaskGroup :: Pool -> Int -> IO TaskGroup
createTaskGroup p cnt =
    TaskGroup p <$> newTVarIO cnt <*> newTVarIO IntMap.empty
-- | Scheduler loop for a task group; never returns on its own.
--
-- Each iteration first claims a batch of ready tasks in a single
-- transaction (blocking until the group has a free slot and
-- 'getReadyTasks' yields a non-empty batch) and marks every claimed task
-- 'Starting'.  It then runs each task's fork action and records the
-- resulting thread id, together with the task's result variable, as
-- 'Started'.
runTaskGroup :: TaskGroup -> IO ()
runTaskGroup p = forever (claimBatch >>= launchBatch)
  where
    claimBatch = atomically $ do
        free <- readTVar (avail p)
        check (free > 0)
        batch <- getReadyTasks p
        check (not (null batch))
        forM_ batch $ \(stateVar, _) -> writeTVar stateVar Starting
        return batch

    -- Forking happens outside the claiming transaction, one task at a time.
    launchBatch batch = forM_ batch $ \(stateVar, (fork, resultVar)) -> do
        tid <- fork
        atomically (swapTVar stateVar (Started tid resultVar))
-- | Create a task group with @n@ slots in an existing pool and run the
-- given action with it.
--
-- 'runTaskGroup' is run through 'Async.withAsync' for the duration of the
-- action, and 'cancelAll' is applied to the group when the action
-- finishes, whether normally or by exception.
withTaskGroupIn :: Pool -> Int -> (TaskGroup -> IO b) -> IO b
withTaskGroupIn p n f = do
    g <- createTaskGroup p n
    Async.withAsync (runTaskGroup g) $ \_ ->
        finally (f g) (cancelAll g)
-- | Convenience wrapper around 'withTaskGroupIn' that first creates a
-- brand-new pool for the group.
withTaskGroup :: Int -> (TaskGroup -> IO b) -> IO b
withTaskGroup n f = do
    p <- createPool
    withTaskGroupIn p n f
-- | Record that @child@ must wait for @parent@ by inserting a 'Pending'
-- edge from @parent@ to @child@ in the pool's task graph.
--
-- Nothing happens when @parent@ is not a node of the graph.  If 'esp'
-- finds a path from @child@ to @parent@, the new edge would close a cycle
-- and the call fails with 'error'.
makeDependent :: Pool
              -> Handle
              -> Handle
              -> STM ()
makeDependent p child parent = do
    graph <- readTVar graphVar
    when (parent `gelem` graph) $
        if null (esp child parent graph)
            then modifyTVar graphVar (insEdge (parent, child, Pending))
            else error "makeDependent: Cycle in task graph"
  where
    graphVar = tasks p
-- | Variant of 'makeDependent' that skips the cycle check: when @parent@
-- is a node of the pool's task graph, a 'Pending' edge from @parent@ to
-- @child@ is inserted unconditionally; otherwise nothing happens.
unsafeMakeDependent :: Pool
                    -> Handle
                    -> Handle
                    -> STM ()
unsafeMakeDependent p child parent = do
    parentKnown <- gelem parent <$> readTVar graphVar
    when parentKnown $
        modifyTVar graphVar (insEdge (parent, child, Pending))
  where
    graphVar = tasks p
-- | Submit an action to the task group from within an STM transaction.
-- This is 'asyncUsing' specialised to 'rawForkIO'.
asyncSTM :: TaskGroup -> IO a -> STM (Async a)
asyncSTM group = group `asyncUsing` rawForkIO
-- | Submit an action that depends on every task in the given list of
-- parent handles.
--
-- The task is created and each dependency registered with
-- 'makeDependent' in one transaction, so the 'makeDependent' caveats
-- (unknown parents are ignored, cycles raise an error) apply here too.
asyncAfterAll :: TaskGroup -> [Handle] -> IO a -> IO (Async a)
asyncAfterAll p parents t = atomically $ do
    child <- asyncUsing p rawForkIO t
    let dependOn = makeDependent (pool p) (taskHandle child)
    forM_ parents dependOn
    return child
-- | Submit an action that depends on a single parent task; a thin
-- wrapper over 'asyncAfterAll' with a one-element parent list.
asyncAfter :: TaskGroup -> Async b -> IO a -> IO (Async a)
asyncAfter p parent = asyncAfterAll p parents
  where
    parents = [taskHandle parent]
-- | Run an action with one extra free slot in the task group.
--
-- The group's free-slot counter is incremented before the action runs
-- and decremented again afterwards; 'bracket_' guarantees the decrement
-- also happens when the action throws.
extraWorkerWhileBlocked :: TaskGroup -> IO a -> IO a
extraWorkerWhileBlocked p =
    bracket_ (adjustSlots (+ 1)) (adjustSlots (subtract 1))
  where
    -- 'subtract 1' is @\\n -> n - 1@.  The section-like @((-) 1)@ would be
    -- @\\n -> 1 - n@, which does not undo the increment.
    adjustSlots = atomically . modifyTVar' (avail p)
-- | Shared implementation of the @mapTasks*@ family.
--
-- Every action in the container is submitted with 'asyncUsingLazy', one
-- transaction per action.  The per-handle collector (fourth argument) is
-- then traversed over the handles, the post-processing function (third
-- argument) is applied to that traversal, and the whole thing runs under
-- 'extraWorkerWhileBlocked'.
mapTasksWorker :: Traversable t
               => TaskGroup
               -> t (IO a)
               -> (IO (t b) -> IO (t c))
               -> (Async a -> IO b)
               -> IO (t c)
mapTasksWorker p fs f g = do
    handles <- forM fs $ \action ->
        atomically (asyncUsingLazy p rawForkIO action)
    extraWorkerWhileBlocked p (f (forM handles g))
-- | Run every action in the container through the task group and collect
-- the results with 'wait', preserving the container's shape.
mapTasks :: Traversable t => TaskGroup -> t (IO a) -> IO (t a)
mapTasks p fs = mapTasksWorker p fs unchanged wait
  where
    -- No post-processing of the collected results.
    unchanged = id
-- | Like 'mapTasks', but collects each result with 'waitCatch', so every
-- element is either the exception a task ended with or its value.
mapTasksE :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Either SomeException a))
mapTasksE p fs = mapTasksWorker p fs unchanged waitCatch
  where
    -- No post-processing of the collected results.
    unchanged = id
-- | Submit every action in the container to the task group, one
-- transaction per action, and return without waiting for any of them.
-- The resulting handles are discarded.
mapTasks_ :: Foldable t => TaskGroup -> t (IO a) -> IO ()
mapTasks_ p fs = forM_ fs $ \action ->
    atomically (asyncUsing p rawForkIO action)
-- | Like 'mapTasksE', but keeps only the failure information: each
-- element is @Just@ the exception a task ended with, or @Nothing@ when
-- the task produced a value.
mapTasksE_ :: Traversable t => TaskGroup -> t (IO a) -> IO (t (Maybe SomeException))
mapTasksE_ p fs = mapTasksWorker p fs summarise waitCatch
  where
    summarise outcomes = fmap (fmap failure) outcomes

    failure :: Either e r -> Maybe e
    failure (Left err) = Just err
    failure (Right _)  = Nothing
-- | Submit every action in one transaction, then hand all the handles to
-- 'waitAnyCatchCancel' and return its result: a handle paired with
-- either the exception or the value that task ended with.
mapRace :: Foldable t
        => TaskGroup -> t (IO a) -> IO (Async a, Either SomeException a)
mapRace p fs = do
    contenders <- atomically $
        sequenceA [ asyncUsing p rawForkIO job | job <- toList fs ]
    waitAnyCatchCancel contenders
-- | Submit every action, then build a tree of combining tasks that merge
-- neighbouring results with '<>' until one task remains; that task's
-- handle is returned.  Everything is set up inside the caller's STM
-- transaction.
--
-- Each combining task waits until both of its inputs have finished,
-- rethrows with 'throwIO' if the combined outcome is a failure, and is
-- registered as depending on both inputs through 'unsafeMakeDependent'.
-- For an empty container a single task returning 'mempty' is submitted.
mapReduce :: (Foldable t, Monoid a)
          => TaskGroup
          -> t (IO a)
          -> STM (Async a)
mapReduce p fs = do
    leaves <- sequenceA [ spawn job | job <- toList fs ]
    reduceAll leaves
  where
    spawn = asyncUsing p rawForkIO

    -- Keep halving the list of handles until exactly one is left.
    reduceAll level = pairUp level >>= \next -> case next of
        [single] -> return single
        []       -> error "mapReduce: impossible"
        _        -> reduceAll next

    -- One reduction pass: merge neighbours pairwise, left to right.
    pairUp [] = do
        unit <- spawn (return mempty)
        return [unit]
    pairUp [lone] = return [lone]
    pairUp (l : r : rest) = do
        merged <- spawn (combine l r)
        forM_ [l, r] $ \dep ->
            unsafeMakeDependent (pool p) (taskHandle merged) (taskHandle dep)
        if null rest
            then return [merged]
            else (merged :) <$> pairUp rest

    -- Block until both inputs are done, then merge or rethrow.
    combine l r = do
        outcome <- atomically $ do
            lres <- pollSTM l
            rres <- pollSTM r
            maybe retry return (liftM2 (<>) <$> lres <*> rres)
        either throwIO return outcome
-- | Submit every action, then fold over the outcomes in the order the
-- tasks are observed to finish.
--
-- Each outcome (the exception or the value a task ended with) is passed
-- to the supplied function, and its monoidal result is appended on the
-- right of the accumulator with '<>'.  The accumulator starts as
-- 'mempty'.  The fold runs in the caller's monad; its state is carried
-- between steps through 'control' and 'restoreM'.
scatterFoldMapM :: (Foldable t, Monoid b, MonadBaseControl IO m)
                => TaskGroup -> t (IO a) -> (Either SomeException a -> m b) -> m b
scatterFoldMapM p fs f = do
    hs <- liftBase $ atomically $
        sequenceA [ asyncUsing p rawForkIO job | job <- toList fs ]
    control $ \(run :: m b -> IO (StM m b)) ->
        drain run (run (return mempty)) hs
  where
    -- Consume one finished task per step until none are outstanding.
    drain _ acc [] = acc
    drain run acc outstanding = do
        (done, outcome) <- atomically $
            foldM firstFinished Nothing outstanding >>= maybe retry return
        saved <- acc
        merged <- run $ do
            soFar <- restoreM saved
            piece <- f outcome
            return (soFar <> piece)
        drain run (return merged) (delete done outstanding)

    -- Keep the first handle (in list order) that already has a result.
    firstFinished found@(Just _) _ = return found
    firstFinished found h = do
        polled <- pollSTM h
        return $ case polled of
            Just res -> Just (h, res)
            Nothing  -> found
-- | Apply an 'IO' function to every element of a container, running the
-- resulting actions through the task group with 'mapTasks'.
mapConcurrently :: Traversable t => TaskGroup -> (a -> IO b) -> t a -> IO (t b)
mapConcurrently tg f xs = mapTasks tg (fmap f xs)
-- | A computation that is set up against a 'TaskGroup'.
--
-- Given a group, the outer 'IO' layer performs the set-up and yields the
-- inner 'IO' action; running that inner action produces the value (see
-- 'runTask', which runs both layers in sequence).
newtype Task a = Task
    { runTask' :: TaskGroup -> IO (IO a)
    }
-- | Run a 'Task' in the given group: perform its set-up layer, then run
-- the action that layer returns and yield its result.
runTask :: TaskGroup -> Task a -> IO a
runTask group ts = do
    produce <- runTask' ts group
    produce
-- | Wrap a plain 'IO' action as a 'Task'.  The task group is ignored and
-- the set-up layer simply returns the action unchanged.
task :: IO a -> Task a
task action = Task (const (return action))
-- | Maps over the value produced by the task's inner action; the set-up
-- layer is left as it is.
instance Functor Task where
    fmap f (Task k) = Task $ \tg -> do
        produce <- k tg
        return (liftM f produce)
-- | '<*>' sets up the argument task first and submits its inner action
-- to the group with 'async'; only then is the function task set up.  The
-- combined inner action runs the function side and applies it to the
-- result of waiting on the submitted argument.
instance Applicative Task where
    pure x = Task (const (return (return x)))

    Task mkFun <*> Task mkArg = Task $ \tg -> do
        argAction <- mkArg tg
        argHandle <- async tg argAction
        funAction <- mkFun tg
        return (funAction <*> wait argHandle)
-- | '>>=' runs the first task to completion (both layers) while setting
-- up, because the continuation needs its value; the set-up layer of the
-- task produced by the continuation is then run in the same group.
instance Monad Task where
    return = pure

    Task m >>= f = Task $ \tg -> do
        produce <- m tg
        value   <- produce
        runTask' (f value) tg
-- | Lifting an 'IO' action into 'Task' is exactly 'task'.
instance MonadIO Task where
    liftIO action = task action