module LogicGrowsOnTrees.Parallel.Common.RequestQueue
(
RequestQueueMonad(..)
, Request
, RequestQueue(..)
, RequestQueueReader
, addWorkerCountListener
, getCurrentProgress
, getCurrentStatistics
, getNumberOfWorkers
, requestProgressUpdate
, syncAsync
, addProgressReceiver
, enqueueRequest
, enqueueRequestAndWait
, newRequestQueue
, tryDequeueRequest
, processAllRequests
, receiveProgress
, requestQueueProgram
, forkControllerThread
, killControllerThreads
, CPUTimeTracker
, newCPUTimeTracker
, startCPUTimeTracker
, getCurrentCPUTime
, getQuantityAsync
) where
import Prelude hiding (catch)
import Control.Applicative ((<$>),(<*>),liftA2)
import Control.Arrow ((&&&))
import Control.Concurrent (ThreadId,forkIO,killThread)
import Control.Concurrent.MVar (newEmptyMVar,putMVar,takeMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan (TChan,newTChanIO,readTChan,tryReadTChan,writeTChan)
import Control.Concurrent.STM.TVar (TVar,modifyTVar',newTVarIO,readTVar,writeTVar)
import Control.Exception (BlockedIndefinitelyOnMVar(..),catch,finally,mask)
import Control.Monad.CatchIO (MonadCatchIO)
import Control.Monad ((>=>),join,liftM,liftM3,unless)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Reader (ReaderT(..),ask)
import Data.Composition ((.*))
import Data.IORef (IORef,atomicModifyIORef,readIORef,newIORef)
import Data.List (delete)
import Data.Time.Clock (NominalDiffTime,UTCTime,diffUTCTime,getCurrentTime)
import qualified LogicGrowsOnTrees.Parallel.Common.Supervisor as Supervisor
import LogicGrowsOnTrees.Parallel.Common.Supervisor
(RunStatistics
,SupervisorFullConstraint
,SupervisorMonad
,SupervisorProgram(..)
)
import LogicGrowsOnTrees.Parallel.ExplorationMode
class (HasExplorationMode m, Functor m, MonadCatchIO m) ⇒ RequestQueueMonad m where
abort :: m ()
addWorkerCountListenerAsync :: (Int → IO ()) → IO () → m ()
fork :: m () → m ThreadId
getCurrentProgressAsync :: (ProgressFor (ExplorationModeFor m) → IO ()) → m ()
getCurrentStatisticsAsync :: (RunStatistics → IO ()) → m ()
getNumberOfWorkersAsync :: (Int → IO ()) → m ()
requestProgressUpdateAsync :: (ProgressFor (ExplorationModeFor m) → IO ()) → m ()
setWorkloadBufferSize :: Int → m ()
type Request exploration_mode worker_id m = SupervisorMonad exploration_mode worker_id m ()
data RequestQueue exploration_mode worker_id m = RequestQueue
{
requests :: !(TChan (Request exploration_mode worker_id m))
, receivers :: !(IORef [ProgressFor exploration_mode → IO ()])
, controllerThreads :: !(IORef [ThreadId])
}
type RequestQueueReader exploration_mode worker_id m = ReaderT (RequestQueue exploration_mode worker_id m) IO
instance HasExplorationMode (RequestQueueReader exploration_mode worker_id m) where
type ExplorationModeFor (RequestQueueReader exploration_mode worker_id m) = exploration_mode
instance (SupervisorFullConstraint worker_id m, MonadCatchIO m) ⇒ RequestQueueMonad (RequestQueueReader exploration_mode worker_id m) where
abort = ask >>= enqueueRequest Supervisor.abortSupervisor
addWorkerCountListenerAsync listener callback = ask >>= enqueueRequest (Supervisor.addWorkerCountListener listener >> liftIO callback)
fork m = ask >>= flip forkControllerThread' m
getCurrentProgressAsync = (ask >>=) . getQuantityAsync Supervisor.getCurrentProgress
getCurrentStatisticsAsync = (ask >>=) . getQuantityAsync Supervisor.getCurrentStatistics
getNumberOfWorkersAsync = (ask >>=) . getQuantityAsync Supervisor.getNumberOfWorkers
requestProgressUpdateAsync receiveUpdatedProgress =
ask
>>=
liftIO
.
liftA2 (>>)
(addProgressReceiver receiveUpdatedProgress)
(enqueueRequest Supervisor.performGlobalProgressUpdate)
setWorkloadBufferSize size = ask >>= enqueueRequest (Supervisor.setWorkloadBufferSize size)
addWorkerCountListener :: RequestQueueMonad m ⇒ (Int → IO ()) → m ()
addWorkerCountListener listener = syncAsync (\callback → addWorkerCountListenerAsync listener (callback ()))
getCurrentProgress :: RequestQueueMonad m ⇒ m (ProgressFor (ExplorationModeFor m))
getCurrentProgress = syncAsync getCurrentProgressAsync
getCurrentStatistics :: RequestQueueMonad m ⇒ m RunStatistics
getCurrentStatistics = syncAsync getCurrentStatisticsAsync
getNumberOfWorkers :: RequestQueueMonad m ⇒ m Int
getNumberOfWorkers = syncAsync getNumberOfWorkersAsync
requestProgressUpdate :: RequestQueueMonad m ⇒ m (ProgressFor (ExplorationModeFor m))
requestProgressUpdate = syncAsync requestProgressUpdateAsync
syncAsync :: MonadIO m ⇒ ((α → IO ()) → m ()) → m α
syncAsync runCommandAsync = do
result_mvar ← liftIO newEmptyMVar
runCommandAsync (putMVar result_mvar)
liftIO $
takeMVar result_mvar
`catch`
(\BlockedIndefinitelyOnMVar → error $ "blocked forever while waiting for controller to respond to request")
addProgressReceiver ::
MonadIO m' ⇒
(ProgressFor exploration_mode → IO ()) →
RequestQueue exploration_mode worker_id m →
m' ()
addProgressReceiver receiver =
liftIO
.
flip atomicModifyIORef ((receiver:) &&& const ())
.
receivers
enqueueRequest ::
MonadIO m' ⇒
Request exploration_mode worker_id m →
RequestQueue exploration_mode worker_id m →
m' ()
enqueueRequest = flip $
(liftIO . atomically)
.*
(writeTChan . requests)
enqueueRequestAndWait ::
(MonadIO m, MonadIO m') ⇒
Request exploration_mode worker_id m →
RequestQueue exploration_mode worker_id m →
m' ()
enqueueRequestAndWait request request_queue = do
signal ← liftIO newEmptyMVar
enqueueRequest (request >> liftIO (putMVar signal ())) request_queue
liftIO $ takeMVar signal
newRequestQueue ::
MonadIO m' ⇒
m' (RequestQueue exploration_mode worker_id m)
newRequestQueue = liftIO $ liftM3 RequestQueue newTChanIO (newIORef []) (newIORef [])
tryDequeueRequest ::
MonadIO m' ⇒
RequestQueue exploration_mode worker_id m →
m' (Maybe (Request exploration_mode worker_id m))
tryDequeueRequest =
liftIO
.
atomically
.
tryReadTChan
.
requests
processAllRequests ::
MonadIO m ⇒
RequestQueue exploration_mode worker_id m →
SupervisorMonad exploration_mode worker_id m ()
processAllRequests (RequestQueue requests _ _) = go
where
go =
(liftIO . atomically . tryReadTChan) requests
>>=
maybe (return ()) (>> go)
receiveProgress ::
MonadIO m' ⇒
RequestQueue exploration_mode worker_id m →
ProgressFor exploration_mode →
m' ()
receiveProgress queue progress =
liftIO
.
join
.
liftM (sequence_ . map ($ progress))
.
flip atomicModifyIORef (const [] &&& id)
.
receivers
$
queue
requestQueueProgram ::
MonadIO m ⇒
SupervisorMonad exploration_mode worker_id m () →
RequestQueue exploration_mode worker_id m →
SupervisorProgram exploration_mode worker_id m
requestQueueProgram initialize =
flip (BlockingProgram initialize) id
.
liftIO
.
atomically
.
readTChan
.
requests
forkControllerThread ::
MonadIO m' ⇒
RequestQueue exploration_mode worker_id m →
RequestQueueReader exploration_mode worker_id m () →
m' ()
forkControllerThread = liftM (const ()) .* forkControllerThread'
forkControllerThread' ::
MonadIO m' ⇒
RequestQueue exploration_mode worker_id m →
RequestQueueReader exploration_mode worker_id m () →
m' ThreadId
forkControllerThread' request_queue controller = liftIO $ do
start_signal ← newEmptyMVar
rec thread_id ← forkIO . mask $ \restore →
(restore $ takeMVar start_signal >> runReaderT controller request_queue)
`finally`
(atomicModifyIORef (controllerThreads request_queue) (delete thread_id &&& const ()))
atomicModifyIORef (controllerThreads request_queue) ((thread_id:) &&& const ())
putMVar start_signal ()
return thread_id
killControllerThreads ::
MonadIO m' ⇒
RequestQueue exploration_mode worker_id m →
m' ()
killControllerThreads = liftIO . readIORef . controllerThreads >=> liftIO . mapM_ killThread
data CPUTimeTracker = CPUTimeTracker
{ trackerStarted :: IORef Bool
, trackerLastTime :: TVar (Maybe (UTCTime,Int))
, trackerTotalTime :: TVar NominalDiffTime
}
newCPUTimeTracker :: NominalDiffTime → IO CPUTimeTracker
newCPUTimeTracker initial_cpu_time =
CPUTimeTracker
<$> newIORef False
<*> newTVarIO Nothing
<*> newTVarIO initial_cpu_time
computeAdditionalTime current_time (last_time,last_number_of_workers) =
(current_time `diffUTCTime` last_time) * fromIntegral last_number_of_workers
startCPUTimeTracker :: RequestQueueMonad m ⇒ CPUTimeTracker → m ()
startCPUTimeTracker CPUTimeTracker{..} = do
already_started ← liftIO $ atomicModifyIORef trackerStarted (const True &&& id)
unless already_started . addWorkerCountListener $ \current_number_of_workers → do
current_time ← getCurrentTime
atomically $ do
maybe_last ← readTVar trackerLastTime
flip (maybe (return ())) maybe_last $ \last →
modifyTVar' trackerTotalTime (+ computeAdditionalTime current_time last)
writeTVar trackerLastTime $ Just (current_time,current_number_of_workers)
getCurrentCPUTime :: CPUTimeTracker → IO NominalDiffTime
getCurrentCPUTime CPUTimeTracker{..} = do
current_time ← getCurrentTime
atomically $ do
old_total_time ← readTVar trackerTotalTime
maybe_last ← readTVar trackerLastTime
return $ old_total_time + maybe 0 (computeAdditionalTime current_time) maybe_last
getQuantityAsync ::
( MonadIO m'
, SupervisorFullConstraint worker_id m
) ⇒
SupervisorMonad exploration_mode worker_id m α →
(α → IO ()) →
RequestQueue exploration_mode worker_id m →
m' ()
getQuantityAsync getQuantity receiveQuantity =
enqueueRequest $ getQuantity >>= liftIO . receiveQuantity