{-# LANGUAGE CPP                 #-}
{-# LANGUAGE TupleSections       #-}
{-# LANGUAGE DeriveDataTypeable  #-}
{-# LANGUAGE DeriveFunctor       #-}
{-# LANGUAGE DeriveGeneric       #-}
{-# LANGUAGE GADTs               #-}
{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE RankNTypes          #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving  #-}
module UnliftIO.Internals.Async where

import           Control.Applicative
import           Control.Concurrent       (threadDelay, getNumCapabilities)
import qualified Control.Concurrent       as C
import           Control.Concurrent.Async (Async)
import qualified Control.Concurrent.Async as A
import           Control.Concurrent.STM
import           Control.Exception        (Exception, SomeException)
import           Control.Monad            (forever, liftM, unless, void, (>=>))
import           Control.Monad.IO.Unlift
import           Data.Foldable            (for_, traverse_)
import           Data.Typeable            (Typeable)
import           Data.IORef (IORef, readIORef, atomicWriteIORef, newIORef, atomicModifyIORef')
import qualified UnliftIO.Exception       as UE

-- For the implementation of Conc below, we do not want any of the
-- smart async exception handling logic from UnliftIO.Exception, since
-- (eg) we're low-level enough to need to explicit be throwing async
-- exceptions synchronously.
import qualified Control.Exception        as E
import           GHC.Generics             (Generic)

#if MIN_VERSION_base(4,9,0)
import           Data.Semigroup
#else
import           Data.Monoid              hiding (Alt)
#endif
import           Data.Foldable            (Foldable, toList)
import           Data.Traversable         (Traversable, for, traverse)

-- | Unlifted 'A.async'.
--
-- @since 0.1.0.0
async :: MonadUnliftIO m => m a -> m (Async a)
async :: forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
async m a
m = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a. IO a -> IO (Async a)
A.async forall a b. (a -> b) -> a -> b
$ forall a. m a -> IO a
run m a
m

-- | Unlifted 'A.asyncBound'.
--
-- @since 0.1.0.0
asyncBound :: MonadUnliftIO m => m a -> m (Async a)
asyncBound :: forall (m :: * -> *) a. MonadUnliftIO m => m a -> m (Async a)
asyncBound m a
m = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a. IO a -> IO (Async a)
A.asyncBound forall a b. (a -> b) -> a -> b
$ forall a. m a -> IO a
run m a
m

-- | Unlifted 'A.asyncOn'.
--
-- @since 0.1.0.0
asyncOn :: MonadUnliftIO m => Int -> m a -> m (Async a)
asyncOn :: forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> m a -> m (Async a)
asyncOn Int
i m a
m = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a. Int -> IO a -> IO (Async a)
A.asyncOn Int
i forall a b. (a -> b) -> a -> b
$ forall a. m a -> IO a
run m a
m

-- | Unlifted 'A.asyncWithUnmask'.
--
-- @since 0.1.0.0
asyncWithUnmask :: MonadUnliftIO m => ((forall b. m b -> m b) -> m a) -> m (Async a)
asyncWithUnmask :: forall (m :: * -> *) a.
MonadUnliftIO m =>
((forall b. m b -> m b) -> m a) -> m (Async a)
asyncWithUnmask (forall b. m b -> m b) -> m a
m =
  forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a. ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)
A.asyncWithUnmask forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
unmask -> forall a. m a -> IO a
run forall a b. (a -> b) -> a -> b
$ (forall b. m b -> m b) -> m a
m forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall b. IO b -> IO b
unmask forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. m a -> IO a
run

-- | Unlifted 'A.asyncOnWithUnmask'.
--
-- @since 0.1.0.0
asyncOnWithUnmask :: MonadUnliftIO m => Int -> ((forall b. m b -> m b) -> m a) -> m (Async a)
asyncOnWithUnmask :: forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> ((forall b. m b -> m b) -> m a) -> m (Async a)
asyncOnWithUnmask Int
i (forall b. m b -> m b) -> m a
m =
  forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a. Int -> ((forall b. IO b -> IO b) -> IO a) -> IO (Async a)
A.asyncOnWithUnmask Int
i forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
unmask -> forall a. m a -> IO a
run forall a b. (a -> b) -> a -> b
$ (forall b. m b -> m b) -> m a
m forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall b. IO b -> IO b
unmask forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. m a -> IO a
run

-- | Unlifted 'A.withAsync'.
--
-- @since 0.1.0.0
withAsync :: MonadUnliftIO m => m a -> (Async a -> m b) -> m b
withAsync :: forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync m a
a Async a -> m b
b = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a b. IO a -> (Async a -> IO b) -> IO b
A.withAsync (forall a. m a -> IO a
run m a
a) (forall a. m a -> IO a
run forall b c a. (b -> c) -> (a -> b) -> a -> c
. Async a -> m b
b)

-- | Unlifted 'A.withAsyncBound'.
--
-- @since 0.1.0.0
withAsyncBound :: MonadUnliftIO m => m a -> (Async a -> m b) -> m b
withAsyncBound :: forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsyncBound m a
a Async a -> m b
b = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a b. IO a -> (Async a -> IO b) -> IO b
A.withAsyncBound (forall a. m a -> IO a
run m a
a) (forall a. m a -> IO a
run forall b c a. (b -> c) -> (a -> b) -> a -> c
. Async a -> m b
b)

-- | Unlifted 'A.withAsyncOn'.
--
-- @since 0.1.0.0
withAsyncOn :: MonadUnliftIO m => Int -> m a -> (Async a -> m b) -> m b
withAsyncOn :: forall (m :: * -> *) a b.
MonadUnliftIO m =>
Int -> m a -> (Async a -> m b) -> m b
withAsyncOn Int
i m a
a Async a -> m b
b = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a b. Int -> IO a -> (Async a -> IO b) -> IO b
A.withAsyncOn Int
i (forall a. m a -> IO a
run m a
a) (forall a. m a -> IO a
run forall b c a. (b -> c) -> (a -> b) -> a -> c
. Async a -> m b
b)

-- | Unlifted 'A.withAsyncWithUnmask'.
--
-- @since 0.1.0.0
withAsyncWithUnmask
  :: MonadUnliftIO m
  => ((forall c. m c -> m c) -> m a)
  -> (Async a -> m b)
  -> m b
withAsyncWithUnmask :: forall (m :: * -> *) a b.
MonadUnliftIO m =>
((forall c. m c -> m c) -> m a) -> (Async a -> m b) -> m b
withAsyncWithUnmask (forall c. m c -> m c) -> m a
a Async a -> m b
b =
  forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a b.
((forall b. IO b -> IO b) -> IO a) -> (Async a -> IO b) -> IO b
A.withAsyncWithUnmask
    (\forall b. IO b -> IO b
unmask -> forall a. m a -> IO a
run forall a b. (a -> b) -> a -> b
$ (forall c. m c -> m c) -> m a
a forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall b. IO b -> IO b
unmask forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. m a -> IO a
run)
    (forall a. m a -> IO a
run forall b c a. (b -> c) -> (a -> b) -> a -> c
. Async a -> m b
b)

-- | Unlifted 'A.withAsyncOnWithMask'.
--
-- @since 0.1.0.0
withAsyncOnWithUnmask
  :: MonadUnliftIO m
  => Int
  -> ((forall c. m c -> m c) -> m a)
  -> (Async a -> m b)
  -> m b
withAsyncOnWithUnmask :: forall (m :: * -> *) a b.
MonadUnliftIO m =>
Int -> ((forall c. m c -> m c) -> m a) -> (Async a -> m b) -> m b
withAsyncOnWithUnmask Int
i (forall c. m c -> m c) -> m a
a Async a -> m b
b =
  forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a b.
Int
-> ((forall b. IO b -> IO b) -> IO a) -> (Async a -> IO b) -> IO b
A.withAsyncOnWithUnmask Int
i
    (\forall b. IO b -> IO b
unmask -> forall a. m a -> IO a
run forall a b. (a -> b) -> a -> b
$ (forall c. m c -> m c) -> m a
a forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall b. IO b -> IO b
unmask forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. m a -> IO a
run)
    (forall a. m a -> IO a
run forall b c a. (b -> c) -> (a -> b) -> a -> c
. Async a -> m b
b)

-- | Lifted 'A.wait'.
--
-- @since 0.1.0.0
wait :: MonadIO m => Async a -> m a
wait :: forall (m :: * -> *) a. MonadIO m => Async a -> m a
wait = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> IO a
A.wait

-- | Lifted 'A.poll'.
--
-- @since 0.1.0.0
poll :: MonadIO m => Async a -> m (Maybe (Either SomeException a))
poll :: forall (m :: * -> *) a.
MonadIO m =>
Async a -> m (Maybe (Either SomeException a))
poll = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> IO (Maybe (Either SomeException a))
A.poll

-- | Lifted 'A.waitCatch'.
--
-- @since 0.1.0.0
waitCatch :: MonadIO m => Async a -> m (Either SomeException a)
waitCatch :: forall (m :: * -> *) a.
MonadIO m =>
Async a -> m (Either SomeException a)
waitCatch = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> IO (Either SomeException a)
A.waitCatch

-- | Lifted 'A.cancel'.
--
-- @since 0.1.0.0
cancel :: MonadIO m => Async a -> m ()
cancel :: forall (m :: * -> *) a. MonadIO m => Async a -> m ()
cancel = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> IO ()
A.cancel

-- | Lifted 'A.uninterruptibleCancel'.
--
-- @since 0.1.0.0
uninterruptibleCancel :: MonadIO m => Async a -> m ()
uninterruptibleCancel :: forall (m :: * -> *) a. MonadIO m => Async a -> m ()
uninterruptibleCancel = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> IO ()
A.uninterruptibleCancel

-- | Lifted 'A.cancelWith'. Additionally uses 'UE.toAsyncException' to
-- ensure async exception safety.
--
-- @since 0.1.0.0
cancelWith :: (Exception e, MonadIO m) => Async a -> e -> m ()
cancelWith :: forall e (m :: * -> *) a.
(Exception e, MonadIO m) =>
Async a -> e -> m ()
cancelWith Async a
a e
e = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall e a. Exception e => Async a -> e -> IO ()
A.cancelWith Async a
a (forall e. Exception e => e -> SomeException
UE.toAsyncException e
e))

-- | Lifted 'A.waitAny'.
--
-- @since 0.1.0.0
waitAny :: MonadIO m => [Async a] -> m (Async a, a)
waitAny :: forall (m :: * -> *) a. MonadIO m => [Async a] -> m (Async a, a)
waitAny = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. [Async a] -> IO (Async a, a)
A.waitAny

-- | Lifted 'A.waitAnyCatch'.
--
-- @since 0.1.0.0
waitAnyCatch :: MonadIO m => [Async a] -> m (Async a, Either SomeException a)
waitAnyCatch :: forall (m :: * -> *) a.
MonadIO m =>
[Async a] -> m (Async a, Either SomeException a)
waitAnyCatch = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. [Async a] -> IO (Async a, Either SomeException a)
A.waitAnyCatch

-- | Lifted 'A.waitAnyCancel'.
--
-- @since 0.1.0.0
waitAnyCancel :: MonadIO m => [Async a] -> m (Async a, a)
waitAnyCancel :: forall (m :: * -> *) a. MonadIO m => [Async a] -> m (Async a, a)
waitAnyCancel = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. [Async a] -> IO (Async a, a)
A.waitAnyCancel

-- | Lifted 'A.waitAnyCatchCancel'.
--
-- @since 0.1.0.0
waitAnyCatchCancel :: MonadIO m => [Async a] -> m (Async a, Either SomeException a)
waitAnyCatchCancel :: forall (m :: * -> *) a.
MonadIO m =>
[Async a] -> m (Async a, Either SomeException a)
waitAnyCatchCancel = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. [Async a] -> IO (Async a, Either SomeException a)
A.waitAnyCatchCancel

-- | Lifted 'A.waitEither'.
--
-- @since 0.1.0.0
waitEither :: MonadIO m => Async a -> Async b -> m (Either a b)
waitEither :: forall (m :: * -> *) a b.
MonadIO m =>
Async a -> Async b -> m (Either a b)
waitEither Async a
a Async b
b = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a b. Async a -> Async b -> IO (Either a b)
A.waitEither Async a
a Async b
b)

-- | Lifted 'A.waitEitherCatch'.
--
-- @since 0.1.0.0
waitEitherCatch :: MonadIO m => Async a -> Async b -> m (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch :: forall (m :: * -> *) a b.
MonadIO m =>
Async a
-> Async b
-> m (Either (Either SomeException a) (Either SomeException b))
waitEitherCatch Async a
a Async b
b = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
A.waitEitherCatch Async a
a Async b
b)

-- | Lifted 'A.waitEitherCancel'.
--
-- @since 0.1.0.0
waitEitherCancel :: MonadIO m => Async a -> Async b -> m (Either a b)
waitEitherCancel :: forall (m :: * -> *) a b.
MonadIO m =>
Async a -> Async b -> m (Either a b)
waitEitherCancel Async a
a Async b
b = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a b. Async a -> Async b -> IO (Either a b)
A.waitEitherCancel Async a
a Async b
b)

-- | Lifted 'A.waitEitherCatchCancel'.
--
-- @since 0.1.0.0
waitEitherCatchCancel :: MonadIO m => Async a -> Async b -> m (Either (Either SomeException a) (Either SomeException b))
waitEitherCatchCancel :: forall (m :: * -> *) a b.
MonadIO m =>
Async a
-> Async b
-> m (Either (Either SomeException a) (Either SomeException b))
waitEitherCatchCancel Async a
a Async b
b = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a b.
Async a
-> Async b
-> IO (Either (Either SomeException a) (Either SomeException b))
A.waitEitherCatchCancel Async a
a Async b
b)

-- | Lifted 'A.waitEither_'.
--
-- @since 0.1.0.0
waitEither_ :: MonadIO m => Async a -> Async b -> m ()
waitEither_ :: forall (m :: * -> *) a b. MonadIO m => Async a -> Async b -> m ()
waitEither_ Async a
a Async b
b = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a b. Async a -> Async b -> IO ()
A.waitEither_ Async a
a Async b
b)

-- | Lifted 'A.waitBoth'.
--
-- @since 0.1.0.0
waitBoth :: MonadIO m => Async a -> Async b -> m (a, b)
waitBoth :: forall (m :: * -> *) a b.
MonadIO m =>
Async a -> Async b -> m (a, b)
waitBoth Async a
a Async b
b = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a b. Async a -> Async b -> IO (a, b)
A.waitBoth Async a
a Async b
b)

-- | Lifted 'A.link'.
--
-- @since 0.1.0.0
link :: MonadIO m => Async a -> m ()
link :: forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Async a -> IO ()
A.link

-- | Lifted 'A.link2'.
--
-- @since 0.1.0.0
link2 :: MonadIO m => Async a -> Async b -> m ()
link2 :: forall (m :: * -> *) a b. MonadIO m => Async a -> Async b -> m ()
link2 Async a
a Async b
b = forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall a b. Async a -> Async b -> IO ()
A.link2 Async a
a Async b
b)

-- | Unlifted 'A.race'.
--
-- @since 0.1.0.0
race :: MonadUnliftIO m => m a -> m b -> m (Either a b)
race :: forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> m b -> m (Either a b)
race m a
a m b
b = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a b. IO a -> IO b -> IO (Either a b)
A.race (forall a. m a -> IO a
run m a
a) (forall a. m a -> IO a
run m b
b)

-- | Unlifted 'A.race_'.
--
-- @since 0.1.0.0
race_ :: MonadUnliftIO m => m a -> m b -> m ()
race_ :: forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m ()
race_ m a
a m b
b = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a b. IO a -> IO b -> IO ()
A.race_ (forall a. m a -> IO a
run m a
a) (forall a. m a -> IO a
run m b
b)

-- | Unlifted 'A.concurrently'.
--
-- @since 0.1.0.0
concurrently :: MonadUnliftIO m => m a -> m b -> m (a, b)
concurrently :: forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m (a, b)
concurrently m a
a m b
b = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a b. IO a -> IO b -> IO (a, b)
A.concurrently (forall a. m a -> IO a
run m a
a) (forall a. m a -> IO a
run m b
b)

-- | Unlifted 'A.concurrently_'.
--
-- @since 0.1.0.0
concurrently_ :: MonadUnliftIO m => m a -> m b -> m ()
concurrently_ :: forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m ()
concurrently_ m a
a m b
b = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a b. IO a -> IO b -> IO ()
A.concurrently_ (forall a. m a -> IO a
run m a
a) (forall a. m a -> IO a
run m b
b)

-- | Unlifted 'A.Concurrently'.
--
-- @since 0.1.0.0
newtype Concurrently m a = Concurrently
  { forall (m :: * -> *) a. Concurrently m a -> m a
runConcurrently :: m a
  }

-- | @since 0.1.0.0
instance Monad m => Functor (Concurrently m) where
  fmap :: forall a b. (a -> b) -> Concurrently m a -> Concurrently m b
fmap a -> b
f (Concurrently m a
a) = forall (m :: * -> *) a. m a -> Concurrently m a
Concurrently forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM a -> b
f m a
a

-- | @since 0.1.0.0
instance MonadUnliftIO m => Applicative (Concurrently m) where
  pure :: forall a. a -> Concurrently m a
pure = forall (m :: * -> *) a. m a -> Concurrently m a
Concurrently forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (m :: * -> *) a. Monad m => a -> m a
return
  Concurrently m (a -> b)
fs <*> :: forall a b.
Concurrently m (a -> b) -> Concurrently m a -> Concurrently m b
<*> Concurrently m a
as =
    forall (m :: * -> *) a. m a -> Concurrently m a
Concurrently forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (\(a -> b
f, a
a) -> a -> b
f a
a) (forall (m :: * -> *) a b. MonadUnliftIO m => m a -> m b -> m (a, b)
concurrently m (a -> b)
fs m a
as)

-- | Composing two unlifted 'Concurrently' values using 'Alternative' is the
-- equivalent to using a 'race' combinator, the asynchrounous sub-routine that
-- returns a value first is the one that gets it's value returned, the slowest
-- sub-routine gets cancelled and it's thread is killed.
--
-- @since 0.1.0.0
instance MonadUnliftIO m => Alternative (Concurrently m) where
  -- | Care should be taken when using the 'empty' value of the 'Alternative'
  -- interface, as it will create a thread that delays for a long period of
  -- time. The reason behind this implementation is that any other computation
  -- will finish first than the 'empty' value. This implementation is less than
  -- ideal, and in a perfect world, we would have a typeclass family that allows
  -- '(<|>)' but not 'empty'.
  --
  -- @since 0.1.0.0
  empty :: forall a. Concurrently m a
empty = forall (m :: * -> *) a. m a -> Concurrently m a
Concurrently forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (Int -> IO ()
threadDelay forall a. Bounded a => a
maxBound))
  Concurrently m a
as <|> :: forall a. Concurrently m a -> Concurrently m a -> Concurrently m a
<|> Concurrently m a
bs =
    forall (m :: * -> *) a. m a -> Concurrently m a
Concurrently forall a b. (a -> b) -> a -> b
$ forall (m :: * -> *) a1 r. Monad m => (a1 -> r) -> m a1 -> m r
liftM (forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall a. a -> a
id forall a. a -> a
id) (forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> m b -> m (Either a b)
race m a
as m a
bs)

--------------------------------------------------------------------------------
#if MIN_VERSION_base(4,9,0)
--------------------------------------------------------------------------------
-- | Only defined by @async@ for @base >= 4.9@.
--
-- @since 0.1.0.0
instance (MonadUnliftIO m, Semigroup a) => Semigroup (Concurrently m a) where
  <> :: Concurrently m a -> Concurrently m a -> Concurrently m a
(<>) = forall (f :: * -> *) a b c.
Applicative f =>
(a -> b -> c) -> f a -> f b -> f c
liftA2 forall a. Semigroup a => a -> a -> a
(<>)

-- | @since 0.1.0.0
instance (Semigroup a, Monoid a, MonadUnliftIO m) => Monoid (Concurrently m a) where
  mempty :: Concurrently m a
mempty = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Monoid a => a
mempty
  mappend :: Concurrently m a -> Concurrently m a -> Concurrently m a
mappend = forall a. Semigroup a => a -> a -> a
(<>)
--------------------------------------------------------------------------------
#else
--------------------------------------------------------------------------------
-- | @since 0.1.0.0
instance (Monoid a, MonadUnliftIO m) => Monoid (Concurrently m a) where
  mempty = pure mempty
  mappend = liftA2 mappend
--------------------------------------------------------------------------------
#endif
--------------------------------------------------------------------------------

-- | Similar to 'mapConcurrently' but with arguments flipped
--
-- @since 0.1.0.0
forConcurrently :: MonadUnliftIO m => Traversable t => t a -> (a -> m b) -> m (t b)
forConcurrently :: forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
t a -> (a -> m b) -> m (t b)
forConcurrently = forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
(a -> m b) -> t a -> m (t b)
mapConcurrently
{-# INLINE forConcurrently #-}

-- | Similar to 'mapConcurrently_' but with arguments flipped
--
-- @since 0.1.0.0
forConcurrently_ :: MonadUnliftIO m => Foldable f => f a -> (a -> m b) -> m ()
forConcurrently_ :: forall (m :: * -> *) (f :: * -> *) a b.
(MonadUnliftIO m, Foldable f) =>
f a -> (a -> m b) -> m ()
forConcurrently_ = forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (m :: * -> *) (f :: * -> *) a b.
(MonadUnliftIO m, Foldable f) =>
(a -> m b) -> f a -> m ()
mapConcurrently_
{-# INLINE forConcurrently_ #-}

-- | Unlifted 'A.replicateConcurrently'.
--
-- @since 0.1.0.0
#if MIN_VERSION_base(4,7,0)
#else
replicateConcurrently :: (Functor m, MonadUnliftIO m) => Int -> m a -> m [a]
#endif
replicateConcurrently :: Int -> f a -> f [a]
replicateConcurrently Int
cnt f a
m =
  case forall a. Ord a => a -> a -> Ordering
compare Int
cnt Int
1 of
    Ordering
LT -> forall (f :: * -> *) a. Applicative f => a -> f a
pure []
    Ordering
EQ -> (forall a. a -> [a] -> [a]
:[]) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> f a
m
    Ordering
GT -> forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
(a -> m b) -> t a -> m (t b)
mapConcurrently forall a. a -> a
id (forall a. Int -> a -> [a]
replicate Int
cnt f a
m)
{-# INLINE replicateConcurrently #-}

-- | Unlifted 'A.replicateConcurrently_'.
--
-- @since 0.1.0.0
#if MIN_VERSION_base(4,7,0)
replicateConcurrently_ :: (Applicative m, MonadUnliftIO m) => Int -> m a -> m ()
#else
replicateConcurrently_ :: (MonadUnliftIO m) => Int -> m a -> m ()
#endif
replicateConcurrently_ :: forall (m :: * -> *) a.
(Applicative m, MonadUnliftIO m) =>
Int -> m a -> m ()
replicateConcurrently_ Int
cnt m a
m =
  case forall a. Ord a => a -> a -> Ordering
compare Int
cnt Int
1 of
    Ordering
LT -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
    Ordering
EQ -> forall (f :: * -> *) a. Functor f => f a -> f ()
void m a
m
    Ordering
GT -> forall (m :: * -> *) (f :: * -> *) a b.
(MonadUnliftIO m, Foldable f) =>
(a -> m b) -> f a -> m ()
mapConcurrently_ forall a. a -> a
id (forall a. Int -> a -> [a]
replicate Int
cnt m a
m)
{-# INLINE replicateConcurrently_ #-}

-- Conc uses GHC features that are not supported in versions <= to ghc-7.10
-- so we are going to export/use it when we have a higher version only.
--------------------------------------------------------------------------------
#if MIN_VERSION_base(4,8,0)
--------------------------------------------------------------------------------

-- | Executes a 'Traversable' container of items concurrently, it uses the 'Flat'
-- type internally.
--
-- @since 0.1.0.0
mapConcurrently :: MonadUnliftIO m => Traversable t => (a -> m b) -> t a -> m (t b)
mapConcurrently :: forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
(a -> m b) -> t a -> m (t b)
mapConcurrently a -> m b
f t a
t = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a. Flat a -> IO a
runFlat forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse
  (forall a. FlatApp a -> Flat a
FlatApp forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. IO a -> FlatApp a
FlatAction forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. m a -> IO a
run forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m b
f)
  t a
t
{-# INLINE mapConcurrently #-}

-- | Executes a 'Traversable' container of items concurrently, it uses the 'Flat'
-- type internally. This function ignores the results.
--
-- @since 0.1.0.0
mapConcurrently_ :: MonadUnliftIO m => Foldable f => (a -> m b) -> f a -> m ()
mapConcurrently_ :: forall (m :: * -> *) (f :: * -> *) a b.
(MonadUnliftIO m, Foldable f) =>
(a -> m b) -> f a -> m ()
mapConcurrently_ a -> m b
f f a
t = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall a. Flat a -> IO a
runFlat forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
(a -> f b) -> t a -> f ()
traverse_
  (forall a. FlatApp a -> Flat a
FlatApp forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. IO a -> FlatApp a
FlatAction forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. m a -> IO a
run forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m b
f)
  f a
t
{-# INLINE mapConcurrently_ #-}


-- More efficient Conc implementation

-- | A more efficient alternative to 'Concurrently', which reduces the
-- number of threads that need to be forked. For more information, see
-- [this blog post](https://www.fpcomplete.com/blog/transformations-on-applicative-concurrent-computations/).
-- This is provided as a separate type to @Concurrently@ as it has a slightly different API.
--
-- Use the 'conc' function to construct values of type 'Conc', and
-- 'runConc' to execute the composed actions. You can use the
-- @Applicative@ instance to run different actions and wait for all of
-- them to complete, or the @Alternative@ instance to wait for the
-- first thread to complete.
--
-- In the event of a runtime exception thrown by any of the children
-- threads, or an asynchronous exception received in the parent
-- thread, all threads will be killed with an 'A.AsyncCancelled'
-- exception and the original exception rethrown. If multiple
-- exceptions are generated by different threads, there are no
-- guarantees on which exception will end up getting rethrown.
--
-- For many common use cases, you may prefer using helper functions in
-- this module like 'mapConcurrently'.
--
-- There are some intentional differences in behavior to
-- @Concurrently@:
--
-- * Children threads are always launched in an unmasked state, not
--   the inherited state of the parent thread.
--
-- Note that it is a programmer error to use the @Alternative@
-- instance in such a way that there are no alternatives to an empty,
-- e.g. @runConc (empty <|> empty)@. In such a case, a 'ConcException'
-- will be thrown. If there was an @Alternative@ in the standard
-- libraries without @empty@, this library would use it instead.
--
-- @since 0.2.9.0
data Conc m a where
  Action :: m a -> Conc m a
  Apply   :: Conc m (v -> a) -> Conc m v -> Conc m a
  LiftA2 :: (x -> y -> a) -> Conc m x -> Conc m y -> Conc m a

  -- Just an optimization to avoid spawning extra threads
  Pure :: a -> Conc m a

  -- I thought there would be an optimization available from having a
  -- data constructor that explicit doesn't care about the first
  -- result. Turns out it doesn't help much: we still need to keep a
  -- TMVar below to know when the thread completes.
  --
  -- Then :: Conc m a -> Conc m b -> Conc m b

  Alt :: Conc m a -> Conc m a -> Conc m a
  Empty :: Conc m a

deriving instance Functor m => Functor (Conc m)
-- fmap f (Action routine) = Action (fmap f routine)
-- fmap f (LiftA2 g x y)   = LiftA2 (fmap f g) x y
-- fmap f (Pure val)       = Pure (f val)
-- fmap f (Alt a b)        = Alt (fmap f a) (fmap f b)
-- fmap f Empty            = Empty

-- | Construct a value of type 'Conc' from an action. Compose these
-- values using the typeclass instances (most commonly 'Applicative'
-- and 'Alternative') and then run with 'runConc'.
--
-- @since 0.2.9.0
conc :: m a -> Conc m a
conc :: forall (m :: * -> *) a. m a -> Conc m a
conc = forall (m :: * -> *) a. m a -> Conc m a
Action
{-# INLINE conc #-}

-- | Run a 'Conc' value on multiple threads.
--
-- @since 0.2.9.0
runConc :: MonadUnliftIO m => Conc m a -> m a
runConc :: forall (m :: * -> *) a. MonadUnliftIO m => Conc m a -> m a
runConc = forall (m :: * -> *) a. MonadUnliftIO m => Conc m a -> m (Flat a)
flatten forall (m :: * -> *) a b c.
Monad m =>
(a -> m b) -> (b -> m c) -> a -> m c
>=> (forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. Flat a -> IO a
runFlat)
{-# INLINE runConc #-}

-- | @since 0.2.9.0
instance MonadUnliftIO m => Applicative (Conc m) where
  pure :: forall a. a -> Conc m a
pure = forall a (m :: * -> *). a -> Conc m a
Pure
  {-# INLINE pure #-}
  -- | Following is an example of how an 'Applicative' expands to a Tree
  --
  -- @@@
  -- downloadA :: IO String
  -- downloadB :: IO String
  --
  -- (f <$> conc downloadA <*> conc downloadB <*> pure 123)
  --
  --   (((f <$> a) <*> b) <*> c))
  --        (1)    (2)    (3)
  --
  -- (1)
  --   Action (fmap f downloadA)
  -- (2)
  --   Apply (Action (fmap f downloadA)) (Action downloadB)
  -- (3)
  --   Apply (Apply (Action (fmap f downloadA)) (Action downloadB))
  --        (Pure 123)
  -- @@@
  --
  <*> :: forall a b. Conc m (a -> b) -> Conc m a -> Conc m b
(<*>) = forall (m :: * -> *) v a. Conc m (v -> a) -> Conc m v -> Conc m a
Apply
  {-# INLINE (<*>) #-}
  -- See comment above on Then
  -- (*>) = Then
#if MIN_VERSION_base(4,11,0)
  liftA2 :: forall a b c. (a -> b -> c) -> Conc m a -> Conc m b -> Conc m c
liftA2 = forall v y a (m :: * -> *).
(v -> y -> a) -> Conc m v -> Conc m y -> Conc m a
LiftA2
  {-# INLINE liftA2 #-}
#endif

  Conc m a
a *> :: forall a b. Conc m a -> Conc m b -> Conc m b
*> Conc m b
b = forall v y a (m :: * -> *).
(v -> y -> a) -> Conc m v -> Conc m y -> Conc m a
LiftA2 (\a
_ b
x -> b
x) Conc m a
a Conc m b
b
  {-# INLINE (*>) #-}

-- | @since 0.2.9.0
instance MonadUnliftIO m => Alternative (Conc m) where
  empty :: forall a. Conc m a
empty = forall (m :: * -> *) a. Conc m a
Empty -- this is so ugly, we don't actually want to provide it!
  {-# INLINE empty #-}
  <|> :: forall a. Conc m a -> Conc m a -> Conc m a
(<|>) = forall (m :: * -> *) a. Conc m a -> Conc m a -> Conc m a
Alt
  {-# INLINE (<|>) #-}

#if MIN_VERSION_base(4, 11, 0)
-- | @since 0.2.9.0
instance (MonadUnliftIO m, Semigroup a) => Semigroup (Conc m a) where
  <> :: Conc m a -> Conc m a -> Conc m a
(<>) = forall (f :: * -> *) a b c.
Applicative f =>
(a -> b -> c) -> f a -> f b -> f c
liftA2 forall a. Semigroup a => a -> a -> a
(<>)
  {-# INLINE (<>) #-}
#endif

-- | @since 0.2.9.0
instance (Monoid a, MonadUnliftIO m) => Monoid (Conc m a) where
  mempty :: Conc m a
mempty = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. Monoid a => a
mempty
  {-# INLINE mempty #-}
  mappend :: Conc m a -> Conc m a -> Conc m a
mappend = forall (f :: * -> *) a b c.
Applicative f =>
(a -> b -> c) -> f a -> f b -> f c
liftA2 forall a. Monoid a => a -> a -> a
mappend
  {-# INLINE mappend #-}

-------------------------
-- Conc implementation --
-------------------------

-- Data types for flattening out the original @Conc@ into a simplified
-- view. Goals:
--
-- * We want to get rid of the Empty data constructor. We don't want
--   it anyway, it's only there because of the Alternative typeclass.
--
-- * We want to ensure that there is no nesting of Alt data
--   constructors. There is a bookkeeping overhead to each time we
--   need to track raced threads, and we want to minimize that
--   bookkeeping.
--
-- * We want to ensure that, when racing, we're always racing at least
--   two threads.
--
-- * We want to simplify down to IO.

-- | Flattened structure, either Applicative or Alternative
data Flat a
  = FlatApp !(FlatApp a)
  -- | Flattened Alternative. Has at least 2 entries, which must be
  -- FlatApp (no nesting of FlatAlts).
  | FlatAlt !(FlatApp a) !(FlatApp a) ![FlatApp a]

deriving instance Functor Flat
-- fmap f (FlatApp a) =
--  FlatApp (fmap f a)
-- fmap f (FlatAlt (FlatApp a) (FlatApp b) xs) =
--   FlatAlt (FlatApp (fmap f a)) (FlatApp (fmap f b)) (map (fmap f) xs)
instance Applicative Flat where
  pure :: forall a. a -> Flat a
pure = forall a. FlatApp a -> Flat a
FlatApp forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall (f :: * -> *) a. Applicative f => a -> f a
pure
  <*> :: forall a b. Flat (a -> b) -> Flat a -> Flat b
(<*>) Flat (a -> b)
f Flat a
a = forall a. FlatApp a -> Flat a
FlatApp (forall v y a. (v -> y -> a) -> Flat v -> Flat y -> FlatApp a
FlatLiftA2 forall a. a -> a
id Flat (a -> b)
f Flat a
a)
#if MIN_VERSION_base(4,11,0)
  liftA2 :: forall a b c. (a -> b -> c) -> Flat a -> Flat b -> Flat c
liftA2 a -> b -> c
f Flat a
a Flat b
b = forall a. FlatApp a -> Flat a
FlatApp (forall v y a. (v -> y -> a) -> Flat v -> Flat y -> FlatApp a
FlatLiftA2 a -> b -> c
f Flat a
a Flat b
b)
#endif

-- | Flattened Applicative. No Alternative stuff directly in here, but may be in
-- the children. Notice this type doesn't have a type parameter for monadic
-- contexts, it hardwires the base monad to IO given concurrency relies
-- eventually on that.
--
-- @since 0.2.9.0
data FlatApp a where
  FlatPure   :: a -> FlatApp a
  FlatAction :: IO a -> FlatApp a
  FlatApply   :: Flat (v -> a) -> Flat v -> FlatApp a
  FlatLiftA2 :: (x -> y -> a) -> Flat x -> Flat y -> FlatApp a

deriving instance Functor FlatApp
instance Applicative FlatApp where
  pure :: forall a. a -> FlatApp a
pure = forall a. a -> FlatApp a
FlatPure
  <*> :: forall a b. FlatApp (a -> b) -> FlatApp a -> FlatApp b
(<*>) FlatApp (a -> b)
mf FlatApp a
ma = forall v a. Flat (v -> a) -> Flat v -> FlatApp a
FlatApply (forall a. FlatApp a -> Flat a
FlatApp FlatApp (a -> b)
mf) (forall a. FlatApp a -> Flat a
FlatApp FlatApp a
ma)
#if MIN_VERSION_base(4,11,0)
  liftA2 :: forall a b c. (a -> b -> c) -> FlatApp a -> FlatApp b -> FlatApp c
liftA2 a -> b -> c
f FlatApp a
a FlatApp b
b = forall v y a. (v -> y -> a) -> Flat v -> Flat y -> FlatApp a
FlatLiftA2 a -> b -> c
f (forall a. FlatApp a -> Flat a
FlatApp FlatApp a
a) (forall a. FlatApp a -> Flat a
FlatApp FlatApp b
b)
#endif

-- | Things that can go wrong in the structure of a 'Conc'. These are
-- /programmer errors/.
--
-- @since 0.2.9.0
data ConcException
  = EmptyWithNoAlternative
  deriving (forall x. Rep ConcException x -> ConcException
forall x. ConcException -> Rep ConcException x
forall a.
(forall x. a -> Rep a x) -> (forall x. Rep a x -> a) -> Generic a
$cto :: forall x. Rep ConcException x -> ConcException
$cfrom :: forall x. ConcException -> Rep ConcException x
Generic, Int -> ConcException -> ShowS
[ConcException] -> ShowS
ConcException -> String
forall a.
(Int -> a -> ShowS) -> (a -> String) -> ([a] -> ShowS) -> Show a
showList :: [ConcException] -> ShowS
$cshowList :: [ConcException] -> ShowS
show :: ConcException -> String
$cshow :: ConcException -> String
showsPrec :: Int -> ConcException -> ShowS
$cshowsPrec :: Int -> ConcException -> ShowS
Show, Typeable, ConcException -> ConcException -> Bool
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: ConcException -> ConcException -> Bool
$c/= :: ConcException -> ConcException -> Bool
== :: ConcException -> ConcException -> Bool
$c== :: ConcException -> ConcException -> Bool
Eq, Eq ConcException
ConcException -> ConcException -> Bool
ConcException -> ConcException -> Ordering
ConcException -> ConcException -> ConcException
forall a.
Eq a
-> (a -> a -> Ordering)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> Bool)
-> (a -> a -> a)
-> (a -> a -> a)
-> Ord a
min :: ConcException -> ConcException -> ConcException
$cmin :: ConcException -> ConcException -> ConcException
max :: ConcException -> ConcException -> ConcException
$cmax :: ConcException -> ConcException -> ConcException
>= :: ConcException -> ConcException -> Bool
$c>= :: ConcException -> ConcException -> Bool
> :: ConcException -> ConcException -> Bool
$c> :: ConcException -> ConcException -> Bool
<= :: ConcException -> ConcException -> Bool
$c<= :: ConcException -> ConcException -> Bool
< :: ConcException -> ConcException -> Bool
$c< :: ConcException -> ConcException -> Bool
compare :: ConcException -> ConcException -> Ordering
$ccompare :: ConcException -> ConcException -> Ordering
Ord)
instance E.Exception ConcException

-- | Simple difference list, for nicer types below
type DList a = [a] -> [a]

dlistConcat :: DList a -> DList a -> DList a
dlistConcat :: forall a. DList a -> DList a -> DList a
dlistConcat = forall b c a. (b -> c) -> (a -> b) -> a -> c
(.)
{-# INLINE dlistConcat #-}

dlistCons :: a -> DList a -> DList a
dlistCons :: forall a. a -> DList a -> DList a
dlistCons a
a DList a
as = forall a. a -> [a] -> [a]
dlistSingleton a
a forall a. DList a -> DList a -> DList a
`dlistConcat` DList a
as
{-# INLINE dlistCons #-}

dlistConcatAll :: [DList a] -> DList a
dlistConcatAll :: forall a. [DList a] -> DList a
dlistConcatAll = forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr forall b c a. (b -> c) -> (a -> b) -> a -> c
(.) forall a. a -> a
id
{-# INLINE dlistConcatAll #-}

dlistToList :: DList a -> [a]
dlistToList :: forall a. DList a -> [a]
dlistToList = (forall a b. (a -> b) -> a -> b
$ [])
{-# INLINE dlistToList #-}

dlistSingleton :: a -> DList a
dlistSingleton :: forall a. a -> [a] -> [a]
dlistSingleton a
a = (a
aforall a. a -> [a] -> [a]
:)
{-# INLINE dlistSingleton #-}

dlistEmpty :: DList a
dlistEmpty :: forall a. DList a
dlistEmpty = forall a. a -> a
id
{-# INLINE dlistEmpty #-}

-- | Turn a 'Conc' into a 'Flat'. Note that thanks to the ugliness of
-- 'empty', this may fail, e.g. @flatten Empty@.
--
-- @since 0.2.9.0
flatten :: forall m a. MonadUnliftIO m => Conc m a -> m (Flat a)
flatten :: forall (m :: * -> *) a. MonadUnliftIO m => Conc m a -> m (Flat a)
flatten Conc m a
c0 = forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do

  -- why not app?
  let both :: forall k. Conc m k -> IO (Flat k)
      both :: forall k. Conc m k -> IO (Flat k)
both Conc m k
Empty = forall e a. Exception e => e -> IO a
E.throwIO ConcException
EmptyWithNoAlternative
      both (Action m k
m) = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a. FlatApp a -> Flat a
FlatApp forall a b. (a -> b) -> a -> b
$ forall a. IO a -> FlatApp a
FlatAction forall a b. (a -> b) -> a -> b
$ forall a. m a -> IO a
run m k
m
      both (Apply Conc m (v -> k)
cf Conc m v
ca) = do
        Flat (v -> k)
f <- forall k. Conc m k -> IO (Flat k)
both Conc m (v -> k)
cf
        Flat v
a <- forall k. Conc m k -> IO (Flat k)
both Conc m v
ca
        forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a. FlatApp a -> Flat a
FlatApp forall a b. (a -> b) -> a -> b
$ forall v a. Flat (v -> a) -> Flat v -> FlatApp a
FlatApply Flat (v -> k)
f Flat v
a
      both (LiftA2 x -> y -> k
f Conc m x
ca Conc m y
cb) = do
        Flat x
a <- forall k. Conc m k -> IO (Flat k)
both Conc m x
ca
        Flat y
b <- forall k. Conc m k -> IO (Flat k)
both Conc m y
cb
        forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a. FlatApp a -> Flat a
FlatApp forall a b. (a -> b) -> a -> b
$ forall v y a. (v -> y -> a) -> Flat v -> Flat y -> FlatApp a
FlatLiftA2 x -> y -> k
f Flat x
a Flat y
b
      both (Alt Conc m k
ca Conc m k
cb) = do
        DList (FlatApp k)
a <- forall k. Conc m k -> IO (DList (FlatApp k))
alt Conc m k
ca
        DList (FlatApp k)
b <- forall k. Conc m k -> IO (DList (FlatApp k))
alt Conc m k
cb
        case forall a. DList a -> [a]
dlistToList (DList (FlatApp k)
a forall a. DList a -> DList a -> DList a
`dlistConcat` DList (FlatApp k)
b) of
          []    -> forall e a. Exception e => e -> IO a
E.throwIO ConcException
EmptyWithNoAlternative
          [FlatApp k
x]   -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a. FlatApp a -> Flat a
FlatApp FlatApp k
x
          FlatApp k
x:FlatApp k
y:[FlatApp k]
z -> forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a. FlatApp a -> FlatApp a -> [FlatApp a] -> Flat a
FlatAlt FlatApp k
x FlatApp k
y [FlatApp k]
z
      both (Pure k
a) = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ forall a. FlatApp a -> Flat a
FlatApp forall a b. (a -> b) -> a -> b
$ forall a. a -> FlatApp a
FlatPure k
a

      -- Returns a difference list for cheaper concatenation
      alt :: forall k. Conc m k -> IO (DList (FlatApp k))
      alt :: forall k. Conc m k -> IO (DList (FlatApp k))
alt Conc m k
Empty = forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a. DList a
dlistEmpty
      alt (Apply Conc m (v -> k)
cf Conc m v
ca) = do
        Flat (v -> k)
f <- forall k. Conc m k -> IO (Flat k)
both Conc m (v -> k)
cf
        Flat v
a <- forall k. Conc m k -> IO (Flat k)
both Conc m v
ca
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> [a] -> [a]
dlistSingleton forall a b. (a -> b) -> a -> b
$ forall v a. Flat (v -> a) -> Flat v -> FlatApp a
FlatApply Flat (v -> k)
f Flat v
a)
      alt (Alt Conc m k
ca Conc m k
cb) = do
        DList (FlatApp k)
a <- forall k. Conc m k -> IO (DList (FlatApp k))
alt Conc m k
ca
        DList (FlatApp k)
b <- forall k. Conc m k -> IO (DList (FlatApp k))
alt Conc m k
cb
        forall (f :: * -> *) a. Applicative f => a -> f a
pure forall a b. (a -> b) -> a -> b
$ DList (FlatApp k)
a forall a. DList a -> DList a -> DList a
`dlistConcat` DList (FlatApp k)
b
      alt (Action m k
m) = forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> [a] -> [a]
dlistSingleton forall a b. (a -> b) -> a -> b
$ forall a. IO a -> FlatApp a
FlatAction (forall a. m a -> IO a
run m k
m))
      alt (LiftA2 x -> y -> k
f Conc m x
ca Conc m y
cb) = do
        Flat x
a <- forall k. Conc m k -> IO (Flat k)
both Conc m x
ca
        Flat y
b <- forall k. Conc m k -> IO (Flat k)
both Conc m y
cb
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> [a] -> [a]
dlistSingleton forall a b. (a -> b) -> a -> b
$ forall v y a. (v -> y -> a) -> Flat v -> Flat y -> FlatApp a
FlatLiftA2 x -> y -> k
f Flat x
a Flat y
b)
      alt (Pure k
a) = forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. a -> [a] -> [a]
dlistSingleton forall a b. (a -> b) -> a -> b
$ forall a. a -> FlatApp a
FlatPure k
a)

  forall k. Conc m k -> IO (Flat k)
both Conc m a
c0

-- | Run a @Flat a@ on multiple threads.
runFlat :: Flat a -> IO a

-- Silly, simple optimizations
runFlat :: forall a. Flat a -> IO a
runFlat (FlatApp (FlatAction IO a
io)) = IO a
io
runFlat (FlatApp (FlatPure a
x)) = forall (f :: * -> *) a. Applicative f => a -> f a
pure a
x

-- Start off with all exceptions masked so we can install proper cleanup.
runFlat Flat a
f0 = forall b. ((forall b. IO b -> IO b) -> IO b) -> IO b
E.uninterruptibleMask forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore -> do
  -- How many threads have been spawned and finished their task? We need to
  -- ensure we kill all child threads and wait for them to die.
  TVar Int
resultCountVar <- forall a. a -> IO (TVar a)
newTVarIO Int
0

  -- Forks off as many threads as necessary to run the given Flat a,
  -- and returns:
  --
  -- + An STM action that will block until completion and return the
  --   result.
  --
  -- + The IDs of all forked threads. These need to be tracked so they
  --   can be killed (either when an exception is thrown, or when one
  --   of the alternatives completes first).
  --
  -- It would be nice to have the returned STM action return an Either
  -- and keep the SomeException values somewhat explicit, but in all
  -- my testing this absolutely kills performance. Instead, we're
  -- going to use a hack of providing a TMVar to fill up with a
  -- SomeException when things fail.
  --
  -- TODO: Investigate why performance degradation on Either
  let go :: forall a.
            TMVar E.SomeException
         -> Flat a
         -> IO (STM a, DList C.ThreadId)
      go :: forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
_excVar (FlatApp (FlatPure a
x)) = forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall (f :: * -> *) a. Applicative f => a -> f a
pure a
x, forall a. DList a
dlistEmpty)
      go TMVar SomeException
excVar (FlatApp (FlatAction IO a
io)) = do
        TMVar a
resVar <- forall a. IO (TMVar a)
newEmptyTMVarIO
        ThreadId
tid <- ((forall b. IO b -> IO b) -> IO ()) -> IO ThreadId
C.forkIOWithUnmask forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore1 -> do
          Either SomeException a
res <- forall e a. Exception e => IO a -> IO (Either e a)
E.try forall a b. (a -> b) -> a -> b
$ forall b. IO b -> IO b
restore1 IO a
io
          forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
            forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
resultCountVar (forall a. Num a => a -> a -> a
+ Int
1)
            case Either SomeException a
res of
              Left SomeException
e  -> forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar SomeException
excVar SomeException
e
              Right a
x -> forall a. TMVar a -> a -> STM ()
putTMVar TMVar a
resVar a
x
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall a. TMVar a -> STM a
readTMVar TMVar a
resVar, forall a. a -> [a] -> [a]
dlistSingleton ThreadId
tid)
      go TMVar SomeException
excVar (FlatApp (FlatApply Flat (v -> a)
cf Flat v
ca)) = do
        (STM (v -> a)
f, DList ThreadId
tidsf) <- forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar Flat (v -> a)
cf
        (STM v
a, DList ThreadId
tidsa) <- forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar Flat v
ca
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (STM (v -> a)
f forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> STM v
a, DList ThreadId
tidsf forall a. DList a -> DList a -> DList a
`dlistConcat` DList ThreadId
tidsa)
      go TMVar SomeException
excVar (FlatApp (FlatLiftA2 x -> y -> a
f Flat x
a Flat y
b)) = do
        (STM x
a', DList ThreadId
tidsa) <- forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar Flat x
a
        (STM y
b', DList ThreadId
tidsb) <- forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar Flat y
b
        forall (f :: * -> *) a. Applicative f => a -> f a
pure (forall (f :: * -> *) a b c.
Applicative f =>
(a -> b -> c) -> f a -> f b -> f c
liftA2 x -> y -> a
f STM x
a' STM y
b', DList ThreadId
tidsa forall a. DList a -> DList a -> DList a
`dlistConcat` DList ThreadId
tidsb)

      go TMVar SomeException
excVar0 (FlatAlt FlatApp a
x FlatApp a
y [FlatApp a]
z) = do
        -- As soon as one of the children finishes, we need to kill the siblings,
        -- we're going to create our own excVar here to pass to the children, so
        -- we can prevent the ThreadKilled exceptions we throw to the children
        -- here from propagating and taking down the whole system.
        TMVar SomeException
excVar <- forall a. IO (TMVar a)
newEmptyTMVarIO
        TMVar a
resVar <- forall a. IO (TMVar a)
newEmptyTMVarIO
        [(STM a, DList ThreadId)]
pairs <- forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
(a -> f b) -> t a -> f (t b)
traverse (forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar forall b c a. (b -> c) -> (a -> b) -> a -> c
. forall a. FlatApp a -> Flat a
FlatApp) (FlatApp a
xforall a. a -> [a] -> [a]
:FlatApp a
yforall a. a -> [a] -> [a]
:[FlatApp a]
z)
        let ([STM a]
blockers, [DList ThreadId]
workerTids) = forall a b. [(a, b)] -> ([a], [b])
unzip [(STM a, DList ThreadId)]
pairs

        -- Fork a helper thread to wait for the first child to
        -- complete, or for one of them to die with an exception so we
        -- can propagate it to excVar0.
        ThreadId
helperTid <- ((forall b. IO b -> IO b) -> IO ()) -> IO ThreadId
C.forkIOWithUnmask forall a b. (a -> b) -> a -> b
$ \forall b. IO b -> IO b
restore1 -> do
          Either SomeException (Either SomeException a)
eres <- forall e a. Exception e => IO a -> IO (Either e a)
E.try forall a b. (a -> b) -> a -> b
$ forall b. IO b -> IO b
restore1 forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall (t :: * -> *) a b.
Foldable t =>
(a -> b -> b) -> b -> t a -> b
foldr
            (\STM a
blocker STM (Either SomeException a)
rest -> (forall a b. b -> Either a b
Right forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM a
blocker) forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|> STM (Either SomeException a)
rest)
            (forall a b. a -> Either a b
Left forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TMVar a -> STM a
readTMVar TMVar SomeException
excVar)
            [STM a]
blockers
          forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
            forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
resultCountVar (forall a. Num a => a -> a -> a
+ Int
1)
            case Either SomeException (Either SomeException a)
eres of
              -- NOTE: The child threads are spawned from @traverse go@ call above, they
              -- are _not_ children of this helper thread, and helper thread doesn't throw
              -- synchronous exceptions, so, any exception that the try above would catch
              -- must be an async exception.
              -- We were killed by an async exception, do nothing.
              Left (SomeException
_ :: E.SomeException) -> forall (f :: * -> *) a. Applicative f => a -> f a
pure ()
              -- Child thread died, propagate it
              Right (Left SomeException
e)              -> forall (f :: * -> *) a. Functor f => f a -> f ()
void forall a b. (a -> b) -> a -> b
$ forall a. TMVar a -> a -> STM Bool
tryPutTMVar TMVar SomeException
excVar0 SomeException
e
              -- Successful result from one of the children
              Right (Right a
res)           -> forall a. TMVar a -> a -> STM ()
putTMVar TMVar a
resVar a
res

          -- And kill all of the threads
          forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [DList ThreadId]
workerTids forall a b. (a -> b) -> a -> b
$ \DList ThreadId
tids' ->
            -- NOTE: Replacing A.AsyncCancelled with KillThread as the
            -- 'A.AsyncCancelled' constructor is not exported in older versions
            -- of the async package
            -- for_ (tids' []) $ \workerTid -> E.throwTo workerTid A.AsyncCancelled
            forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ (forall a. DList a -> [a]
dlistToList DList ThreadId
tids') forall a b. (a -> b) -> a -> b
$ \ThreadId
workerTid -> ThreadId -> IO ()
C.killThread ThreadId
workerTid

        forall (f :: * -> *) a. Applicative f => a -> f a
pure ( forall a. TMVar a -> STM a
readTMVar TMVar a
resVar
             , ThreadId
helperTid forall a. a -> DList a -> DList a
`dlistCons` forall a. [DList a] -> DList a
dlistConcatAll [DList ThreadId]
workerTids
             )

  TMVar SomeException
excVar <- forall a. IO (TMVar a)
newEmptyTMVarIO
  (STM a
getRes, DList ThreadId
tids0) <- forall a.
TMVar SomeException -> Flat a -> IO (STM a, DList ThreadId)
go TMVar SomeException
excVar Flat a
f0
  let tids :: [ThreadId]
tids = forall a. DList a -> [a]
dlistToList DList ThreadId
tids0
      tidCount :: Int
tidCount = forall (t :: * -> *) a. Foldable t => t a -> Int
length [ThreadId]
tids
      allDone :: Int -> Bool
allDone Int
count =
        if Int
count forall a. Ord a => a -> a -> Bool
> Int
tidCount
          then forall a. HasCallStack => String -> a
error (String
"allDone: count ("
                      forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Int
count
                      forall a. Semigroup a => a -> a -> a
<> String
") should never be greater than tidCount ("
                      forall a. Semigroup a => a -> a -> a
<> forall a. Show a => a -> String
show Int
tidCount
                      forall a. Semigroup a => a -> a -> a
<> String
")")
          else Int
count forall a. Eq a => a -> a -> Bool
== Int
tidCount

  -- Automatically retry if we get killed by a
  -- BlockedIndefinitelyOnSTM. For more information, see:
  --
  -- + https:\/\/github.com\/simonmar\/async\/issues\/14
  -- + https:\/\/github.com\/simonmar\/async\/pull\/15
  --
  let autoRetry :: IO a -> IO a
autoRetry IO a
action =
        IO a
action forall e a. Exception e => IO a -> (e -> IO a) -> IO a
`E.catch`
        \BlockedIndefinitelyOnSTM
E.BlockedIndefinitelyOnSTM -> IO a -> IO a
autoRetry IO a
action

  -- Restore the original masking state while blocking and catch
  -- exceptions to allow the parent thread to be killed early.
  Either SomeException (Either SomeException a)
res <- forall e a. Exception e => IO a -> IO (Either e a)
E.try forall a b. (a -> b) -> a -> b
$ forall b. IO b -> IO b
restore forall a b. (a -> b) -> a -> b
$ forall b. IO b -> IO b
autoRetry forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$
         (forall a b. a -> Either a b
Left forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. TMVar a -> STM a
readTMVar TMVar SomeException
excVar) forall (f :: * -> *) a. Alternative f => f a -> f a -> f a
<|>
         (forall a b. b -> Either a b
Right forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> STM a
getRes)

  Int
count0 <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> STM a
readTVar TVar Int
resultCountVar
  forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless (Int -> Bool
allDone Int
count0) forall a b. (a -> b) -> a -> b
$ do
    -- Kill all of the threads
    -- NOTE: Replacing A.AsyncCancelled with KillThread as the
    -- 'A.AsyncCancelled' constructor is not exported in older versions
    -- of the async package
    -- for_ tids $ \tid -> E.throwTo tid A.AsyncCancelled
    forall (t :: * -> *) (f :: * -> *) a b.
(Foldable t, Applicative f) =>
t a -> (a -> f b) -> f ()
for_ [ThreadId]
tids forall a b. (a -> b) -> a -> b
$ \ThreadId
tid -> ThreadId -> IO ()
C.killThread ThreadId
tid

    -- Wait for all of the threads to die. We're going to restore the original
    -- masking state here, just in case there's a bug in the cleanup code of a
    -- child thread, so that we can be killed by an async exception. We decided
    -- this is a better behavior than hanging indefinitely and wait for a SIGKILL.
    forall b. IO b -> IO b
restore forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
      Int
count <- forall a. TVar a -> STM a
readTVar TVar Int
resultCountVar
      -- retries until resultCountVar has increased to the threadId count returned by go
      Bool -> STM ()
check forall a b. (a -> b) -> a -> b
$ Int -> Bool
allDone Int
count

  -- Return the result or throw an exception. Yes, we could use
  -- either or join, but explicit pattern matching is nicer here.
  case Either SomeException (Either SomeException a)
res of
    -- Parent thread was killed with an async exception
    Left SomeException
e          -> forall e a. Exception e => e -> IO a
E.throwIO (SomeException
e :: E.SomeException)
    -- Some child thread died
    Right (Left SomeException
e)  -> forall e a. Exception e => e -> IO a
E.throwIO SomeException
e
    -- Everything worked!
    Right (Right a
x) -> forall (f :: * -> *) a. Applicative f => a -> f a
pure a
x
{-# INLINEABLE runFlat #-}

--------------------------------------------------------------------------------
#else
--------------------------------------------------------------------------------

-- | Unlifted 'A.mapConcurrently'.
--
-- @since 0.1.0.0
mapConcurrently :: MonadUnliftIO m => Traversable t => (a -> m b) -> t a -> m (t b)
mapConcurrently f t = withRunInIO $ \run -> A.mapConcurrently (run . f) t
{-# INLINE mapConcurrently #-}

-- | Unlifted 'A.mapConcurrently_'.
--
-- @since 0.1.0.0
mapConcurrently_ :: MonadUnliftIO m => Foldable f => (a -> m b) -> f a -> m ()
mapConcurrently_ f t = withRunInIO $ \run -> A.mapConcurrently_ (run . f) t
{-# INLINE mapConcurrently_ #-}

--------------------------------------------------------------------------------
#endif
--------------------------------------------------------------------------------

-- | Like 'mapConcurrently' from async, but instead of one thread per
-- element, it does pooling from a set of threads. This is useful in
-- scenarios where resource consumption is bounded and for use cases
-- where too many concurrent tasks aren't allowed.
--
-- === __Example usage__
--
-- @
-- import Say
--
-- action :: Int -> IO Int
-- action n = do
--   tid <- myThreadId
--   sayString $ show tid
--   threadDelay (2 * 10^6) -- 2 seconds
--   return n
--
-- main :: IO ()
-- main = do
--   yx \<- pooledMapConcurrentlyN 5 (\\x -\> action x) [1..5]
--   print yx
-- @
--
-- On executing you can see that five threads have been spawned:
--
-- @
-- \$ ./pool
-- ThreadId 36
-- ThreadId 38
-- ThreadId 40
-- ThreadId 42
-- ThreadId 44
-- [1,2,3,4,5]
-- @
--
--
-- Let's modify the above program such that there are less threads
-- than the number of items in the list:
--
-- @
-- import Say
--
-- action :: Int -> IO Int
-- action n = do
--   tid <- myThreadId
--   sayString $ show tid
--   threadDelay (2 * 10^6) -- 2 seconds
--   return n
--
-- main :: IO ()
-- main = do
--   yx \<- pooledMapConcurrentlyN 3 (\\x -\> action x) [1..5]
--   print yx
-- @
-- On executing you can see that only three threads are active totally:
--
-- @
-- \$ ./pool
-- ThreadId 35
-- ThreadId 37
-- ThreadId 39
-- ThreadId 35
-- ThreadId 39
-- [1,2,3,4,5]
-- @
--
-- @since 0.2.10
pooledMapConcurrentlyN :: (MonadUnliftIO m, Traversable t)
                      => Int -- ^ Max. number of threads. Should not be less than 1.
                      -> (a -> m b) -> t a -> m (t b)
pooledMapConcurrentlyN :: forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
Int -> (a -> m b) -> t a -> m (t b)
pooledMapConcurrentlyN Int
numProcs a -> m b
f t a
xs =
    forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall (t :: * -> *) a b.
Traversable t =>
Int -> (a -> IO b) -> t a -> IO (t b)
pooledMapConcurrentlyIO Int
numProcs (forall a. m a -> IO a
run forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m b
f) t a
xs

-- | Similar to 'pooledMapConcurrentlyN' but with number of threads
-- set from 'getNumCapabilities'. Usually this is useful for CPU bound
-- tasks.
--
-- @since 0.2.10
pooledMapConcurrently :: (MonadUnliftIO m, Traversable t) => (a -> m b) -> t a -> m (t b)
pooledMapConcurrently :: forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
(a -> m b) -> t a -> m (t b)
pooledMapConcurrently a -> m b
f t a
xs = do
  forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
    Int
numProcs <- IO Int
getNumCapabilities
    forall (t :: * -> *) a b.
Traversable t =>
Int -> (a -> IO b) -> t a -> IO (t b)
pooledMapConcurrentlyIO Int
numProcs (forall a. m a -> IO a
run forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m b
f) t a
xs

-- | Similar to 'pooledMapConcurrentlyN' but with flipped arguments.
--
-- @since 0.2.10
pooledForConcurrentlyN :: (MonadUnliftIO m, Traversable t)
                      => Int -- ^ Max. number of threads. Should not be less than 1.
                      -> t a -> (a -> m b) -> m (t b)
pooledForConcurrentlyN :: forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
Int -> t a -> (a -> m b) -> m (t b)
pooledForConcurrentlyN Int
numProcs = forall a b c. (a -> b -> c) -> b -> a -> c
flip (forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
Int -> (a -> m b) -> t a -> m (t b)
pooledMapConcurrentlyN Int
numProcs)

-- | Similar to 'pooledForConcurrentlyN' but with number of threads
-- set from 'getNumCapabilities'. Usually this is useful for CPU bound
-- tasks.
--
-- @since 0.2.10
pooledForConcurrently :: (MonadUnliftIO m, Traversable t) => t a -> (a -> m b) -> m (t b)
pooledForConcurrently :: forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
t a -> (a -> m b) -> m (t b)
pooledForConcurrently = forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
(a -> m b) -> t a -> m (t b)
pooledMapConcurrently

pooledMapConcurrentlyIO :: Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
pooledMapConcurrentlyIO :: forall (t :: * -> *) a b.
Traversable t =>
Int -> (a -> IO b) -> t a -> IO (t b)
pooledMapConcurrentlyIO Int
numProcs a -> IO b
f t a
xs =
    if (Int
numProcs forall a. Ord a => a -> a -> Bool
< Int
1)
    then forall a. HasCallStack => String -> a
error String
"pooledMapconcurrentlyIO: number of threads < 1"
    else forall (t :: * -> *) a b.
Traversable t =>
Int -> (a -> IO b) -> t a -> IO (t b)
pooledMapConcurrentlyIO' Int
numProcs a -> IO b
f t a
xs

-- | Performs the actual pooling for the tasks. This function will
-- continue execution until the task queue becomes empty. When one of
-- the pooled thread finishes it's task, it will pickup the next task
-- from the queue if an job is available.
pooledConcurrently
  :: Int -- ^ Max. number of threads. Should not be less than 1.
  -> IORef [a] -- ^ Task queue. These are required as inputs for the jobs.
  -> (a -> IO ()) -- ^ The task which will be run concurrently (but
                 -- will be pooled properly).
  -> IO ()
pooledConcurrently :: forall a. Int -> IORef [a] -> (a -> IO ()) -> IO ()
pooledConcurrently Int
numProcs IORef [a]
jobsVar a -> IO ()
f = do
  forall (m :: * -> *) a.
(Applicative m, MonadUnliftIO m) =>
Int -> m a -> m ()
replicateConcurrently_ Int
numProcs forall a b. (a -> b) -> a -> b
$ do
    let loop :: IO ()
loop = do
          Maybe a
mbJob :: Maybe a <- forall a b. IORef a -> (a -> (a, b)) -> IO b
atomicModifyIORef' IORef [a]
jobsVar forall a b. (a -> b) -> a -> b
$ \[a]
x -> case [a]
x of
            [] -> ([], forall a. Maybe a
Nothing)
            a
var : [a]
vars -> ([a]
vars, forall a. a -> Maybe a
Just a
var)
          case Maybe a
mbJob of
            Maybe a
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Just a
x -> do
              a -> IO ()
f a
x
              IO ()
loop
     in IO ()
loop

pooledMapConcurrentlyIO' ::
    Traversable t => Int  -- ^ Max. number of threads. Should not be less than 1.
                  -> (a -> IO b)
                  -> t a
                  -> IO (t b)
pooledMapConcurrentlyIO' :: forall (t :: * -> *) a b.
Traversable t =>
Int -> (a -> IO b) -> t a -> IO (t b)
pooledMapConcurrentlyIO' Int
numProcs a -> IO b
f t a
xs = do
  -- prepare one IORef per result...
  t (a, IORef b)
jobs :: t (a, IORef b) <-
    forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for t a
xs (\a
x -> (a
x, ) forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> forall a. a -> IO (IORef a)
newIORef (forall a. HasCallStack => String -> a
error String
"pooledMapConcurrentlyIO': empty IORef"))
  -- ...put all the inputs in a queue..
  IORef [(a, IORef b)]
jobsVar :: IORef [(a, IORef b)] <- forall a. a -> IO (IORef a)
newIORef (forall (t :: * -> *) a. Foldable t => t a -> [a]
toList t (a, IORef b)
jobs)
  -- ...run `numProcs` threads in parallel, each
  -- of them consuming the queue and filling in
  -- the respective IORefs.
  forall a. Int -> IORef [a] -> (a -> IO ()) -> IO ()
pooledConcurrently Int
numProcs IORef [(a, IORef b)]
jobsVar forall a b. (a -> b) -> a -> b
$ \ (a
x, IORef b
outRef) -> a -> IO b
f a
x forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= forall a. IORef a -> a -> IO ()
atomicWriteIORef IORef b
outRef      -- Read all the IORefs
  forall (t :: * -> *) (f :: * -> *) a b.
(Traversable t, Applicative f) =>
t a -> (a -> f b) -> f (t b)
for t (a, IORef b)
jobs (\(a
_, IORef b
outputRef) -> forall a. IORef a -> IO a
readIORef IORef b
outputRef)

pooledMapConcurrentlyIO_' ::
  Foldable t => Int -> (a -> IO ()) -> t a -> IO ()
pooledMapConcurrentlyIO_' :: forall (t :: * -> *) a.
Foldable t =>
Int -> (a -> IO ()) -> t a -> IO ()
pooledMapConcurrentlyIO_' Int
numProcs a -> IO ()
f t a
jobs = do
  IORef [a]
jobsVar :: IORef [a] <- forall a. a -> IO (IORef a)
newIORef (forall (t :: * -> *) a. Foldable t => t a -> [a]
toList t a
jobs)
  forall a. Int -> IORef [a] -> (a -> IO ()) -> IO ()
pooledConcurrently Int
numProcs IORef [a]
jobsVar a -> IO ()
f

pooledMapConcurrentlyIO_ :: Foldable t => Int -> (a -> IO b) -> t a -> IO ()
pooledMapConcurrentlyIO_ :: forall (t :: * -> *) a b.
Foldable t =>
Int -> (a -> IO b) -> t a -> IO ()
pooledMapConcurrentlyIO_ Int
numProcs a -> IO b
f t a
xs =
    if (Int
numProcs forall a. Ord a => a -> a -> Bool
< Int
1)
    then forall a. HasCallStack => String -> a
error String
"pooledMapconcurrentlyIO_: number of threads < 1"
    else forall (t :: * -> *) a.
Foldable t =>
Int -> (a -> IO ()) -> t a -> IO ()
pooledMapConcurrentlyIO_' Int
numProcs (\a
x -> a -> IO b
f a
x forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> forall (m :: * -> *) a. Monad m => a -> m a
return ()) t a
xs

-- | Like 'pooledMapConcurrentlyN' but with the return value
-- discarded.
--
-- @since 0.2.10
pooledMapConcurrentlyN_ :: (MonadUnliftIO m, Foldable f)
                        => Int -- ^ Max. number of threads. Should not be less than 1.
                        -> (a -> m b) -> f a -> m ()
pooledMapConcurrentlyN_ :: forall (m :: * -> *) (f :: * -> *) a b.
(MonadUnliftIO m, Foldable f) =>
Int -> (a -> m b) -> f a -> m ()
pooledMapConcurrentlyN_ Int
numProcs a -> m b
f f a
t =
  forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> forall (t :: * -> *) a b.
Foldable t =>
Int -> (a -> IO b) -> t a -> IO ()
pooledMapConcurrentlyIO_ Int
numProcs (forall a. m a -> IO a
run forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m b
f) f a
t

-- | Like 'pooledMapConcurrently' but with the return value discarded.
--
-- @since 0.2.10
pooledMapConcurrently_ :: (MonadUnliftIO m, Foldable f) => (a -> m b) -> f a -> m ()
pooledMapConcurrently_ :: forall (m :: * -> *) (f :: * -> *) a b.
(MonadUnliftIO m, Foldable f) =>
(a -> m b) -> f a -> m ()
pooledMapConcurrently_ a -> m b
f f a
t =
  forall (m :: * -> *) b.
MonadUnliftIO m =>
((forall a. m a -> IO a) -> IO b) -> m b
withRunInIO forall a b. (a -> b) -> a -> b
$ \forall a. m a -> IO a
run -> do
    Int
numProcs <- IO Int
getNumCapabilities
    forall (t :: * -> *) a b.
Foldable t =>
Int -> (a -> IO b) -> t a -> IO ()
pooledMapConcurrentlyIO_ Int
numProcs (forall a. m a -> IO a
run forall b c a. (b -> c) -> (a -> b) -> a -> c
. a -> m b
f) f a
t

-- | Like 'pooledMapConcurrently_' but with flipped arguments.
--
-- @since 0.2.10
pooledForConcurrently_ :: (MonadUnliftIO m, Foldable f) => f a -> (a -> m b) -> m ()
pooledForConcurrently_ :: forall (m :: * -> *) (f :: * -> *) a b.
(MonadUnliftIO m, Foldable f) =>
f a -> (a -> m b) -> m ()
pooledForConcurrently_ = forall a b c. (a -> b -> c) -> b -> a -> c
flip forall (m :: * -> *) (f :: * -> *) a b.
(MonadUnliftIO m, Foldable f) =>
(a -> m b) -> f a -> m ()
pooledMapConcurrently_

-- | Like 'pooledMapConcurrentlyN_' but with flipped arguments.
--
-- @since 0.2.10
pooledForConcurrentlyN_ :: (MonadUnliftIO m, Foldable t)
                        => Int -- ^ Max. number of threads. Should not be less than 1.
                        -> t a -> (a -> m b) -> m ()
pooledForConcurrentlyN_ :: forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Foldable t) =>
Int -> t a -> (a -> m b) -> m ()
pooledForConcurrentlyN_ Int
numProcs = forall a b c. (a -> b -> c) -> b -> a -> c
flip (forall (m :: * -> *) (f :: * -> *) a b.
(MonadUnliftIO m, Foldable f) =>
Int -> (a -> m b) -> f a -> m ()
pooledMapConcurrentlyN_ Int
numProcs)


-- | Pooled version of 'replicateConcurrently'. Performs the action in
-- the pooled threads.
--
-- @since 0.2.10
pooledReplicateConcurrentlyN :: (MonadUnliftIO m)
                             => Int -- ^ Max. number of threads. Should not be less than 1.
                             -> Int -- ^ Number of times to perform the action.
                             -> m a -> m [a]
pooledReplicateConcurrentlyN :: forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> Int -> m a -> m [a]
pooledReplicateConcurrentlyN Int
numProcs Int
cnt m a
task =
    if Int
cnt forall a. Ord a => a -> a -> Bool
< Int
1
    then forall (m :: * -> *) a. Monad m => a -> m a
return []
    else forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
Int -> (a -> m b) -> t a -> m (t b)
pooledMapConcurrentlyN Int
numProcs (\Int
_ -> m a
task) [Int
1..Int
cnt]

-- | Similar to 'pooledReplicateConcurrentlyN' but with number of
-- threads set from 'getNumCapabilities'. Usually this is useful for
-- CPU bound tasks.
--
-- @since 0.2.10
pooledReplicateConcurrently :: (MonadUnliftIO m)
                            => Int -- ^ Number of times to perform the action.
                            -> m a -> m [a]
pooledReplicateConcurrently :: forall (m :: * -> *) a. MonadUnliftIO m => Int -> m a -> m [a]
pooledReplicateConcurrently Int
cnt m a
task =
    if Int
cnt forall a. Ord a => a -> a -> Bool
< Int
1
    then forall (m :: * -> *) a. Monad m => a -> m a
return []
    else forall (m :: * -> *) (t :: * -> *) a b.
(MonadUnliftIO m, Traversable t) =>
(a -> m b) -> t a -> m (t b)
pooledMapConcurrently (\Int
_ -> m a
task) [Int
1..Int
cnt]

-- | Pooled version of 'replicateConcurrently_'. Performs the action in
-- the pooled threads.
--
-- @since 0.2.10
pooledReplicateConcurrentlyN_ :: (MonadUnliftIO m)
                              => Int -- ^ Max. number of threads. Should not be less than 1.
                              -> Int -- ^ Number of times to perform the action.
                              -> m a -> m ()
pooledReplicateConcurrentlyN_ :: forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> Int -> m a -> m ()
pooledReplicateConcurrentlyN_ Int
numProcs Int
cnt m a
task =
  if Int
cnt forall a. Ord a => a -> a -> Bool
< Int
1
  then forall (m :: * -> *) a. Monad m => a -> m a
return ()
  else forall (m :: * -> *) (f :: * -> *) a b.
(MonadUnliftIO m, Foldable f) =>
Int -> (a -> m b) -> f a -> m ()
pooledMapConcurrentlyN_ Int
numProcs (\Int
_ -> m a
task) [Int
1..Int
cnt]

-- | Similar to 'pooledReplicateConcurrently_' but with number of
-- threads set from 'getNumCapabilities'. Usually this is useful for
-- CPU bound tasks.
--
-- @since 0.2.10
pooledReplicateConcurrently_ :: (MonadUnliftIO m)
                             => Int -- ^ Number of times to perform the action.
                             -> m a -> m ()
pooledReplicateConcurrently_ :: forall (m :: * -> *) a. MonadUnliftIO m => Int -> m a -> m ()
pooledReplicateConcurrently_ Int
cnt m a
task =
  if Int
cnt forall a. Ord a => a -> a -> Bool
< Int
1
  then forall (m :: * -> *) a. Monad m => a -> m a
return ()
  else forall (m :: * -> *) (f :: * -> *) a b.
(MonadUnliftIO m, Foldable f) =>
(a -> m b) -> f a -> m ()
pooledMapConcurrently_ (\Int
_ -> m a
task) [Int
1..Int
cnt]