{-# LANGUAGE CPP                 #-}
{-# LANGUAGE TupleSections       #-}
{-# LANGUAGE DeriveDataTypeable  #-}
{-# LANGUAGE DeriveFunctor       #-}
{-# LANGUAGE DeriveGeneric       #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving  #-}
module UnliftIO.Internals.Async where

import           Control.Applicative
import           Control.Concurrent       (threadDelay, getNumCapabilities)
import qualified Control.Concurrent       as C
import           Control.Concurrent.Async (Async)
import qualified Control.Concurrent.Async as A
import           Control.Concurrent.STM
import           Control.Exception        (Exception, SomeException)
import           Control.Monad            (forever, liftM, unless, void, (>=>))
import           Control.Monad.IO.Unlift
import           Data.Foldable            (for_, traverse_)
import           Data.Typeable            (Typeable)
import           Data.IORef (IORef, readIORef, atomicWriteIORef, newIORef, atomicModifyIORef')
import qualified UnliftIO.Exception       as UE

-- For the implementation of Conc below, we do not want any of the
-- smart async exception handling logic from UnliftIO.Exception, since
-- (e.g.) we're low-level enough to need to explicitly throw async
-- exceptions synchronously.
import qualified Control.Exception        as E
import           GHC.Generics             (Generic)

#if MIN_VERSION_base(4,9,0)
import           Data.Semigroup
#else
import           Data.Monoid              hiding (Alt)
#endif
import           Data.Foldable            (Foldable, toList)
import           Data.Traversable         (Traversable, for, traverse)

-- | Unlifted 'A.async'.
--
-- The action is converted to 'IO' with the unlifting function supplied by
-- 'withRunInIO' and then handed to 'A.async'.
--
-- @since 0.1.0.0
async :: MonadUnliftIO m => m a -> m (Async a)
async m = withRunInIO $ \run -> A.async $ run m

-- | Unlifted 'A.asyncBound'.
--
-- The action is converted to 'IO' with the unlifting function supplied by
-- 'withRunInIO' and then handed to 'A.asyncBound'.
--
-- @since 0.1.0.0
asyncBound :: MonadUnliftIO m => m a -> m (Async a)
asyncBound m = withRunInIO $ \run -> A.asyncBound $ run m

-- | Unlifted 'A.asyncOn'.
--
-- The 'Int' argument is passed through to 'A.asyncOn' unchanged; the action
-- is converted to 'IO' with the unlifting function from 'withRunInIO'.
--
-- @since 0.1.0.0
asyncOn :: MonadUnliftIO m => Int -> m a -> m (Async a)
asyncOn i m = withRunInIO $ \run -> A.asyncOn i $ run m

-- | Unlifted 'A.asyncWithUnmask'.
--
-- The @unmask@ function handed to the callback is the 'IO'-level one from
-- 'A.asyncWithUnmask', transported into @m@ by unlifting its argument,
-- applying the 'IO' unmask, and lifting the result back with 'liftIO'.
--
-- @since 0.1.0.0
asyncWithUnmask :: MonadUnliftIO m => ((forall b. m b -> m b) -> m a) -> m (Async a)
asyncWithUnmask m =
  withRunInIO $ \run -> A.asyncWithUnmask $ \unmask -> run $ m $ liftIO . unmask . run

-- | Unlifted 'A.asyncOnWithUnmask'.
--
-- The 'Int' argument is passed through to 'A.asyncOnWithUnmask' unchanged.
-- The @unmask@ function handed to the callback is the 'IO'-level one,
-- transported into @m@ by unlifting its argument, applying the 'IO' unmask,
-- and lifting the result back with 'liftIO'.
--
-- @since 0.1.0.0
asyncOnWithUnmask :: MonadUnliftIO m => Int -> ((forall b. m b -> m b) -> m a) -> m (Async a)
asyncOnWithUnmask i m =
  withRunInIO $ \run -> A.asyncOnWithUnmask i $ \unmask -> run $ m $ liftIO . unmask . run

-- | Unlifted 'A.withAsync'.
--
-- Both the asynchronous action and the continuation receiving its 'Async'
-- handle are converted to 'IO' with the same unlifting function before being
-- passed to 'A.withAsync'.
--
-- @since 0.1.0.0
withAsync :: MonadUnliftIO m => m a -> (Async a -> m b) -> m b
withAsync a b = withRunInIO $ \run -> A.withAsync (run a) (run . b)

-- | Unlifted 'A.withAsyncBound'.
--
-- Both the asynchronous action and the continuation receiving its 'Async'
-- handle are converted to 'IO' with the same unlifting function before being
-- passed to 'A.withAsyncBound'.
--
-- @since 0.1.0.0
withAsyncBound :: MonadUnliftIO m => m a -> (Async a -> m b) -> m b
withAsyncBound a b = withRunInIO $ \run -> A.withAsyncBound (run a) (run . b)

-- | Unlifted 'A.withAsyncOn'.
--
-- The 'Int' argument is passed through to 'A.withAsyncOn' unchanged; the
-- asynchronous action and the continuation are converted to 'IO' with the
-- same unlifting function.
--
-- @since 0.1.0.0
withAsyncOn :: MonadUnliftIO m => Int -> m a -> (Async a -> m b) -> m b
withAsyncOn i a b = withRunInIO $ \run -> A.withAsyncOn i (run a) (run . b)

-- | Unlifted 'A.withAsyncWithUnmask'.
--
-- The @unmask@ function handed to the first callback is the 'IO'-level one
-- from 'A.withAsyncWithUnmask', transported into @m@ by unlifting its
-- argument, applying the 'IO' unmask, and lifting the result back with
-- 'liftIO'. The continuation is unlifted with the same function.
--
-- @since 0.1.0.0
withAsyncWithUnmask
  :: MonadUnliftIO m
  => ((forall c. m c -> m c) -> m a)
  -> (Async a -> m b)
  -> m b
withAsyncWithUnmask a b =
  withRunInIO $ \run -> A.withAsyncWithUnmask
    (\unmask -> run $ a $ liftIO . unmask . run)
    (run . b)

-- | Unlifted 'A.withAsyncOnWithUnmask'.
--
-- The 'Int' argument is passed through to 'A.withAsyncOnWithUnmask'
-- unchanged. The @unmask@ function handed to the first callback is the
-- 'IO'-level one, transported into @m@ by unlifting its argument, applying
-- the 'IO' unmask, and lifting the result back with 'liftIO'. The
-- continuation is unlifted with the same function.
--
-- @since 0.1.0.0
withAsyncOnWithUnmask
  :: MonadUnliftIO m
  => Int
  -> ((forall c. m c -> m c) -> m a)
  -> (Async a -> m b)
  -> m b
withAsyncOnWithUnmask i a b =
  withRunInIO $ \run -> A.withAsyncOnWithUnmask i
    (\unmask -> run $ a $ liftIO . unmask . run)
    (run . b)

-- | Lifted 'A.wait'.
--
-- Runs 'A.wait' in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
wait :: MonadIO m => Async a -> m a
wait = liftIO . A.wait

-- | Lifted 'A.poll'.
--
-- Runs 'A.poll' in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
poll :: MonadIO m => Async a -> m (Maybe (Either SomeException a))
poll = liftIO . A.poll

-- | Lifted 'A.waitCatch'.
--
-- Runs 'A.waitCatch' in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
waitCatch :: MonadIO m => Async a -> m (Either SomeException a)
waitCatch = liftIO . A.waitCatch

-- | Lifted 'A.cancel'.
--
-- Runs 'A.cancel' in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
cancel :: MonadIO m => Async a -> m ()
cancel = liftIO . A.cancel

-- | Lifted 'A.uninterruptibleCancel'.
--
-- Runs 'A.uninterruptibleCancel' in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
uninterruptibleCancel :: MonadIO m => Async a -> m ()
uninterruptibleCancel = liftIO . A.uninterruptibleCancel

-- | Lifted 'A.cancelWith'. Additionally uses 'UE.toAsyncException' to
-- ensure async exception safety.
--
-- The supplied exception is first passed through 'UE.toAsyncException' and
-- the resulting 'SomeException' is what gets handed to 'A.cancelWith'.
--
-- @since 0.1.0.0
cancelWith :: (Exception e, MonadIO m) => Async a -> e -> m ()
cancelWith a e = liftIO (A.cancelWith a (UE.toAsyncException e))

-- | Lifted 'A.waitAny'.
--
-- Runs 'A.waitAny' in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
waitAny :: MonadIO m => [Async a] -> m (Async a, a)
waitAny = liftIO . A.waitAny

-- | Lifted 'A.waitAnyCatch'.
--
-- Runs 'A.waitAnyCatch' in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
waitAnyCatch :: MonadIO m => [Async a] -> m (Async a, Either SomeException a)
waitAnyCatch = liftIO . A.waitAnyCatch

-- | Lifted 'A.waitAnyCancel'.
--
-- Runs 'A.waitAnyCancel' in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
waitAnyCancel :: MonadIO m => [Async a] -> m (Async a, a)
waitAnyCancel = liftIO . A.waitAnyCancel

-- | Lifted 'A.waitAnyCatchCancel'.
--
-- Runs 'A.waitAnyCatchCancel' in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
waitAnyCatchCancel :: MonadIO m => [Async a] -> m (Async a, Either SomeException a)
waitAnyCatchCancel = liftIO . A.waitAnyCatchCancel

-- | Lifted 'A.waitEither'.
--
-- Runs 'A.waitEither' on the two handles in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
waitEither :: MonadIO m => Async a -> Async b -> m (Either a b)
waitEither a b = liftIO (A.waitEither a b)

-- | Lifted 'A.waitEitherCatch'.
--
-- Runs 'A.waitEitherCatch' on the two handles in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
waitEitherCatch :: MonadIO m => Async a -> Async b -> m (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch a b = liftIO (A.waitEitherCatch a b)

-- | Lifted 'A.waitEitherCancel'.
--
-- Runs 'A.waitEitherCancel' on the two handles in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
waitEitherCancel :: MonadIO m => Async a -> Async b -> m (Either a b)
waitEitherCancel a b = liftIO (A.waitEitherCancel a b)

-- | Lifted 'A.waitEitherCatchCancel'.
--
-- Runs 'A.waitEitherCatchCancel' on the two handles in any 'MonadIO' via
-- 'liftIO'.
--
-- @since 0.1.0.0
waitEitherCatchCancel :: MonadIO m => Async a -> Async b -> m (Either (Either SomeException a) (Either SomeException b))
waitEitherCatchCancel a b = liftIO (A.waitEitherCatchCancel a b)

-- | Lifted 'A.waitEither_'.
--
-- Runs 'A.waitEither_' on the two handles in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
waitEither_ :: MonadIO m => Async a -> Async b -> m ()
waitEither_ a b = liftIO (A.waitEither_ a b)

-- | Lifted 'A.waitBoth'.
--
-- Runs 'A.waitBoth' on the two handles in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
waitBoth :: MonadIO m => Async a -> Async b -> m (a, b)
waitBoth a b = liftIO (A.waitBoth a b)

-- | Lifted 'A.link'.
--
-- Runs 'A.link' in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
link :: MonadIO m => Async a -> m ()
link = liftIO . A.link

-- | Lifted 'A.link2'.
--
-- Runs 'A.link2' on the two handles in any 'MonadIO' via 'liftIO'.
--
-- @since 0.1.0.0
link2 :: MonadIO m => Async a -> Async b -> m ()
link2 a b = liftIO (A.link2 a b)

-- | Unlifted 'A.race'.
--
-- Both actions are converted to 'IO' with the same unlifting function from
-- 'withRunInIO' and then passed to 'A.race'.
--
-- @since 0.1.0.0
race :: MonadUnliftIO m => m a -> m b -> m (Either a b)
race a b = withRunInIO $ \run -> A.race (run a) (run b)

-- | Unlifted 'A.race_'.
--
-- Both actions are converted to 'IO' with the same unlifting function from
-- 'withRunInIO' and then passed to 'A.race_'.
--
-- @since 0.1.0.0
race_ :: MonadUnliftIO m => m a -> m b -> m ()
race_ a b = withRunInIO $ \run -> A.race_ (run a) (run b)

-- | Unlifted 'A.concurrently'.
--
-- Both actions are converted to 'IO' with the same unlifting function from
-- 'withRunInIO' and then passed to 'A.concurrently'.
--
-- @since 0.1.0.0
concurrently :: MonadUnliftIO m => m a -> m b -> m (a, b)
concurrently a b = withRunInIO $ \run -> A.concurrently (run a) (run b)

-- | Unlifted 'A.concurrently_'.
--
-- Both actions are converted to 'IO' with the same unlifting function from
-- 'withRunInIO' and then passed to 'A.concurrently_'.
--
-- @since 0.1.0.0
concurrently_ :: MonadUnliftIO m => m a -> m b -> m ()
concurrently_ a b = withRunInIO $ \run -> A.concurrently_ (run a) (run b)

-- | Unlifted 'A.Concurrently'.
--
-- A thin wrapper around an action in @m@. The instances defined in this
-- module combine wrapped actions with 'concurrently' ('Applicative') and
-- 'race' ('Alternative').
--
-- @since 0.1.0.0
newtype Concurrently m a = Concurrently
  { runConcurrently :: m a
  }

-- | Maps over the result of the wrapped action using 'liftM', so only a
-- 'Monad' constraint on @m@ is required.
--
-- @since 0.1.0.0
instance Monad m => Functor (Concurrently m) where
  fmap f (Concurrently a) = Concurrently $ liftM f a

-- | 'pure' wraps a value with 'return'. '<*>' runs the function-producing
-- and argument-producing actions through 'concurrently' and then applies
-- the resulting function to the resulting argument.
--
-- @since 0.1.0.0
instance MonadUnliftIO m => Applicative (Concurrently m) where
  pure = Concurrently . return
  Concurrently fs <*> Concurrently as =
    Concurrently $ liftM (\(f, a) -> f a) (concurrently fs as)

-- | Composing two unlifted 'Concurrently' values using 'Alternative' is
-- equivalent to using the 'race' combinator: the asynchronous sub-routine
-- that returns a value first is the one that gets its value returned, and the
-- slower sub-routine gets cancelled and its thread is killed.
--
-- @since 0.1.0.0
instance MonadUnliftIO m => Alternative (Concurrently m) where
  -- | Care should be taken when using the 'empty' value of the 'Alternative'
  -- interface, as it will create a thread that delays for a long period of
  -- time. The reason behind this implementation is that any other computation
  -- will finish before the 'empty' value does. This implementation is less
  -- than ideal, and in a perfect world, we would have a typeclass family that
  -- allows '(<|>)' but not 'empty'.
  --
  -- @since 0.1.0.0
  -- The delay is wrapped in 'forever', so it is started again each time it
  -- returns and the action never yields a value on its own.
  empty = Concurrently $ liftIO (forever (threadDelay maxBound))
  -- Both sides of the 'Either' produced by 'race' have the same type, so the
  -- winner's value is extracted with @either id id@.
  Concurrently as <|> Concurrently bs =
    Concurrently $ liftM (either id id) (race as bs)

--------------------------------------------------------------------------------
#if MIN_VERSION_base(4,9,0)
--------------------------------------------------------------------------------
-- | Only defined by @async@ for @base >= 4.9@.
--
-- Combines the results of the two wrapped actions with the underlying
-- 'Semigroup', going through the 'Applicative' instance via 'liftA2'.
--
-- @since 0.1.0.0
instance (MonadUnliftIO m, Semigroup a) => Semigroup (Concurrently m a) where
  (<>) = liftA2 (<>)

-- | 'mempty' is the underlying 'mempty' wrapped with 'pure'; 'mappend'
-- delegates to the 'Semigroup' instance's '<>'.
--
-- @since 0.1.0.0
instance (Semigroup a, Monoid a, MonadUnliftIO m) => Monoid (Concurrently m a) where
  mempty = pure mempty
  mappend = (<>)
--------------------------------------------------------------------------------
#else
--------------------------------------------------------------------------------
-- | Variant used when @base@ has no 'Semigroup' class: 'mempty' is the
-- underlying 'mempty' wrapped with 'pure', and 'mappend' combines both
-- results with the underlying 'mappend' through the 'Applicative' instance.
--
-- @since 0.1.0.0
instance (Monoid a, MonadUnliftIO m) => Monoid (Concurrently m a) where
  mempty = pure mempty
  mappend lhs rhs = mappend <$> lhs <*> rhs
--------------------------------------------------------------------------------
#endif
--------------------------------------------------------------------------------

-- | Similar to 'mapConcurrently' but with arguments flipped: the container
-- comes first and the function second.
--
-- @since 0.1.0.0
forConcurrently :: MonadUnliftIO m => Traversable t => t a -> (a -> m b) -> m (t b)
forConcurrently = flip mapConcurrently
{-# INLINE forConcurrently #-}

-- | Similar to 'mapConcurrently_' but with arguments flipped: the container
-- comes first and the function second.
--
-- @since 0.1.0.0
forConcurrently_ :: MonadUnliftIO m => Foldable f => f a -> (a -> m b) -> m ()
forConcurrently_ = flip mapConcurrently_
{-# INLINE forConcurrently_ #-}

-- | Unlifted 'A.replicateConcurrently'.
--
-- A count below 1 yields an empty list without running the action. A count
-- of exactly 1 runs the action directly and wraps its result in a singleton
-- list. Larger counts run @cnt@ copies of the action through
-- 'mapConcurrently'.
--
-- @since 0.1.0.0
#if MIN_VERSION_base(4,7,0)
replicateConcurrently :: (Applicative m, MonadUnliftIO m) => Int -> m a -> m [a]
#else
replicateConcurrently :: (Functor m, MonadUnliftIO m) => Int -> m a -> m [a]
#endif
replicateConcurrently cnt m =
  case compare cnt 1 of
    LT -> pure []
    EQ -> (:[]) <$> m
    GT -> mapConcurrently id (replicate cnt m)
{-# INLINE replicateConcurrently #-}

-- | Unlifted 'A.replicateConcurrently_'.
--
-- A count below 1 does nothing. A count of exactly 1 runs the action
-- directly and discards its result. Larger counts run @cnt@ copies of the
-- action through 'mapConcurrently_'.
--
-- @since 0.1.0.0
#if MIN_VERSION_base(4,7,0)
replicateConcurrently_ :: (Applicative m, MonadUnliftIO m) => Int -> m a -> m ()
#else
replicateConcurrently_ :: (MonadUnliftIO m) => Int -> m a -> m ()
#endif
replicateConcurrently_ cnt m =
  case compare cnt 1 of
    LT -> pure ()
    EQ -> void m
    GT -> mapConcurrently_ id (replicate cnt m)
{-# INLINE replicateConcurrently_ #-}

-- Conc uses GHC features that are not supported by GHC 7.10 and earlier,
-- so we only export/use it when building with a newer version.
--------------------------------------------------------------------------------
#if MIN_VERSION_base(4,8,0)
--------------------------------------------------------------------------------

-- | Executes a 'Traversable' container of items concurrently, it uses the 'Flat'
-- type internally.
--
-- Each element is turned into a 'FlatAction' (the user function run in
-- 'IO' via the unlifting function), the container is traversed with the
-- 'Applicative' instance of 'Flat', and the resulting structure is
-- executed by 'runFlat'.
--
-- @since 0.1.0.0
mapConcurrently :: MonadUnliftIO m => Traversable t => (a -> m b) -> t a -> m (t b)
mapConcurrently f t = withRunInIO $ \run -> runFlat $ traverse
  (FlatApp . FlatAction . run . f)
  t
{-# INLINE mapConcurrently #-}

-- | Executes a 'Traversable' container of items concurrently, it uses the 'Flat'
-- type internally. This function ignores the results.
--
-- Works like 'mapConcurrently', but folds the container with 'traverse_'
-- so only a 'Foldable' instance is required and no result container is
-- built.
--
-- @since 0.1.0.0
mapConcurrently_ :: MonadUnliftIO m => Foldable f => (a -> m b) -> f a -> m ()
mapConcurrently_ f t = withRunInIO $ \run -> runFlat $ traverse_
  (FlatApp . FlatAction . run . f)
  t
{-# INLINE mapConcurrently_ #-}


-- More efficient Conc implementation

-- | A more efficient alternative to 'Concurrently', which reduces the
-- number of threads that need to be forked. For more information, see
-- [this blog post](https://www.fpcomplete.com/blog/transformations-on-applicative-concurrent-computations/).
-- This is provided as a separate type to @Concurrently@ as it has a slightly different API.
--
-- Use the 'conc' function to construct values of type 'Conc', and
-- 'runConc' to execute the composed actions. You can use the
-- @Applicative@ instance to run different actions and wait for all of
-- them to complete, or the @Alternative@ instance to wait for the
-- first thread to complete.
--
-- In the event of a runtime exception thrown by any of the children
-- threads, or an asynchronous exception received in the parent
-- thread, all threads will be killed with an 'A.AsyncCancelled'
-- exception and the original exception rethrown. If multiple
-- exceptions are generated by different threads, there are no
-- guarantees on which exception will end up getting rethrown.
--
-- For many common use cases, you may prefer using helper functions in
-- this module like 'mapConcurrently'.
--
-- There are some intentional differences in behavior to
-- @Concurrently@:
--
-- * Children threads are always launched in an unmasked state, not
--   the inherited state of the parent thread.
--
-- Note that it is a programmer error to use the @Alternative@
-- instance in such a way that there are no alternatives to an empty,
-- e.g. @runConc (empty <|> empty)@. In such a case, a 'ConcException'
-- will be thrown. If there was an @Alternative@ in the standard
-- libraries without @empty@, this library would use it instead.
--
-- @since 0.2.9.0
data Conc m a where
  -- A single action in the base monad; this is what 'conc' builds.
  Action :: m a -> Conc m a
  -- Function application of two sub-computations; this is what '<*>'
  -- builds.
  Apply   :: Conc m (v -> a) -> Conc m v -> Conc m a
  -- Combination of two sub-computations with a pure binary function; this
  -- is what 'liftA2' (base >= 4.11) and '*>' build.
  LiftA2 :: (x -> y -> a) -> Conc m x -> Conc m y -> Conc m a

  -- Just an optimization to avoid spawning extra threads
  Pure :: a -> Conc m a

  -- I thought there would be an optimization available from having a
  -- data constructor that explicitly doesn't care about the first
  -- result. Turns out it doesn't help much: we still need to keep a
  -- TMVar below to know when the thread completes.
  --
  -- Then :: Conc m a -> Conc m b -> Conc m b

  -- A choice between two sub-computations; this is what '<|>' builds.
  Alt :: Conc m a -> Conc m a -> Conc m a
  -- The 'empty' of the 'Alternative' instance. It has no runnable
  -- counterpart: 'flatten' either drops it from a chain of alternatives or
  -- throws 'EmptyWithNoAlternative'.
  Empty :: Conc m a

deriving instance Functor m => Functor (Conc m)
-- The derived instance is roughly equivalent to this hand-written one:
--
-- fmap f (Action routine) = Action (fmap f routine)
-- fmap f (Apply g x)      = Apply (fmap (f .) g) x
-- fmap f (LiftA2 g x y)   = LiftA2 (\a b -> f (g a b)) x y
-- fmap f (Pure val)       = Pure (f val)
-- fmap f (Alt a b)        = Alt (fmap f a) (fmap f b)
-- fmap f Empty            = Empty

-- | Construct a value of type 'Conc' from an action. Compose these
-- values using the typeclass instances (most commonly 'Applicative'
-- and 'Alternative') and then run with 'runConc'.
--
-- This is simply the 'Action' constructor; nothing is executed until the
-- composed value is passed to 'runConc'.
--
-- @since 0.2.9.0
conc :: m a -> Conc m a
conc = Action
{-# INLINE conc #-}

-- | Run a 'Conc' value on multiple threads.
--
-- The value is first simplified with 'flatten' (which throws a
-- 'ConcException' when an 'empty' has no alternative) and the resulting
-- 'Flat' structure is then executed in 'IO' by 'runFlat'.
--
-- @since 0.2.9.0
runConc :: MonadUnliftIO m => Conc m a -> m a
runConc = flatten >=> (liftIO . runFlat)
{-# INLINE runConc #-}

-- | Every method only builds a node of the 'Conc' tree; nothing runs
-- until 'runConc' is called.
--
-- @since 0.2.9.0
instance MonadUnliftIO m => Applicative (Conc m) where
  pure = Pure
  {-# INLINE pure #-}
  -- | Following is an example of how an 'Applicative' expands to a Tree
  --
  -- @@@
  -- downloadA :: IO String
  -- downloadB :: IO String
  --
  -- (f <$> conc downloadA <*> conc downloadB <*> pure 123)
  --
  --   (((f <$> a) <*> b) <*> c)
  --        (1)    (2)    (3)
  --
  -- (1)
  --   Action (fmap f downloadA)
  -- (2)
  --   Apply (Action (fmap f downloadA)) (Action downloadB)
  -- (3)
  --   Apply (Apply (Action (fmap f downloadA)) (Action downloadB))
  --        (Pure 123)
  -- @@@
  --
  (<*>) = Apply
  {-# INLINE (<*>) #-}
  -- See comment above on Then
  -- (*>) = Then
#if MIN_VERSION_base(4,11,0)
  liftA2 = LiftA2
  {-# INLINE liftA2 #-}
#endif

  -- Both sides are kept in the tree (so both are run); only the value of
  -- the right-hand side is returned.
  a *> b = LiftA2 (\_ x -> x) a b
  {-# INLINE (*>) #-}

-- | '<|>' builds an 'Alt' node and 'empty' builds an 'Empty' node. An
-- 'Empty' that is left without any alternative makes 'flatten' throw
-- 'EmptyWithNoAlternative'.
--
-- @since 0.2.9.0
instance MonadUnliftIO m => Alternative (Conc m) where
  empty = Empty -- this is so ugly, we don't actually want to provide it!
  {-# INLINE empty #-}
  (<|>) = Alt
  {-# INLINE (<|>) #-}

#if MIN_VERSION_base(4, 11, 0)
-- | Combines the results of two 'Conc' values with the underlying
-- 'Semigroup' operation, lifted through the 'Applicative' instance.
--
-- @since 0.2.9.0
instance (MonadUnliftIO m, Semigroup a) => Semigroup (Conc m a) where
  (<>) = liftA2 (<>)
  {-# INLINE (<>) #-}
#endif

-- | 'mempty' is a 'Pure' 'mempty'; 'mappend' combines the results of two
-- 'Conc' values with the underlying 'mappend', lifted through the
-- 'Applicative' instance.
--
-- @since 0.2.9.0
instance (Monoid a, MonadUnliftIO m) => Monoid (Conc m a) where
  mempty = pure mempty
  {-# INLINE mempty #-}
  mappend = liftA2 mappend
  {-# INLINE mappend #-}

-------------------------
-- Conc implementation --
-------------------------

-- Data types for flattening out the original @Conc@ into a simplified
-- view. Goals:
--
-- * We want to get rid of the Empty data constructor. We don't want
--   it anyway, it's only there because of the Alternative typeclass.
--
-- * We want to ensure that there is no nesting of Alt data
--   constructors. There is a bookkeeping overhead to each time we
--   need to track raced threads, and we want to minimize that
--   bookkeeping.
--
-- * We want to ensure that, when racing, we're always racing at least
--   two threads.
--
-- * We want to simplify down to IO.

-- | Flattened structure, either Applicative or Alternative
data Flat a
  -- | Flattened Applicative: a single 'FlatApp' with no race at this level.
  = FlatApp !(FlatApp a)
  -- | Flattened Alternative. Has at least 2 entries, which must be
  -- FlatApp (no nesting of FlatAlts). The first two fields are the two
  -- mandatory alternatives; the list holds any further ones.
  | FlatAlt !(FlatApp a) !(FlatApp a) ![FlatApp a]

deriving instance Functor Flat
-- The derived instance is equivalent to the following (the fields of
-- 'FlatAlt' are plain 'FlatApp' values, so they are mapped directly):
--
-- fmap f (FlatApp a) =
--   FlatApp (fmap f a)
-- fmap f (FlatAlt a b xs) =
--   FlatAlt (fmap f a) (fmap f b) (map (fmap f) xs)

-- | Every combination is expressed as a 'FlatLiftA2' node wrapped in
-- 'FlatApp', so combining two 'Flat' values never produces a 'FlatAlt'
-- at the top level.
instance Applicative Flat where
  pure = FlatApp . pure
  -- Function application is 'FlatLiftA2' with 'id' as the combining
  -- function.
  (<*>) f a = FlatApp (FlatLiftA2 id f a)
#if MIN_VERSION_base(4,11,0)
  liftA2 f a b = FlatApp (FlatLiftA2 f a b)
#endif

-- | Flattened Applicative. No Alternative stuff directly in here, but may be in
-- the children. Notice this type doesn't have a type parameter for monadic
-- contexts, it hardwires the base monad to IO given concurrency relies
-- eventually on that.
--
-- @since 0.2.9.0
data FlatApp a where
  -- An already-available value; 'runFlat' forks no thread for it.
  FlatPure   :: a -> FlatApp a
  -- An 'IO' action; 'runFlat' forks a thread to run it, unless it is the
  -- whole computation, in which case it is run directly.
  FlatAction :: IO a -> FlatApp a
  -- Function application of two flattened sub-structures.
  FlatApply   :: Flat (v -> a) -> Flat v -> FlatApp a
  -- Combination of two flattened sub-structures with a pure binary
  -- function.
  FlatLiftA2 :: (x -> y -> a) -> Flat x -> Flat y -> FlatApp a

deriving instance Functor FlatApp

-- | Combining two 'FlatApp' values wraps each operand in the 'FlatApp'
-- constructor of 'Flat' and joins them with a 'FlatApply' (for '<*>') or
-- 'FlatLiftA2' (for 'liftA2') node.
instance Applicative FlatApp where
  pure = FlatPure
  (<*>) mf ma = FlatApply (FlatApp mf) (FlatApp ma)
#if MIN_VERSION_base(4,11,0)
  liftA2 f a b = FlatLiftA2 f (FlatApp a) (FlatApp b)
#endif

-- | Things that can go wrong in the structure of a 'Conc'. These are
-- /programmer errors/.
--
-- @since 0.2.9.0
data ConcException
  = EmptyWithNoAlternative
  -- ^ An 'empty' was used with no alternative to fall back on, e.g.
  -- @runConc (empty <|> empty)@.
  deriving (Generic, Show, Typeable, Eq, Ord)
instance E.Exception ConcException

-- | Simple difference list, for nicer types below
--
-- A list is represented as the function that prepends its elements to
-- another list, so concatenation is just function composition.
type DList a = [a] -> [a]

-- | Concatenate two difference lists (elements of the first come first).
dlistConcat :: DList a -> DList a -> DList a
dlistConcat = (.)
{-# INLINE dlistConcat #-}

-- | Prepend a single element to a difference list.
dlistCons :: a -> DList a -> DList a
dlistCons a as = dlistSingleton a `dlistConcat` as
{-# INLINE dlistCons #-}

-- | Concatenate a list of difference lists, preserving their order.
dlistConcatAll :: [DList a] -> DList a
dlistConcatAll = foldr (.) id
{-# INLINE dlistConcatAll #-}

-- | Convert a difference list to an ordinary list by applying it to @[]@.
dlistToList :: DList a -> [a]
dlistToList = ($ [])
{-# INLINE dlistToList #-}

-- | A difference list holding exactly one element.
dlistSingleton :: a -> DList a
dlistSingleton a = (a:)
{-# INLINE dlistSingleton #-}

-- | The empty difference list.
dlistEmpty :: DList a
dlistEmpty = id
{-# INLINE dlistEmpty #-}

-- | Turn a 'Conc' into a 'Flat'. Note that thanks to the ugliness of
-- 'empty', this may fail, e.g. @flatten Empty@.
--
-- The failure is reported by throwing 'EmptyWithNoAlternative' in 'IO'.
-- Every 'Action' is converted to an 'IO' action with the unlifting
-- function obtained from 'withRunInIO'; no action is executed here.
--
-- @since 0.2.9.0
flatten :: forall m a. MonadUnliftIO m => Conc m a -> m (Flat a)
flatten c0 = withRunInIO $ \run -> do

  -- 'both' flattens a 'Conc' in any position and yields a complete 'Flat'.
  -- 'alt' (below) flattens an operand of an 'Alt' and yields the list of
  -- alternatives it contributes, so that nested 'Alt's collapse into one
  -- flat list and 'Empty' operands disappear.
  let both :: forall k. Conc m k -> IO (Flat k)
      both Empty = E.throwIO EmptyWithNoAlternative
      both (Action m) = pure $ FlatApp $ FlatAction $ run m
      both (Apply cf ca) = do
        f <- both cf
        a <- both ca
        pure $ FlatApp $ FlatApply f a
      both (LiftA2 f ca cb) = do
        a <- both ca
        b <- both cb
        pure $ FlatApp $ FlatLiftA2 f a b
      both (Alt ca cb) = do
        a <- alt ca
        b <- alt cb
        case dlistToList (a `dlistConcat` b) of
          -- Every operand was 'Empty': nothing left to run.
          []    -> E.throwIO EmptyWithNoAlternative
          -- Only one real alternative survived: no race is needed.
          [x]   -> pure $ FlatApp x
          -- Two or more alternatives: a genuine race.
          x:y:z -> pure $ FlatAlt x y z
      both (Pure a) = pure $ FlatApp $ FlatPure a

      -- Returns a difference list for cheaper concatenation
      alt :: forall k. Conc m k -> IO (DList (FlatApp k))
      alt Empty = pure dlistEmpty
      alt (Apply cf ca) = do
        f <- both cf
        a <- both ca
        pure (dlistSingleton $ FlatApply f a)
      alt (Alt ca cb) = do
        a <- alt ca
        b <- alt cb
        pure $ a `dlistConcat` b
      alt (Action m) = pure (dlistSingleton $ FlatAction (run m))
      alt (LiftA2 f ca cb) = do
        a <- both ca
        b <- both cb
        pure (dlistSingleton $ FlatLiftA2 f a b)
      alt (Pure a) = pure (dlistSingleton $ FlatPure a)

  both c0

-- | Run a @Flat a@ on multiple threads.
runFlat :: Flat a -> IO a

-- Silly, simple optimizations
runFlat :: Flat a -> IO a
runFlat (FlatApp (FlatAction IO a
io)) = IO a
io
runFlat (FlatApp (FlatPure a
x)) = a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
x

-- Start off with all exceptions masked so we can install proper cleanup.
runFlat Flat a
f0 = ((forall b. IO b -> IO b) -> IO a) -> IO a
forall b. ((forall b. IO b -> IO b) -> IO b) -> IO b
E.uninterruptibleMask (((forall b. IO b -> IO b) -> IO a) -> IO a)
-> ((forall b. IO b -> IO b) -> IO a) -> IO a
forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore -> do
  -- How many threads have been spawned and finished their task? We need to
  -- ensure we kill all child threads and wait for them to die.
  TVar Int
resultCountVar <- Int -> IO (TVar Int)
forall a. a -> IO (TVar a)
newTVarIO Int
0

  -- Forks off as many threads as necessary to run the given Flat a,
  -- and returns:
  --
  -- + An STM action that will block until completion and return the
  --   result.
  --
  -- + The IDs of all forked threads. These need to be tracked so they
  --   can be killed (either when an exception is thrown, or when one
  --   of the alternatives completes first).
  --
  -- It would be nice to have the returned STM action return an Either
  -- and keep the SomeException values somewhat explicit, but in all
  -- my testing this absolutely kills performance. Instead, we're
  -- going to use a hack of providing a TMVar to fill up with a
  -- SomeException when things fail.
  --
  -- TODO: Investigate why performance degradation on Either
  let go :: forall a.
            TMVar E.SomeException
         -> Flat a
         -> IO (STM a, DList C.ThreadId)
      go :: TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
_excVar (FlatApp (FlatPure a
x)) = (STM a, DList ThreadId) -> IO (STM a, DList ThreadId)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (a -> STM a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
x, DList ThreadId
forall a. DList a
dlistEmpty)
      go TMVar SomeException
excVar (FlatApp (FlatAction IO a
io)) = do
        TMVar a
resVar <- IO (TMVar a)
forall a. IO (TMVar a)
newEmptyTMVarIO
        ThreadId
tid <- ((forall b. IO b -> IO b) -> IO ()) -> IO ThreadId
C.forkIOWithUnmask (((forall b. IO b -> IO b) -> IO ()) -> IO ThreadId)
-> ((forall b. IO b -> IO b) -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore1 -> do
          Either SomeException a
res <- IO a -> IO (Either SomeException a)
forall e a. Exception e => IO a -> IO (Either e a)
E.try (IO a -> IO (Either SomeException a))
-> IO a -> IO (Either SomeException a)
forall a b. (a -> b) -> a -> b
$ IO a -> IO a
forall b. IO b -> IO b
restore1 IO a
io
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
resultCountVar (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
            case Either SomeException a
res of
              Left SomeException
e  -> STM Bool -> STM ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM Bool -> STM ()) -> STM Bool -> STM ()
forall a b. (a -> b) -> a -> b
$ TMVar SomeException -> SomeException -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar SomeException
excVar SomeException
e
              Right a
x -> TMVar a -> a -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar a
resVar a
x
        (STM a, DList ThreadId) -> IO (STM a, DList ThreadId)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (TMVar a -> STM a
forall a. TMVar a -> STM a
readTMVar TMVar a
resVar, ThreadId -> DList ThreadId
forall a. a -> [a] -> [a]
dlistSingleton ThreadId
tid)
      go TMVar SomeException
excVar (FlatApp (FlatApply Flat (v -> a)
cf Flat v
ca)) = do
        (STM (v -> a)
f, DList ThreadId
tidsf) <- TMVar SomeException
-> Flat (v -> a) -> IO (STM (v -> a), DList ThreadId)
forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar Flat (v -> a)
cf
        (STM v
a, DList ThreadId
tidsa) <- TMVar SomeException -> Flat v -> IO (STM v, DList ThreadId)
forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar Flat v
ca
        (STM a, DList ThreadId) -> IO (STM a, DList ThreadId)
forall (f :: * -> *) a. Applicative f => a -> f a
pure (STM (v -> a)
f STM (v -> a) -> STM v -> STM a
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM v
a, DList ThreadId
tidsf DList ThreadId -> DList ThreadId -> DList ThreadId
forall a. DList a -> DList a -> DList a
`dlistConcat` DList ThreadId
tidsa)
      go TMVar SomeException
excVar (FlatApp (FlatLiftA2 x -> y -> a
f Flat x
a Flat y
b)) = do
        (STM x
a', DList ThreadId
tidsa) <- TMVar SomeException -> Flat x -> IO (STM x, DList ThreadId)
forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar Flat x
a
        (STM y
b', DList ThreadId
tidsb) <- TMVar SomeException -> Flat y -> IO (STM y, DList ThreadId)
forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar Flat y
b
        (STM a, DList ThreadId) -> IO (STM a, DList ThreadId)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ((x -> y -> a) -> STM x -> STM y -> STM a
forall (f :: * -> *) a b c.
Applicative f =>
(a -> b -> c) -> f a -> f b -> f c
liftA2 x -> y -> a
f STM x
a' STM y
b', DList ThreadId
tidsa DList ThreadId -> DList ThreadId -> DList ThreadId
forall a. DList a -> DList a -> DList a
`dlistConcat` DList ThreadId
tidsb)

      go TMVar SomeException
excVar0 (FlatAlt FlatApp a
x FlatApp a
y [FlatApp a]
z) = do
        -- As soon as one of the children finishes, we need to kill the siblings,
        -- we're going to create our own excVar here to pass to the children, so
        -- we can prevent the ThreadKilled exceptions we throw to the children
        -- here from propagating and taking down the whole system.
        TMVar SomeException
excVar <- IO (TMVar SomeException)
forall a. IO (TMVar a)
newEmptyTMVarIO
        TMVar a
resVar <- IO (TMVar a)
forall a. IO (TMVar a)
newEmptyTMVarIO
        [(STM a, DList ThreadId)]
pairs <- (FlatApp a -> IO (STM a, DList ThreadId))
-> [FlatApp a] -> IO [(STM a, DList ThreadId)]
forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar (Flat a -> IO (STM a, DList ThreadId))
-> (FlatApp a -> Flat a) -> FlatApp a -> IO (STM a, DList ThreadId)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. FlatApp a -> Flat a
forall a. FlatApp a -> Flat a
FlatApp) (FlatApp a
xFlatApp a -> [FlatApp a] -> [FlatApp a]
forall a. a -> [a] -> [a]
:FlatApp a
yFlatApp a -> [FlatApp a] -> [FlatApp a]
forall a. a -> [a] -> [a]
:[FlatApp a]
z)
        let ([STM a]
blockers, [DList ThreadId]
workerTids) = [(STM a, DList ThreadId)] -> ([STM a], [DList ThreadId])
forall a b. [(a, b)] -> ([a], [b])
unzip [(STM a, DList ThreadId)]
pairs

        -- Fork a helper thread to wait for the first child to
        -- complete, or for one of them to die with an exception so we
        -- can propagate it to excVar0.
        ThreadId
helperTid <- ((forall b. IO b -> IO b) -> IO ()) -> IO ThreadId
C.forkIOWithUnmask (((forall b. IO b -> IO b) -> IO ()) -> IO ThreadId)
-> ((forall b. IO b -> IO b) -> IO ()) -> IO ThreadId
forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore1 -> do
          Either SomeException (Either SomeException a)
eres <- IO (Either SomeException a)
-> IO (Either SomeException (Either SomeException a))
forall e a. Exception e => IO a -> IO (Either e a)
E.try (IO (Either SomeException a)
 -> IO (Either SomeException (Either SomeException a)))
-> IO (Either SomeException a)
-> IO (Either SomeException (Either SomeException a))
forall a b. (a -> b) -> a -> b
$ IO (Either SomeException a) -> IO (Either SomeException a)
forall b. IO b -> IO b
restore1 (IO (Either SomeException a) -> IO (Either SomeException a))
-> IO (Either SomeException a) -> IO (Either SomeException a)
forall a b. (a -> b) -> a -> b
$ STM (Either SomeException a) -> IO (Either SomeException a)
forall a. STM a -> IO a
atomically (STM (Either SomeException a) -> IO (Either SomeException a))
-> STM (Either SomeException a) -> IO (Either SomeException a)
forall a b. (a -> b) -> a -> b
$ (STM a
 -> STM (Either SomeException a) -> STM (Either SomeException a))
-> STM (Either SomeException a)
-> [STM a]
-> STM (Either SomeException a)
forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr
            (\STM a
blocker STM (Either SomeException a)
rest -> (a -> Either SomeException a
forall a b. b -> Either a b
Right (a -> Either SomeException a)
-> STM a -> STM (Either SomeException a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM a
blocker) STM (Either SomeException a)
-> STM (Either SomeException a) -> STM (Either SomeException a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM (Either SomeException a)
rest)
            (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> STM SomeException -> STM (Either SomeException a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMVar SomeException -> STM SomeException
forall a. TMVar a -> STM a
readTMVar TMVar SomeException
excVar)
            [STM a]
blockers
          STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
            TVar Int -> (Int -> Int) -> STM ()
forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
resultCountVar (Int -> Int -> Int
forall a. Num a => a -> a -> a
+ Int
1)
            case Either SomeException (Either SomeException a)
eres of
              -- NOTE: The child threads are spawned from @traverse go@ call above, they
              -- are _not_ children of this helper thread, and helper thread doesn't throw
              -- synchronous exceptions, so, any exception that the try above would catch
              -- must be an async exception.
              -- We were killed by an async exception, do nothing.
              Left (SomeException
_ :: E.SomeException) -> () -> STM ()
forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
              -- Child thread died, propagate it
              Right (Left SomeException
e)              -> STM Bool -> STM ()
forall (f :: * -> *) a. Functor f => f a -> f ()
void (STM Bool -> STM ()) -> STM Bool -> STM ()
forall a b. (a -> b) -> a -> b
$ TMVar SomeException -> SomeException -> STM Bool
forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar SomeException
excVar0 SomeException
e
              -- Successful result from one of the children
              Right (Right a
res)           -> TMVar a -> a -> STM ()
forall a. TMVar a -> a -> STM ()
putTMVar TMVar a
resVar a
res

          -- And kill all of the threads
          [DList ThreadId] -> (DList ThreadId -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [DList ThreadId]
workerTids ((DList ThreadId -> IO ()) -> IO ())
-> (DList ThreadId -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \DList ThreadId
tids' ->
            -- NOTE: Replacing A.AsyncCancelled with KillThread as the
            -- 'A.AsyncCancelled' constructor is not exported in older versions
            -- of the async package
            -- for_ (tids' []) $ \workerTid -> E.throwTo workerTid A.AsyncCancelled
            [ThreadId] -> (ThreadId -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (DList ThreadId -> [ThreadId]
forall a. DList a -> [a]
dlistToList DList ThreadId
tids') ((ThreadId -> IO ()) -> IO ()) -> (ThreadId -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ThreadId
workerTid -> ThreadId -> IO ()
C.killThread ThreadId
workerTid

        (STM a, DList ThreadId) -> IO (STM a, DList ThreadId)
forall (f :: * -> *) a. Applicative f => a -> f a
pure ( TMVar a -> STM a
forall a. TMVar a -> STM a
readTMVar TMVar a
resVar
             , ThreadId
helperTid ThreadId -> DList ThreadId -> DList ThreadId
forall a. a -> DList a -> DList a
`dlistCons` [DList ThreadId] -> DList ThreadId
forall a. [DList a] -> DList a
dlistConcatAll [DList ThreadId]
workerTids
             )

  TMVar SomeException
excVar <- IO (TMVar SomeException)
forall a. IO (TMVar a)
newEmptyTMVarIO
  (STM a
getRes, DList ThreadId
tids0) <- TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar Flat a
f0
  let tids :: [ThreadId]
tids = DList ThreadId -> [ThreadId]
forall a. DList a -> [a]
dlistToList DList ThreadId
tids0
      tidCount :: Int
tidCount = [ThreadId] -> Int
forall (t :: * -> *) a. Foldable t => t a -> Int
length [ThreadId]
tids
      allDone :: Int -> Bool
allDone Int
count =
        if Int
count Int -> Int -> Bool
forall a. Ord a => a -> a -> Bool
> Int
tidCount
          then String -> Bool
forall a. HasCallStack => String -> a
error (String
"allDone: count ("
                      String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
count
                      String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
") should never be greater than tidCount ("
                      String -> ShowS
forall a. Semigroup a => a -> a -> a
<> Int -> String
forall a. Show a => a -> String
show Int
tidCount
                      String -> ShowS
forall a. Semigroup a => a -> a -> a
<> String
")")
          else Int
count Int -> Int -> Bool
forall a. Eq a => a -> a -> Bool
== Int
tidCount

  -- Automatically retry if we get killed by a
  -- BlockedIndefinitelyOnSTM. For more information, see:
  --
  -- + https:\/\/github.com\/simonmar\/async\/issues\/14
  -- + https:\/\/github.com\/simonmar\/async\/pull\/15
  --
  let autoRetry :: IO a -> IO a
autoRetry IO a
action =
        IO a
action IO a -> (BlockedIndefinitelyOnSTM -> IO a) -> IO a
forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`E.catch`
        \BlockedIndefinitelyOnSTM
E.BlockedIndefinitelyOnSTM -> IO a -> IO a
autoRetry IO a
action

  -- Restore the original masking state while blocking and catch
  -- exceptions to allow the parent thread to be killed early.
  Either SomeException (Either SomeException a)
res <- IO (Either SomeException a)
-> IO (Either SomeException (Either SomeException a))
forall e a. Exception e => IO a -> IO (Either e a)
E.try (IO (Either SomeException a)
 -> IO (Either SomeException (Either SomeException a)))
-> IO (Either SomeException a)
-> IO (Either SomeException (Either SomeException a))
forall a b. (a -> b) -> a -> b
$ IO (Either SomeException a) -> IO (Either SomeException a)
forall b. IO b -> IO b
restore (IO (Either SomeException a) -> IO (Either SomeException a))
-> IO (Either SomeException a) -> IO (Either SomeException a)
forall a b. (a -> b) -> a -> b
$ IO (Either SomeException a) -> IO (Either SomeException a)
forall b. IO b -> IO b
autoRetry (IO (Either SomeException a) -> IO (Either SomeException a))
-> IO (Either SomeException a) -> IO (Either SomeException a)
forall a b. (a -> b) -> a -> b
$ STM (Either SomeException a) -> IO (Either SomeException a)
forall a. STM a -> IO a
atomically (STM (Either SomeException a) -> IO (Either SomeException a))
-> STM (Either SomeException a) -> IO (Either SomeException a)
forall a b. (a -> b) -> a -> b
$
         (SomeException -> Either SomeException a
forall a b. a -> Either a b
Left (SomeException -> Either SomeException a)
-> STM SomeException -> STM (Either SomeException a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> TMVar SomeException -> STM SomeException
forall a. TMVar a -> STM a
readTMVar TMVar SomeException
excVar) STM (Either SomeException a)
-> STM (Either SomeException a) -> STM (Either SomeException a)
forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|>
         (a -> Either SomeException a
forall a b. b -> Either a b
Right (a -> Either SomeException a)
-> STM a -> STM (Either SomeException a)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM a
getRes)

  Int
count0 <- STM Int -> IO Int
forall a. STM a -> IO a
atomically (STM Int -> IO Int) -> STM Int -> IO Int
forall a b. (a -> b) -> a -> b
$ TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
resultCountVar
  Bool -> IO () -> IO ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Int -> Bool
allDone Int
count0) (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
    -- Kill all of the threads
    -- NOTE: Replacing A.AsyncCancelled with KillThread as the
    -- 'A.AsyncCancelled' constructor is not exported in older versions
    -- of the async package
    -- for_ tids $ \tid -> E.throwTo tid A.AsyncCancelled
    [ThreadId] -> (ThreadId -> IO ()) -> IO ()
forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [ThreadId]
tids ((ThreadId -> IO ()) -> IO ()) -> (ThreadId -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \ThreadId
tid -> ThreadId -> IO ()
C.killThread ThreadId
tid

    -- Wait for all of the threads to die. We're going to restore the original
    -- masking state here, just in case there's a bug in the cleanup code of a
    -- child thread, so that we can be killed by an async exception. We decided
    -- this is a better behavior than hanging indefinitely and wait for a SIGKILL.
    IO () -> IO ()
forall b. IO b -> IO b
restore (IO () -> IO ()) -> IO () -> IO ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ do
      Int
count <- TVar Int -> STM Int
forall a. TVar a -> STM a
readTVar TVar Int
resultCountVar
      -- retries until resultCountVar has increased to the threadId count returned by go
      Bool -> STM ()
check (Bool -> STM ()) -> Bool -> STM ()
forall a b. (a -> b) -> a -> b
$ Int -> Bool
allDone Int
count

  -- Return the result or throw an exception. Yes, we could use
  -- either or join, but explicit pattern matching is nicer here.
  case Either SomeException (Either SomeException a)
res of
    -- Parent thread was killed with an async exception
    Left SomeException
e          -> SomeException -> IO a
forall e a. Exception e => e -> IO a
E.throwIO (SomeException
e :: E.SomeException)
    -- Some child thread died
    Right (Left SomeException
e)  -> SomeException -> IO a
forall e a. Exception e => e -> IO a
E.throwIO SomeException
e
    -- Everything worked!
    Right (Right a
x) -> a -> IO a
forall (f :: * -> *) a. Applicative f => a -> f a
pure a
x
{-# INLINEABLE runFlat #-}

--------------------------------------------------------------------------------
#else
--------------------------------------------------------------------------------

-- | Unlifted 'A.mapConcurrently': the per-element action is run in 'IO'
-- through 'withRunInIO' and handed to 'A.mapConcurrently'.
--
-- @since 0.1.0.0
mapConcurrently :: MonadUnliftIO m => Traversable t => (a -> m b) -> t a -> m (t b)
mapConcurrently action items =
  withRunInIO (\runInIO -> A.mapConcurrently (\item -> runInIO (action item)) items)
{-# INLINE mapConcurrently #-}

-- | Unlifted 'A.mapConcurrently_': like 'mapConcurrently', but works on any
-- 'Foldable' and discards the results of the per-element action.
--
-- @since 0.1.0.0
mapConcurrently_ :: MonadUnliftIO m => Foldable f => (a -> m b) -> f a -> m ()
mapConcurrently_ action items =
  withRunInIO (\runInIO -> A.mapConcurrently_ (\item -> runInIO (action item)) items)
{-# INLINE mapConcurrently_ #-}

--------------------------------------------------------------------------------
#endif
--------------------------------------------------------------------------------

-- | Like 'mapConcurrently' from async, but instead of one thread per
-- element, it does pooling from a set of threads. This is useful in
-- scenarios where resource consumption is bounded and for use cases
-- where too many concurrent tasks aren't allowed.
--
-- The work is delegated to 'pooledMapConcurrentlyIO', which calls
-- 'error' when the requested number of threads is less than 1.
--
-- === __Example usage__
--
-- @
-- import Say
--
-- action :: Int -> IO Int
-- action n = do
--   tid <- myThreadId
--   sayString $ show tid
--   threadDelay (2 * 10^6) -- 2 seconds
--   return n
--
-- main :: IO ()
-- main = do
--   yx \<- pooledMapConcurrentlyN 5 (\\x -\> action x) [1..5]
--   print yx
-- @
--
-- On executing you can see that five threads have been spawned:
--
-- @
-- \$ ./pool
-- ThreadId 36
-- ThreadId 38
-- ThreadId 40
-- ThreadId 42
-- ThreadId 44
-- [1,2,3,4,5]
-- @
--
--
-- Let's modify the above program such that there are fewer threads
-- than the number of items in the list:
--
-- @
-- import Say
--
-- action :: Int -> IO Int
-- action n = do
--   tid <- myThreadId
--   sayString $ show tid
--   threadDelay (2 * 10^6) -- 2 seconds
--   return n
--
-- main :: IO ()
-- main = do
--   yx \<- pooledMapConcurrentlyN 3 (\\x -\> action x) [1..5]
--   print yx
-- @
--
-- On executing you can see that only three threads are active in total:
--
-- @
-- \$ ./pool
-- ThreadId 35
-- ThreadId 37
-- ThreadId 39
-- ThreadId 35
-- ThreadId 39
-- [1,2,3,4,5]
-- @
--
-- @since 0.2.10
pooledMapConcurrentlyN :: (MonadUnliftIO m, Traversable t)
                      => Int -- ^ Max. number of threads. Should not be less than 1.
                      -> (a -> m b) -> t a -> m (t b)
pooledMapConcurrentlyN numProcs f xs =
    -- Unlift the per-element action into IO and let the IO worker do the pooling.
    withRunInIO $ \run -> pooledMapConcurrentlyIO numProcs (run . f) xs

-- | Similar to 'pooledMapConcurrentlyN' but with number of threads
-- set from 'getNumCapabilities'. Usually this is useful for CPU bound
-- tasks.
--
-- @since 0.2.10
pooledMapConcurrently :: (MonadUnliftIO m, Traversable t) => (a -> m b) -> t a -> m (t b)
pooledMapConcurrently f xs =
  withRunInIO $ \run -> do
    -- The pool size is queried each time this function runs.
    numProcs <- getNumCapabilities
    pooledMapConcurrentlyIO numProcs (run . f) xs

-- | Similar to 'pooledMapConcurrentlyN' but with flipped arguments:
-- the container comes before the action.
--
-- @since 0.2.10
pooledForConcurrentlyN :: (MonadUnliftIO m, Traversable t)
                      => Int -- ^ Max. number of threads. Should not be less than 1.
                      -> t a -> (a -> m b) -> m (t b)
pooledForConcurrentlyN numProcs = flip (pooledMapConcurrentlyN numProcs)

-- | Similar to 'pooledForConcurrentlyN' but with number of threads
-- set from 'getNumCapabilities'. Usually this is useful for CPU bound
-- tasks.
--
-- This is 'pooledMapConcurrently' with its arguments flipped.
--
-- @since 0.2.10
pooledForConcurrently :: (MonadUnliftIO m, Traversable t) => t a -> (a -> m b) -> m (t b)
pooledForConcurrently = flip pooledMapConcurrently

-- | 'IO' worker behind 'pooledMapConcurrentlyN' and
-- 'pooledMapConcurrently': checks the requested thread count and then
-- delegates to 'pooledMapConcurrentlyIO''.
--
-- Calls 'error' when the thread count is less than 1.
pooledMapConcurrentlyIO :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
pooledMapConcurrentlyIO numProcs f xs
  | numProcs < 1 = error "pooledMapconcurrentlyIO: number of threads < 1"
  | otherwise    = pooledMapConcurrentlyIO' numProcs f xs

-- | Performs the actual pooling for the tasks. This function will
-- continue execution until the task queue becomes empty. When one of
-- the pooled threads finishes its task, it will pick up the next task
-- from the queue if a job is available.
pooledConcurrently
  :: Int -- ^ Max. number of threads. Should not be less than 1.
  -> IORef [a] -- ^ Task queue. These are required as inputs for the jobs.
  -> (a -> IO ()) -- ^ The task which will be run concurrently (but
                 -- will be pooled properly).
  -> IO ()
pooledConcurrently numProcs jobsVar f =
  replicateConcurrently_ numProcs worker
  where
    -- Atomically pop the head of the queue, so that no two workers can
    -- take the same job; 'Nothing' signals an empty queue.
    nextJob = atomicModifyIORef' jobsVar $ \case
      []         -> ([], Nothing)
      job : rest -> (rest, Just job)

    -- Each worker keeps taking jobs until the queue is drained.
    worker = nextJob >>= \case
      Nothing  -> return ()
      Just job -> f job >> worker

-- | Pooled traversal in 'IO' without any validation of the thread count
-- (see 'pooledMapConcurrentlyIO' for the checked entry point).
--
-- Every input element is paired with its own result 'IORef'; the pairs
-- are queued for 'pooledConcurrently', and once that returns the refs are
-- read back in the shape of the original container.
pooledMapConcurrentlyIO' ::
    Traversable t => Int  -- ^ Max. number of threads. Should not be less than 1.
                  -> (a -> IO b)
                  -> t a
                  -> IO (t b)
pooledMapConcurrentlyIO' numProcs f xs = do
  -- prepare one IORef per result; each starts out holding an 'error'
  -- placeholder that is overwritten when the corresponding job runs...
  jobs <- for xs $ \x -> do
    outRef <- newIORef (error "pooledMapConcurrentlyIO': empty IORef")
    pure (x, outRef)
  -- ...put all the inputs in a queue...
  jobsVar <- newIORef (toList jobs)
  -- ...run `numProcs` threads in parallel, each
  -- of them consuming the queue and filling in
  -- the respective IORefs.
  pooledConcurrently numProcs jobsVar $ \(x, outRef) ->
    f x >>= atomicWriteIORef outRef
  -- Read all the IORefs
  for jobs (readIORef . snd)

-- | Result-discarding pooled traversal in 'IO' without any validation of
-- the thread count: the elements are turned into a job queue which is
-- then consumed by 'pooledConcurrently'.
pooledMapConcurrentlyIO_' ::
  Foldable t => Int -> (a -> IO ()) -> t a -> IO ()
pooledMapConcurrentlyIO_' numProcs f jobs =
  newIORef (toList jobs) >>= \jobsVar ->
    pooledConcurrently numProcs jobsVar f

-- | 'IO' worker behind 'pooledMapConcurrentlyN_' and
-- 'pooledMapConcurrently_': checks the requested thread count, then runs
-- the action on every element via 'pooledMapConcurrentlyIO_'', discarding
-- each result.
--
-- Calls 'error' when the thread count is less than 1.
pooledMapConcurrentlyIO_ :: Foldable t => Int -> (a -> IO b) -> t a -> IO ()
pooledMapConcurrentlyIO_ numProcs f xs
  | numProcs < 1 = error "pooledMapconcurrentlyIO_: number of threads < 1"
  | otherwise    = pooledMapConcurrentlyIO_' numProcs (void . f) xs

-- | Like 'pooledMapConcurrentlyN' but with the return value
-- discarded.
--
-- The work is delegated to 'pooledMapConcurrentlyIO_', which calls
-- 'error' when the requested number of threads is less than 1.
--
-- @since 0.2.10
pooledMapConcurrentlyN_ :: (MonadUnliftIO m, Foldable f)
                        => Int -- ^ Max. number of threads. Should not be less than 1.
                        -> (a -> m b) -> f a -> m ()
pooledMapConcurrentlyN_ numProcs f t =
  withRunInIO $ \run -> pooledMapConcurrentlyIO_ numProcs (run . f) t

-- | Like 'pooledMapConcurrently' but with the return value discarded.
-- The number of threads is taken from 'getNumCapabilities'.
--
-- @since 0.2.10
pooledMapConcurrently_ :: (MonadUnliftIO m, Foldable f) => (a -> m b) -> f a -> m ()
pooledMapConcurrently_ f t =
  withRunInIO $ \run -> do
    -- The pool size is queried each time this function runs.
    numProcs <- getNumCapabilities
    pooledMapConcurrentlyIO_ numProcs (run . f) t

-- | Like 'pooledMapConcurrently_' but with flipped arguments:
-- the container comes before the action.
--
-- @since 0.2.10
pooledForConcurrently_ :: (MonadUnliftIO m, Foldable f) => f a -> (a -> m b) -> m ()
pooledForConcurrently_ = flip pooledMapConcurrently_

-- | Like 'pooledMapConcurrentlyN_' but with flipped arguments:
-- the container comes before the action.
--
-- @since 0.2.10
pooledForConcurrentlyN_ :: (MonadUnliftIO m, Foldable t)
                        => Int -- ^ Max. number of threads. Should not be less than 1.
                        -> t a -> (a -> m b) -> m ()
pooledForConcurrentlyN_ numProcs = flip (pooledMapConcurrentlyN_ numProcs)


-- | Pooled version of 'replicateConcurrently'. Performs the action in
-- the pooled threads.
--
-- A repetition count below 1 yields an empty list without running the
-- action (and without checking the thread count).
--
-- @since 0.2.10
pooledReplicateConcurrentlyN :: (MonadUnliftIO m)
                             => Int -- ^ Max. number of threads. Should not be less than 1.
                             -> Int -- ^ Number of times to perform the action.
                             -> m a -> m [a]
pooledReplicateConcurrentlyN numProcs cnt task
  | cnt < 1   = return []
    -- The list only provides @cnt@ slots; its elements are ignored.
  | otherwise = pooledMapConcurrentlyN numProcs (const task) [1..cnt]

-- | Similar to 'pooledReplicateConcurrentlyN' but with number of
-- threads set from 'getNumCapabilities'. Usually this is useful for
-- CPU bound tasks.
--
-- A repetition count below 1 yields an empty list without running the
-- action.
--
-- @since 0.2.10
pooledReplicateConcurrently :: (MonadUnliftIO m)
                            => Int -- ^ Number of times to perform the action.
                            -> m a -> m [a]
pooledReplicateConcurrently cnt task
  | cnt < 1   = return []
    -- The list only provides @cnt@ slots; its elements are ignored.
  | otherwise = pooledMapConcurrently (const task) [1..cnt]

-- | Pooled version of 'replicateConcurrently_'. Performs the action in
-- the pooled threads.
--
-- A repetition count below 1 does nothing (and does not check the thread
-- count).
--
-- @since 0.2.10
pooledReplicateConcurrentlyN_ :: (MonadUnliftIO m)
                              => Int -- ^ Max. number of threads. Should not be less than 1.
                              -> Int -- ^ Number of times to perform the action.
                              -> m a -> m ()
pooledReplicateConcurrentlyN_ numProcs cnt task
  | cnt < 1   = return ()
    -- The list only provides @cnt@ slots; its elements are ignored.
  | otherwise = pooledMapConcurrentlyN_ numProcs (const task) [1..cnt]

-- | Similar to 'pooledReplicateConcurrentlyN_' but with number of
-- threads set from 'getNumCapabilities'. Usually this is useful for
-- CPU bound tasks.
--
-- A repetition count below 1 does nothing.
--
-- @since 0.2.10
pooledReplicateConcurrently_ :: (MonadUnliftIO m)
                             => Int -- ^ Number of times to perform the action.
                             -> m a -> m ()
pooledReplicateConcurrently_ cnt task
  | cnt < 1   = return ()
    -- The list only provides @cnt@ slots; its elements are ignored.
  | otherwise = pooledMapConcurrently_ (const task) [1..cnt]