module LogicGrowsOnTrees.Parallel.Common.RequestQueue
(
RequestQueueMonad(..)
, Request
, RequestQueue(..)
, RequestQueueReader
, getCurrentProgress
, getNumberOfWorkers
, requestProgressUpdate
, syncAsync
, addProgressReceiver
, enqueueRequest
, enqueueRequestAndWait
, newRequestQueue
, tryDequeueRequest
, processAllRequests
, receiveProgress
, requestQueueProgram
, forkControllerThread
, killControllerThreads
, getQuantityAsync
) where
import Prelude hiding (catch)
import Control.Applicative (liftA2)
import Control.Arrow ((&&&))
import Control.Concurrent (ThreadId,forkIO,killThread)
import Control.Concurrent.MVar (newEmptyMVar,putMVar,takeMVar)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan (TChan,newTChanIO,readTChan,tryReadTChan,writeTChan)
import Control.Exception (BlockedIndefinitelyOnMVar(..),catch,finally)
import Control.Monad.CatchIO (MonadCatchIO)
import Control.Monad ((>=>),join,liftM,liftM3)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Reader (ReaderT(..),ask)
import Data.Composition ((.*))
import Data.IORef (IORef,atomicModifyIORef,readIORef,newIORef)
import Data.List (delete)
import qualified LogicGrowsOnTrees.Parallel.Common.Supervisor as Supervisor
import LogicGrowsOnTrees.Parallel.Common.Supervisor (SupervisorFullConstraint,SupervisorMonad,SupervisorProgram(..))
import LogicGrowsOnTrees.Parallel.ExplorationMode
class (HasExplorationMode m, Functor m, MonadCatchIO m) ⇒ RequestQueueMonad m where
abort :: m ()
fork :: m () → m ThreadId
getCurrentProgressAsync :: (ProgressFor (ExplorationModeFor m) → IO ()) → m ()
getNumberOfWorkersAsync :: (Int → IO ()) → m ()
requestProgressUpdateAsync :: (ProgressFor (ExplorationModeFor m) → IO ()) → m ()
setWorkloadBufferSize :: Int → m ()
type Request exploration_mode worker_id m = SupervisorMonad exploration_mode worker_id m ()
data RequestQueue exploration_mode worker_id m = RequestQueue
{
requests :: !(TChan (Request exploration_mode worker_id m))
, receivers :: !(IORef [ProgressFor exploration_mode → IO ()])
, controllerThreads :: !(IORef [ThreadId])
}
type RequestQueueReader exploration_mode worker_id m = ReaderT (RequestQueue exploration_mode worker_id m) IO
instance HasExplorationMode (RequestQueueReader exploration_mode worker_id m) where
type ExplorationModeFor (RequestQueueReader exploration_mode worker_id m) = exploration_mode
instance (SupervisorFullConstraint worker_id m, MonadCatchIO m) ⇒ RequestQueueMonad (RequestQueueReader exploration_mode worker_id m) where
abort = ask >>= enqueueRequest Supervisor.abortSupervisor
fork m = ask >>= flip forkControllerThread' m
getCurrentProgressAsync = (ask >>=) . getQuantityAsync Supervisor.getCurrentProgress
getNumberOfWorkersAsync = (ask >>=) . getQuantityAsync Supervisor.getNumberOfWorkers
requestProgressUpdateAsync receiveUpdatedProgress =
ask
>>=
liftIO
.
liftA2 (>>)
(addProgressReceiver receiveUpdatedProgress)
(enqueueRequest Supervisor.performGlobalProgressUpdate)
setWorkloadBufferSize size = ask >>= enqueueRequest (Supervisor.setWorkloadBufferSize size)
getCurrentProgress :: RequestQueueMonad m ⇒ m (ProgressFor (ExplorationModeFor m))
getCurrentProgress = syncAsync getCurrentProgressAsync
getNumberOfWorkers :: RequestQueueMonad m ⇒ m Int
getNumberOfWorkers = syncAsync getNumberOfWorkersAsync
requestProgressUpdate :: RequestQueueMonad m ⇒ m (ProgressFor (ExplorationModeFor m))
requestProgressUpdate = syncAsync requestProgressUpdateAsync
syncAsync :: MonadIO m ⇒ ((α → IO ()) → m ()) → m α
syncAsync runCommandAsync = do
result_mvar ← liftIO newEmptyMVar
runCommandAsync (putMVar result_mvar)
liftIO $
takeMVar result_mvar
`catch`
(\BlockedIndefinitelyOnMVar → error $ "blocked forever while waiting for controller to respond to request")
addProgressReceiver ::
MonadIO m' ⇒
(ProgressFor exploration_mode → IO ()) →
RequestQueue exploration_mode worker_id m →
m' ()
addProgressReceiver receiver =
liftIO
.
flip atomicModifyIORef ((receiver:) &&& const ())
.
receivers
enqueueRequest ::
MonadIO m' ⇒
Request exploration_mode worker_id m →
RequestQueue exploration_mode worker_id m →
m' ()
enqueueRequest = flip $
(liftIO . atomically)
.*
(writeTChan . requests)
enqueueRequestAndWait ::
(MonadIO m, MonadIO m') ⇒
Request exploration_mode worker_id m →
RequestQueue exploration_mode worker_id m →
m' ()
enqueueRequestAndWait request request_queue = do
signal ← liftIO newEmptyMVar
enqueueRequest (request >> liftIO (putMVar signal ())) request_queue
liftIO $ takeMVar signal
newRequestQueue ::
MonadIO m' ⇒
m' (RequestQueue exploration_mode worker_id m)
newRequestQueue = liftIO $ liftM3 RequestQueue newTChanIO (newIORef []) (newIORef [])
tryDequeueRequest ::
MonadIO m' ⇒
RequestQueue exploration_mode worker_id m →
m' (Maybe (Request exploration_mode worker_id m))
tryDequeueRequest =
liftIO
.
atomically
.
tryReadTChan
.
requests
processAllRequests ::
MonadIO m ⇒
RequestQueue exploration_mode worker_id m →
SupervisorMonad exploration_mode worker_id m ()
processAllRequests (RequestQueue requests _ _) = go
where
go =
(liftIO . atomically . tryReadTChan) requests
>>=
maybe (return ()) (>> go)
receiveProgress ::
MonadIO m' ⇒
RequestQueue exploration_mode worker_id m →
ProgressFor exploration_mode →
m' ()
receiveProgress queue progress =
liftIO
.
join
.
liftM (sequence_ . map ($ progress))
.
flip atomicModifyIORef (const [] &&& id)
.
receivers
$
queue
requestQueueProgram ::
MonadIO m ⇒
SupervisorMonad exploration_mode worker_id m () →
RequestQueue exploration_mode worker_id m →
SupervisorProgram exploration_mode worker_id m
requestQueueProgram initialize =
flip (BlockingProgram initialize) id
.
liftIO
.
atomically
.
readTChan
.
requests
forkControllerThread ::
MonadIO m' ⇒
RequestQueue exploration_mode worker_id m →
RequestQueueReader exploration_mode worker_id m () →
m' ()
forkControllerThread = liftM (const ()) .* forkControllerThread'
forkControllerThread' ::
MonadIO m' ⇒
RequestQueue exploration_mode worker_id m →
RequestQueueReader exploration_mode worker_id m () →
m' ThreadId
forkControllerThread' request_queue controller = liftIO $ do
start_signal ← newEmptyMVar
rec thread_id ←
forkIO
$
(takeMVar start_signal >> runReaderT controller request_queue)
`finally`
(atomicModifyIORef (controllerThreads request_queue) (delete thread_id &&& const ()))
atomicModifyIORef (controllerThreads request_queue) ((thread_id:) &&& const ())
putMVar start_signal ()
return thread_id
killControllerThreads ::
MonadIO m' ⇒
RequestQueue exploration_mode worker_id m →
m' ()
killControllerThreads = liftIO . readIORef . controllerThreads >=> liftIO . mapM_ killThread
getQuantityAsync ::
( MonadIO m'
, SupervisorFullConstraint worker_id m
) ⇒
SupervisorMonad exploration_mode worker_id m α →
(α → IO ()) →
RequestQueue exploration_mode worker_id m →
m' ()
getQuantityAsync getQuantity receiveQuantity =
enqueueRequest $ getQuantity >>= liftIO . receiveQuantity