-- | Common machinery for running a supervisor over a group of workers whose
-- size can be changed while the run is in progress.  The way workers are
-- actually created, destroyed and messaged is supplied by the caller through
-- 'WorkgroupCallbacks'; 'runWorkgroup' is the entry point.
module LogicGrowsOnTrees.Parallel.Common.Workgroup
(
WorkgroupRequestQueueMonad(..)
, InnerMonad
, MessageForSupervisorReceivers(..)
, WorkerId
, WorkgroupCallbacks(..)
, WorkgroupControllerMonad(..)
, changeNumberOfWorkers
, setNumberOfWorkersAsync
, setNumberOfWorkers
, runWorkgroup
) where
import Control.Applicative (Applicative,(<$>))
import Control.Lens (makeLenses)
import Control.Lens.Getter (use)
import Control.Lens.Lens ((<<%=))
import Control.Lens.Setter ((.=),(%=))
import Control.Monad (forM_,replicateM_,void)
import Control.Monad.CatchIO (MonadCatchIO)
import Control.Monad.IO.Class (MonadIO,liftIO)
import Control.Monad.Reader.Class (asks)
import Control.Monad.State.Class (MonadState)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Reader (ReaderT,ask,runReaderT)
import Control.Monad.Trans.State.Strict (StateT,evalStateT)
import Data.Composition ((.*))
import Data.IntSet (IntSet)
import qualified Data.IntSet as IntSet
import Data.Monoid (Monoid(mempty))
import Data.PSQueue (Binding((:->)),PSQ)
import qualified Data.PSQueue as PSQ
import Data.Word (Word,Word64)
import qualified System.Log.Logger as Logger
import System.Log.Logger (Priority(INFO))
import System.Log.Logger.TH
import Text.Printf (printf)
import LogicGrowsOnTrees.Parallel.Common.Message
import LogicGrowsOnTrees.Parallel.Common.RequestQueue
import LogicGrowsOnTrees.Parallel.Common.Supervisor
import LogicGrowsOnTrees.Parallel.Main (RunOutcomeFor,extractRunOutcomeFromSupervisorOutcome)
import LogicGrowsOnTrees.Parallel.ExplorationMode
import LogicGrowsOnTrees.Workload
-- Template Haskell splice from System.Log.Logger.TH; presumably this is what
-- brings the INFO-level helper @infoM@ used throughout this module into scope
-- (TODO confirm against the hslogger-template documentation).
deriveLoggers "Logger" [INFO]
{-| A 'RequestQueueMonad' that additionally supports requests to change the
    number of workers in the workgroup.
 -}
class RequestQueueMonad m ⇒ WorkgroupRequestQueueMonad m where
    {-| Submit a request to change the number of workers.  The first argument
        maps the current number of workers to the desired number; the second
        argument is invoked with the new number of workers after the request
        has been processed.
     -}
    changeNumberOfWorkersAsync ::
        (Word → Word) →
        (Word → IO ()) →
        m ()
-- | Identifier used to refer to a worker in the workgroup.
type WorkerId = Int
-- | Priority attached to each worker in the removal queue; when an active
-- worker has to be fired, the one with the smallest priority is chosen.
type RemovalPriority = Word64
-- | The monad in which the 'WorkgroupCallbacks' run: 'IO' with a
-- caller-supplied state of type @inner_state@ layered on top.
type InnerMonad inner_state = StateT inner_state IO
{-| The actions through which the workgroup manipulates its workers.  Every
    callback runs in 'InnerMonad', so it may read and update the
    caller-supplied @inner_state@.
 -}
data WorkgroupCallbacks inner_state = WorkgroupCallbacks
    { createWorker :: WorkerId → InnerMonad inner_state ()
      -- ^ called with the freshly allocated id when a worker is hired
    , destroyWorker :: WorkerId → Bool → InnerMonad inner_state ()
      -- ^ called when a worker is fired; the flag is 'False' when the worker
      --   was waiting for a workload and 'True' when it was active
    , killAllWorkers :: [WorkerId] → InnerMonad inner_state ()
      -- ^ called once the supervisor has finished, with the ids of the
      --   workers that were still present at that point
    , sendProgressUpdateRequestTo :: WorkerId → InnerMonad inner_state ()
      -- ^ called for each worker that should report its progress
    , sendWorkloadStealRequestTo :: WorkerId → InnerMonad inner_state ()
      -- ^ called for each worker from which a workload should be stolen
    , sendWorkloadTo :: WorkerId → Workload → InnerMonad inner_state ()
      -- ^ called to hand the given workload to the given worker
    }
-- Bookkeeping kept by the workgroup alongside the supervisor's own state.
data WorkgroupState = WorkgroupState
    { _pending_quit :: !IntSet
      -- ^ ids of workers that have been told to go away but have not yet
      --   reported that they quit
    , _next_worker_id :: !WorkerId
      -- ^ id that will be given to the next hired worker
    , _next_priority :: !RemovalPriority
      -- ^ priority that will be assigned by the next bump; it counts
      --   downwards, so the most recently bumped worker has the smallest one
    , _removal_queue :: !(PSQ WorkerId RemovalPriority)
      -- ^ the current workers keyed by id; its size is what
      --   'numberOfWorkers' reports
    }

-- Generate the lenses (pending_quit, next_worker_id, ...) for the fields above.
$( makeLenses ''WorkgroupState )
-- | Monad carrying the workgroup bookkeeping ('WorkgroupState') with read
-- access to the 'WorkgroupCallbacks', layered over 'InnerMonad'.
type WorkgroupStateMonad inner_state = StateT WorkgroupState (ReaderT (WorkgroupCallbacks inner_state) (InnerMonad inner_state))
-- | The supervisor monad with 'WorkerId' as the worker id type, running on
-- top of 'WorkgroupStateMonad'.
type WorkgroupMonad inner_state exploration_mode = SupervisorMonad exploration_mode WorkerId (WorkgroupStateMonad inner_state)
{-| The monad in which the controller passed to 'runWorkgroup' is written; it
    is a thin wrapper around a 'RequestQueueReader' over the workgroup's
    state monad.
 -}
newtype WorkgroupControllerMonad inner_state exploration_mode α =
    C
    { unwrapC ::
        RequestQueueReader
            exploration_mode
            WorkerId
            (WorkgroupStateMonad inner_state)
            α
    }
  deriving
    ( Applicative
    , Functor
    , Monad
    , MonadCatchIO
    , MonadIO
    , RequestQueueMonad
    )
-- The controller monad's exploration mode is simply its second type parameter.
instance HasExplorationMode (WorkgroupControllerMonad inner_state exploration_mode) where
    type ExplorationModeFor (WorkgroupControllerMonad inner_state exploration_mode) =
        exploration_mode
{-| Changing the number of workers is implemented by enqueueing a request that
    hires or fires workers one at a time until the requested count is reached.
 -}
instance WorkgroupRequestQueueMonad (WorkgroupControllerMonad inner_state exploration_mode) where
    -- The first argument maps the current worker count to the desired count;
    -- the second is called (in IO) with the new count once the hiring/firing
    -- has been carried out.
    changeNumberOfWorkersAsync computeNewNumberOfWorkers receiveNewNumberOfWorkers = C $ ask >>= (enqueueRequest $ do
        old_number_of_workers ← numberOfWorkers
        let new_number_of_workers = computeNewNumberOfWorkers old_number_of_workers
        -- The counts are unsigned, so only ever subtract the smaller from the
        -- larger; the comparison below guarantees that in each branch.
        case new_number_of_workers `compare` old_number_of_workers of
            GT → replicateM_ (fromIntegral $ new_number_of_workers - old_number_of_workers) hireAWorker
            LT → replicateM_ (fromIntegral $ old_number_of_workers - new_number_of_workers) fireAWorker
            EQ → return ()
        infoM $ printf "Number of workers has been changed from %i to %i" old_number_of_workers new_number_of_workers
        liftIO . receiveNewNumberOfWorkers $ new_number_of_workers
     )
{-| Synchronous counterpart of 'changeNumberOfWorkersAsync': the given function
    maps the current number of workers to the desired number, and the result is
    whatever 'syncAsync' collects from the completion callback (the callback is
    passed the new number of workers).
 -}
changeNumberOfWorkers :: WorkgroupRequestQueueMonad m ⇒ (Word → Word) → m Word
changeNumberOfWorkers computeNewNumberOfWorkers =
    syncAsync (changeNumberOfWorkersAsync computeNewNumberOfWorkers)
{-| Request that the number of workers be set to the given value regardless of
    the current count; the supplied action is run once the request has been
    processed.
 -}
setNumberOfWorkersAsync :: WorkgroupRequestQueueMonad m ⇒ Word → IO () → m ()
setNumberOfWorkersAsync n callback =
    changeNumberOfWorkersAsync
        (\_ → n)
        (\_ → callback)
{-| Synchronous counterpart of 'setNumberOfWorkersAsync': requests that the
    number of workers be set to the given value and discards the result
    obtained through 'syncAsync'.
 -}
setNumberOfWorkers :: WorkgroupRequestQueueMonad m ⇒ Word → m ()
setNumberOfWorkers n =
    void (syncAsync (changeNumberOfWorkersAsync (const n)))
-- | Run a supervisor over a workgroup and return the outcome of the run.
--
-- Arguments, in order: the exploration mode; the initial value of the
-- caller's inner state; a function that builds the 'WorkgroupCallbacks' from
-- the receivers workers use to report back; the progress to start from; and
-- the controller, which is forked into its own thread via
-- 'forkControllerThread'.
--
-- NOTE: the names bound in the @let@ below are picked up by name through the
-- record wildcards @MessageForSupervisorReceivers{..}@ and
-- @SupervisorCallbacks{..}@, so they must not be renamed.
runWorkgroup ::
ExplorationMode exploration_mode →
inner_state →
(MessageForSupervisorReceivers exploration_mode WorkerId → WorkgroupCallbacks inner_state)
→
ProgressFor exploration_mode →
WorkgroupControllerMonad inner_state exploration_mode () →
IO (RunOutcomeFor exploration_mode)
runWorkgroup exploration_mode initial_inner_state constructCallbacks starting_progress (C controller) = do
-- All worker messages and controller requests are funnelled through this queue.
request_queue ← newRequestQueue
-- Receivers handed to constructCallbacks: each one enqueues a request
-- rather than acting on the supervisor state directly.
let receiveStolenWorkloadFromWorker = flip enqueueRequest request_queue .* receiveStolenWorkload
receiveProgressUpdateFromWorker = flip enqueueRequest request_queue .* receiveProgressUpdate
receiveFailureFromWorker = flip enqueueRequest request_queue .* receiveWorkerFailure
receiveFinishedFromWorker worker_id final_progress = flip enqueueRequest request_queue $ do
-- A finished worker is flagged for removal iff it is in pending_quit.
removal_flag ← IntSet.member worker_id <$> use pending_quit
infoM $ if removal_flag
then "Worker " ++ show worker_id ++ " has finished, and will be removed."
else "Worker " ++ show worker_id ++ " has finished, and will look for another workload."
receiveWorkerFinishedWithRemovalFlag removal_flag worker_id final_progress
receiveQuitFromWorker worker_id = flip enqueueRequest request_queue $ do
infoM $ "Worker " ++ show worker_id ++ " has quit."
-- (<<%=) yields the set as it was BEFORE the delete, so this both clears
-- the entry and tells us whether the quit had been requested.
quitting ← IntSet.member worker_id <$> (pending_quit <<%= IntSet.delete worker_id)
if quitting
then removeWorkerIfPresent worker_id
else receiveWorkerFailure worker_id $ "Worker " ++ show worker_id ++ " quit prematurely."
-- Supervisor callbacks: forward to the caller-supplied callbacks, once per
-- worker id for the two broadcasts.
broadcastProgressUpdateToWorkers = \worker_ids →
asks sendProgressUpdateRequestTo >>= liftInner . forM_ worker_ids
broadcastWorkloadStealToWorkers = \worker_ids →
asks sendWorkloadStealRequestTo >>= liftInner . forM_ worker_ids
receiveCurrentProgress = receiveProgress request_queue
sendWorkloadToWorker = \workload worker_id → do
infoM $ "Activating worker " ++ show worker_id
asks sendWorkloadTo >>= liftInner . ($ workload) . ($ worker_id)
-- Re-insert the worker with the next (smaller) priority, which puts it at
-- the front of the removal queue.
bumpWorkerRemovalPriority worker_id
forkControllerThread request_queue controller
run_outcome ←
-- Peel the monad stack from the outside in: the caller's inner state, the
-- callbacks reader, then the workgroup bookkeeping state.
flip evalStateT initial_inner_state
.
flip runReaderT (constructCallbacks MessageForSupervisorReceivers{..})
.
flip evalStateT initial_state
$
do supervisor_outcome@SupervisorOutcome{supervisorRemainingWorkers} ←
runSupervisorStartingFrom
exploration_mode
starting_progress
SupervisorCallbacks{..}
(requestQueueProgram (return ()) request_queue)
-- Hand the workers that were still present at the end to killAllWorkers.
asks killAllWorkers >>= liftInner . ($ supervisorRemainingWorkers)
return $ extractRunOutcomeFromSupervisorOutcome supervisor_outcome
killControllerThreads request_queue
return run_outcome
where
-- No workers, nothing pending; priorities are handed out downwards from
-- maxBound (see bumpWorkerRemovalPriority).
initial_state =
WorkgroupState
{ _pending_quit = mempty
, _next_worker_id = 0
, _next_priority = maxBound
, _removal_queue = PSQ.empty
}
-- Give the worker the current value of next_priority in the removal queue
-- (inserting it if it is not there yet) and decrement next_priority, so that
-- the most recently bumped worker always holds the smallest priority.
bumpWorkerRemovalPriority ::
    (MonadState WorkgroupState m) ⇒
    WorkerId →
    m ()
bumpWorkerRemovalPriority worker_id = do
    -- (<<%=) returns the value held before the modification.
    priority ← next_priority <<%= pred
    removal_queue %= PSQ.insert worker_id priority
-- Remove one worker from the workgroup.  A worker that is waiting for a
-- workload is preferred; if there is none, the active worker with the smallest
-- removal priority is told to go away.  In both cases the worker is recorded
-- in pending_quit so that its eventual quit message is not treated as a
-- failure.
fireAWorker :: WorkgroupMonad inner_state exploration_mode ()
fireAWorker = do
    maybe_waiting_worker_id ← tryGetWaitingWorker
    case maybe_waiting_worker_id of
        Just worker_id → do
            infoM $ "Removing waiting worker " ++ show worker_id ++ "."
            removeWorker worker_id
            removeWorkerFromRemovalQueue worker_id
            pending_quit %= IntSet.insert worker_id
            destroy ← asks destroyWorker
            -- False: the worker was not active.
            liftInnerToSupervisor $ destroy worker_id False
        Nothing → do
            current_removal_queue ← use removal_queue
            case PSQ.minView current_removal_queue of
                Nothing → error "No workers found to be removed!"
                Just (worker_id :-> _,remaining_queue) → do
                    infoM $ "Removing active worker " ++ show worker_id ++ "."
                    removal_queue .= remaining_queue
                    pending_quit %= IntSet.insert worker_id
                    destroy ← asks destroyWorker
                    -- True: the worker was active.
                    liftInnerToSupervisor $ destroy worker_id True
-- Add one worker to the workgroup: allocate the next worker id, enter the
-- worker in the removal queue, create it through the createWorker callback,
-- and finally register it with the supervisor.
hireAWorker :: WorkgroupMonad inner_state exploration_mode ()
hireAWorker = do
    -- (<<%=) returns the id held before the increment.
    worker_id ← next_worker_id <<%= succ
    bumpWorkerRemovalPriority worker_id
    create ← asks createWorker
    liftInnerToSupervisor $ create worker_id
    addWorker worker_id
-- Lift an InnerMonad action through the two transformer layers (ReaderT, then
-- StateT) that make up WorkgroupStateMonad.
liftInner :: InnerMonad inner_state α → WorkgroupStateMonad inner_state α
liftInner action = lift (lift action)
-- Lift an InnerMonad action all the way into the supervisor monad: first into
-- WorkgroupStateMonad via liftInner, then through the supervisor layer.
liftInnerToSupervisor :: InnerMonad inner_state α → WorkgroupMonad inner_state exploration_mode α
liftInnerToSupervisor action = lift (liftInner action)
-- The current number of workers, taken to be the number of entries in the
-- removal queue.
numberOfWorkers :: WorkgroupMonad inner_state exploration_mode Word
numberOfWorkers = do
    queue ← use removal_queue
    return (fromIntegral (PSQ.size queue))
-- Drop the given worker's entry from the removal queue.
removeWorkerFromRemovalQueue :: WorkerId → WorkgroupMonad inner_state exploration_mode ()
removeWorkerFromRemovalQueue worker_id =
    removal_queue %= PSQ.delete worker_id