{-# LANGUAGE CPP
           , DeriveDataTypeable
           , NoImplicitPrelude
           , ImpredicativeTypes
           , RankNTypes #-}

#if __GLASGOW_HASKELL__ >= 701
{-# LANGUAGE Trustworthy #-}
#endif

--------------------------------------------------------------------------------
-- |
-- Module     : Control.Concurrent.Thread.Group
-- Copyright  : (c) 2010-2012 Bas van Dijk & Roel van Dijk
-- License    : BSD3 (see the file LICENSE)
-- Maintainer : Bas van Dijk <v.dijk.bas@gmail.com>
--            , Roel van Dijk <vandijk.roel@gmail.com>
--
-- This module extends @Control.Concurrent.Thread@ with the ability to wait for
-- a group of threads to terminate.
--
-- This module exports equivalently named functions from @Control.Concurrent@,
-- (@GHC.Conc@), and @Control.Concurrent.Thread@. Avoid ambiguities by importing
-- this module qualified. May we suggest:
--
-- @
-- import Control.Concurrent.Thread.Group ( ThreadGroup )
-- import qualified Control.Concurrent.Thread.Group as ThreadGroup ( ... )
-- @
--
--------------------------------------------------------------------------------

module Control.Concurrent.Thread.Group
    ( ThreadGroup
    , new
    , nrOfRunning
    , wait
    , waitN

      -- * Forking threads
    , forkIO
    , forkOS
    , forkOn
    , forkIOWithUnmask
    , forkOnWithUnmask
    ) where


--------------------------------------------------------------------------------
-- Imports
--------------------------------------------------------------------------------

-- from base:
import qualified Control.Concurrent     ( forkOS
                                        , forkIOWithUnmask
                                        , forkOnWithUnmask
                                        )
import Control.Concurrent               ( ThreadId )
import Control.Concurrent.MVar          ( newEmptyMVar, putMVar, readMVar )
import Control.Exception                ( try, mask )
import Control.Monad                    ( return, (>>=), when )
import Data.Function                    ( (.), ($) )
import Data.Functor                     ( fmap )
import Data.Eq                          ( Eq )
import Data.Ord                         ( (>=) )
import Data.Int                         ( Int )
import Data.Typeable                    ( Typeable )
import Prelude                          ( ($!), (+), subtract )
import System.IO                        ( IO )

-- from stm:
import Control.Concurrent.STM.TVar      ( TVar, newTVarIO, readTVar, writeTVar )
import Control.Concurrent.STM           ( STM, atomically, retry )

-- from threads:
import Control.Concurrent.Thread        ( Result )
import Control.Concurrent.Raw           ( rawForkIO, rawForkOn )
#ifdef __HADDOCK__
import qualified Control.Concurrent.Thread as Thread ( forkIO
                                                     , forkOS
                                                     , forkOn
                                                     , forkIOWithUnmask
                                                     , forkOnWithUnmask
                                                     )
#endif


--------------------------------------------------------------------------------
-- * Thread groups
--------------------------------------------------------------------------------

{-| A @ThreadGroup@ can be understood as a counter which counts the number of
threads that were added to the group minus the ones that have terminated.

More formally a @ThreadGroup@ has the following semantics:

* 'new' initializes the counter to 0.

* Forking a thread increments the counter.

* When a forked thread terminates, whether normally or by raising an exception,
  the counter is decremented.

* 'nrOfRunning' yields a transaction that returns the counter.

* 'wait' blocks as long as the counter is greater than 0.

* 'waitN' blocks as long as the counter is greater than or equal to the
  specified number.
-}
newtype ThreadGroup = ThreadGroup (TVar Int) deriving (Eq, Typeable)

-- | Create an empty group of threads.
--
-- The group's counter of running threads starts at 0.
new :: IO ThreadGroup
new = fmap ThreadGroup $ newTVarIO 0

{-| Yield a transaction that returns the number of running threads in the
group.

Note that because this function yields an 'STM' computation, the returned
number is guaranteed to be consistent inside the transaction.
-}
nrOfRunning :: ThreadGroup -> STM Int
nrOfRunning (ThreadGroup numThreadsTV) = readTVar numThreadsTV

-- | Block until all threads in the group have terminated.
--
-- Note that: @wait = 'waitN' 1@.
wait :: ThreadGroup -> IO ()
wait = waitN 1

-- | Block until there are fewer than @N@ running threads in the group.
--
-- The counter is read inside a single transaction, which retries for as long
-- as the number of running threads is greater than or equal to @N@.
waitN :: Int -> ThreadGroup -> IO ()
waitN i tg = atomically $ do
  n <- nrOfRunning tg
  when (n >= i) retry


--------------------------------------------------------------------------------
-- * Forking threads
--------------------------------------------------------------------------------

-- | Same as @Control.Concurrent.Thread.'Thread.forkIO'@ but additionally adds
-- the thread to the group.
forkIO :: ThreadGroup -> IO a -> IO (ThreadId, IO (Result a))
forkIO = fork rawForkIO

-- | Same as @Control.Concurrent.Thread.'Thread.forkOS'@ but additionally adds
-- the thread to the group.
forkOS :: ThreadGroup -> IO a -> IO (ThreadId, IO (Result a))
forkOS = fork Control.Concurrent.forkOS

-- | Same as @Control.Concurrent.Thread.'Thread.forkOn'@ but
-- additionally adds the thread to the group.
--
-- The 'Int' argument is handed to 'rawForkOn' unchanged.
forkOn :: Int -> ThreadGroup -> IO a -> IO (ThreadId, IO (Result a))
forkOn = fork . rawForkOn

-- | Same as @Control.Concurrent.Thread.'Thread.forkIOWithUnmask'@ but
-- additionally adds the thread to the group.
forkIOWithUnmask
    :: ThreadGroup
    -> ((forall b. IO b -> IO b) -> IO a)
    -> IO (ThreadId, IO (Result a))
forkIOWithUnmask = forkWithUnmask Control.Concurrent.forkIOWithUnmask

-- | Same as @Control.Concurrent.Thread.'Thread.forkOnWithUnmask'@ but
-- additionally adds the thread to the group.
--
-- The 'Int' argument is handed to @Control.Concurrent.forkOnWithUnmask@
-- unchanged.
forkOnWithUnmask
    :: Int
    -> ThreadGroup
    -> ((forall b. IO b -> IO b) -> IO a)
    -> IO (ThreadId, IO (Result a))
forkOnWithUnmask = forkWithUnmask . Control.Concurrent.forkOnWithUnmask


--------------------------------------------------------------------------------
-- Utils
--------------------------------------------------------------------------------

-- | Shared implementation of 'forkIO', 'forkOS' and 'forkOn'.
--
-- Runs the given action in a new thread created by the supplied forking
-- primitive and counts that thread in the group. Returns the new thread's
-- 'ThreadId' paired with an action that reads the thread's result from an
-- 'MVar'; that action blocks until the result has been stored.
fork :: (IO () -> IO ThreadId)
     -> ThreadGroup
     -> IO a
     -> IO (ThreadId, IO (Result a))
fork doFork (ThreadGroup numThreadsTV) a = do
  res <- newEmptyMVar
  -- The increment and the fork happen under 'mask', so an asynchronous
  -- exception cannot arrive between bumping the counter and creating the
  -- thread that will eventually decrement it.
  tid <- mask $ \restore -> do
    atomically $ modifyTVar numThreadsTV (+ 1)
    doFork $ do
      -- 'restore' runs the user's action in the masking state that was in
      -- effect when 'mask' was entered. 'try' captures the outcome (value or
      -- exception) so that it can be handed over through the 'MVar'.
      try (restore a) >>= putMVar res
      -- The counter is decremented only after the result has been stored.
      atomically $ modifyTVar numThreadsTV (subtract 1)
  return (tid, readMVar res)

-- | Shared implementation of 'forkIOWithUnmask' and 'forkOnWithUnmask'.
--
-- Works like 'fork', except that the forking primitive supplies an @unmask@
-- function which is passed on to the user's computation. Returns the new
-- thread's 'ThreadId' paired with an action that reads the thread's result
-- from an 'MVar'; that action blocks until the result has been stored.
forkWithUnmask
    :: (((forall b. IO b -> IO b) -> IO ()) -> IO ThreadId)
    -> ThreadGroup
    -> ((forall b. IO b -> IO b) -> IO a)
    -> IO (ThreadId, IO (Result a))
forkWithUnmask doForkWithUnmask = \(ThreadGroup numThreadsTV) f -> do
  res <- newEmptyMVar
  -- The increment and the fork happen under 'mask', so an asynchronous
  -- exception cannot arrive between bumping the counter and creating the
  -- thread that will eventually decrement it.
  tid <- mask $ \restore -> do
    atomically $ modifyTVar numThreadsTV (+ 1)
    doForkWithUnmask $ \unmask -> do
      -- The user's computation receives @unmask@ and is itself run under
      -- 'restore', i.e. in the masking state that was in effect when 'mask'
      -- was entered. 'try' captures the outcome (value or exception) so that
      -- it can be handed over through the 'MVar'.
      try (restore $ f unmask) >>= putMVar res
      -- The counter is decremented only after the result has been stored.
      atomically $ modifyTVar numThreadsTV (subtract 1)
  return (tid, readMVar res)

-- | Strictly modify the contents of a 'TVar'.
--
-- The current value is read, the function is applied to it, and the result is
-- forced (via '.!') before it is written back, so no chain of unevaluated
-- applications accumulates inside the 'TVar'.
modifyTVar :: TVar a -> (a -> a) -> STM ()
modifyTVar tv f = readTVar tv >>= writeTVar tv .! f

-- | Strict function composition.
--
-- @(f .! g) x@ evaluates @g x@ (with '$!') before passing it to @f@.
(.!) :: (b -> c) -> (a -> b) -> (a -> c)
f .! g = \x -> f $! g x