{-# LANGUAGE FlexibleContexts #-}
-----------------------------------------------------------------------------
-- |
-- Module      :  Distribution.Client.JobControl
-- Copyright   :  (c) Duncan Coutts 2012
-- License     :  BSD-like
--
-- Maintainer  :  cabal-devel@haskell.org
-- Stability   :  provisional
-- Portability :  portable
--
-- A job control concurrency abstraction
-----------------------------------------------------------------------------
module Distribution.Client.JobControl (
    JobControl,
    newSerialJobControl,
    newParallelJobControl,
    spawnJob,
    collectJob,
    remainingJobs,
    cancelJobs,

    JobLimit,
    newJobLimit,
    withJobLimit,

    Lock,
    newLock,
    criticalSection
  ) where

import Distribution.Client.Compat.Prelude
import Prelude ()

import Control.Monad (forever, replicateM_)
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TVar
import Control.Concurrent.STM.TChan
import Control.Exception (bracket_, try)
import Distribution.Compat.Stack
import Distribution.Client.Compat.Semaphore


-- | A simple concurrency abstraction. Jobs can be spawned and can complete
-- in any order. This allows both serial and parallel implementations.
--
data JobControl m a = JobControl {
       -- | Add a new job to the pool of jobs
       forall (m :: * -> *) a. JobControl m a -> m a -> m ()
spawnJob    :: m a -> m (),

       -- | Wait until one job is complete
       forall (m :: * -> *) a. JobControl m a -> m a
collectJob  :: m a,

       -- | Returns True if there are any outstanding jobs
       -- (ie spawned but yet to be collected)
       forall (m :: * -> *) a. JobControl m a -> m Bool
remainingJobs :: m Bool,

       -- | Try to cancel any outstanding but not-yet-started jobs.
       -- Call 'remainingJobs' after this to find out if any jobs are left
       -- (ie could not be cancelled).
       forall (m :: * -> *) a. JobControl m a -> m ()
cancelJobs  :: m ()
     }


-- | Make a 'JobControl' that executes all jobs serially and in order.
-- It only executes jobs on demand when they are collected, not eagerly.
--
-- Cancelling will cancel /all/ jobs that have not been collected yet.
--
newSerialJobControl :: IO (JobControl IO a)
newSerialJobControl :: forall a. IO (JobControl IO a)
newSerialJobControl = do
    TChan (IO a)
qVar <- forall a. IO (TChan a)
newTChanIO
    forall (m :: * -> *) a. Monad m => a -> m a
return JobControl {
      spawnJob :: IO a -> IO ()
spawnJob      = forall a. TChan (IO a) -> IO a -> IO ()
spawn     TChan (IO a)
qVar,
      collectJob :: IO a
collectJob    = forall a. TChan (IO a) -> IO a
collect   TChan (IO a)
qVar,
      remainingJobs :: IO Bool
remainingJobs = forall a. TChan (IO a) -> IO Bool
remaining TChan (IO a)
qVar,
      cancelJobs :: IO ()
cancelJobs    = forall a. TChan (IO a) -> IO ()
cancel    TChan (IO a)
qVar
    }
  where
    spawn :: TChan (IO a) -> IO a -> IO ()
    spawn :: forall a. TChan (IO a) -> IO a -> IO ()
spawn TChan (IO a)
qVar IO a
job = forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> a -> STM ()
writeTChan TChan (IO a)
qVar IO a
job

    collect :: TChan (IO a) -> IO a
    collect :: forall a. TChan (IO a) -> IO a
collect TChan (IO a)
qVar =
      forall (m :: * -> *) a. Monad m => m (m a) -> m a
join forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> STM a
readTChan TChan (IO a)
qVar

    remaining :: TChan (IO a) -> IO Bool
    remaining :: forall a. TChan (IO a) -> IO Bool
remaining TChan (IO a)
qVar  = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Bool -> Bool
not forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> STM Bool
isEmptyTChan TChan (IO a)
qVar

    cancel :: TChan (IO a) -> IO ()
    cancel :: forall a. TChan (IO a) -> IO ()
cancel TChan (IO a)
qVar = do
      [IO a]
_ <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> STM [a]
readAllTChan TChan (IO a)
qVar
      forall (m :: * -> *) a. Monad m => a -> m a
return ()

-- | Make a 'JobControl' that eagerly executes jobs in parallel, with a given
-- maximum degree of parallelism.
--
-- Cancelling will cancel jobs that have not yet begun executing, but jobs
-- that have already been executed or are currently executing cannot be
-- cancelled.
--
newParallelJobControl :: WithCallStack (Int -> IO (JobControl IO a))
newParallelJobControl :: forall a. WithCallStack (Int -> IO (JobControl IO a))
newParallelJobControl Int
n | Int
n forall a. Ord a => a -> a -> Bool
< Int
1 Bool -> Bool -> Bool
|| Int
n forall a. Ord a => a -> a -> Bool
> Int
1000 =
  forall a. HasCallStack => [Char] -> a
error forall a b. (a -> b) -> a -> b
$ [Char]
"newParallelJobControl: not a sensible number of jobs: " forall a. [a] -> [a] -> [a]
++ forall a. Show a => a -> [Char]
show Int
n
newParallelJobControl Int
maxJobLimit = do
    TChan (IO a)
inqVar   <- forall a. IO (TChan a)
newTChanIO
    TChan (Either SomeException a)
outqVar  <- forall a. IO (TChan a)
newTChanIO
    TVar Int
countVar <- forall a. a -> IO (TVar a)
newTVarIO Int
0
    forall (m :: * -> *) a. Applicative m => Int -> m a -> m ()
replicateM_ Int
maxJobLimit forall a b. (a -> b) -> a -> b
$
      IO () -> IO ThreadId
forkIO forall a b. (a -> b) -> a -> b
$
        forall a. TChan (IO a) -> TChan (Either SomeException a) -> IO ()
worker TChan (IO a)
inqVar TChan (Either SomeException a)
outqVar
    forall (m :: * -> *) a. Monad m => a -> m a
return JobControl {
      spawnJob :: IO a -> IO ()
spawnJob      = forall a. TChan (IO a) -> TVar Int -> IO a -> IO ()
spawn   TChan (IO a)
inqVar  TVar Int
countVar,
      collectJob :: IO a
collectJob    = forall a. TChan (Either SomeException a) -> TVar Int -> IO a
collect TChan (Either SomeException a)
outqVar TVar Int
countVar,
      remainingJobs :: IO Bool
remainingJobs = TVar Int -> IO Bool
remaining       TVar Int
countVar,
      cancelJobs :: IO ()
cancelJobs    = forall a. TChan (IO a) -> TVar Int -> IO ()
cancel  TChan (IO a)
inqVar  TVar Int
countVar
    }
  where
    worker ::  TChan (IO a) -> TChan (Either SomeException a) -> IO ()
    worker :: forall a. TChan (IO a) -> TChan (Either SomeException a) -> IO ()
worker TChan (IO a)
inqVar TChan (Either SomeException a)
outqVar =
      forall (f :: * -> *) a b. Applicative f => f a -> f b
forever forall a b. (a -> b) -> a -> b
$ do
        IO a
job <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> STM a
readTChan TChan (IO a)
inqVar
        Either SomeException a
res <- forall e a. Exception e => IO a -> IO (Either e a)
try IO a
job
        forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TChan a -> a -> STM ()
writeTChan TChan (Either SomeException a)
outqVar Either SomeException a
res

    spawn :: TChan (IO a) -> TVar Int -> IO a -> IO ()
    spawn :: forall a. TChan (IO a) -> TVar Int -> IO a -> IO ()
spawn TChan (IO a)
inqVar TVar Int
countVar IO a
job =
      forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
        forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
countVar (forall a. Num a => a -> a -> a
+Int
1)
        forall a. TChan a -> a -> STM ()
writeTChan TChan (IO a)
inqVar IO a
job

    collect :: TChan (Either SomeException a) -> TVar Int -> IO a
    collect :: forall a. TChan (Either SomeException a) -> TVar Int -> IO a
collect TChan (Either SomeException a)
outqVar TVar Int
countVar = do
      Either SomeException a
res <- forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
        forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
countVar (forall a. Num a => a -> a -> a
subtract Int
1)
        forall a. TChan a -> STM a
readTChan TChan (Either SomeException a)
outqVar
      forall a c b. (a -> c) -> (b -> c) -> Either a b -> c
either forall e a. Exception e => e -> IO a
throwIO forall (m :: * -> *) a. Monad m => a -> m a
return Either SomeException a
res

    remaining :: TVar Int -> IO Bool
    remaining :: TVar Int -> IO Bool
remaining TVar Int
countVar = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap (forall a. Eq a => a -> a -> Bool
/=Int
0) forall a b. (a -> b) -> a -> b
$ forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ forall a. TVar a -> STM a
readTVar TVar Int
countVar

    cancel :: TChan (IO a) -> TVar Int -> IO ()
    cancel :: forall a. TChan (IO a) -> TVar Int -> IO ()
cancel TChan (IO a)
inqVar TVar Int
countVar =
      forall a. STM a -> IO a
atomically forall a b. (a -> b) -> a -> b
$ do
        [IO a]
xs <- forall a. TChan a -> STM [a]
readAllTChan TChan (IO a)
inqVar
        forall a. TVar a -> (a -> a) -> STM ()
modifyTVar' TVar Int
countVar (forall a. Num a => a -> a -> a
subtract (forall (t :: * -> *) a. Foldable t => t a -> Int
length [IO a]
xs))

readAllTChan :: TChan a -> STM [a]
readAllTChan :: forall a. TChan a -> STM [a]
readAllTChan TChan a
qvar = [a] -> STM [a]
go []
  where
    go :: [a] -> STM [a]
go [a]
xs = do
      Maybe a
mx <- forall a. TChan a -> STM (Maybe a)
tryReadTChan TChan a
qvar
      case Maybe a
mx of
        Maybe a
Nothing -> forall (m :: * -> *) a. Monad m => a -> m a
return (forall a. [a] -> [a]
reverse [a]
xs)
        Just a
x  -> [a] -> STM [a]
go (a
xforall a. a -> [a] -> [a]
:[a]
xs)

-------------------------
-- Job limits and locks
--

data JobLimit = JobLimit QSem

newJobLimit :: Int -> IO JobLimit
newJobLimit :: Int -> IO JobLimit
newJobLimit Int
n =
  forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap QSem -> JobLimit
JobLimit (Int -> IO QSem
newQSem Int
n)

withJobLimit :: JobLimit -> IO a -> IO a
withJobLimit :: forall a. JobLimit -> IO a -> IO a
withJobLimit (JobLimit QSem
sem) =
  forall a b c. IO a -> IO b -> IO c -> IO c
bracket_ (QSem -> IO ()
waitQSem QSem
sem) (QSem -> IO ()
signalQSem QSem
sem)

newtype Lock = Lock (MVar ())

newLock :: IO Lock
newLock :: IO Lock
newLock = forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap MVar () -> Lock
Lock forall a b. (a -> b) -> a -> b
$ forall a. a -> IO (MVar a)
newMVar ()

criticalSection :: Lock -> IO a -> IO a
criticalSection :: forall a. Lock -> IO a -> IO a
criticalSection (Lock MVar ()
lck) IO a
act = forall a b c. IO a -> IO b -> IO c -> IO c
bracket_ (forall a. MVar a -> IO a
takeMVar MVar ()
lck) (forall a. MVar a -> a -> IO ()
putMVar MVar ()
lck ()) IO a
act