module LogicGrowsOnTrees.Parallel.Common.Supervisor
(
FunctionOfTimeStatistics(..)
, IndependentMeasurementsStatistics(..)
, RunStatistics(..)
, SupervisorFullConstraint
, SupervisorMonadConstraint
, SupervisorWorkerIdConstraint
, SupervisorCallbacks(..)
, SupervisorMonad
, SupervisorOutcome(..)
, SupervisorOutcomeFor
, SupervisorProgram(..)
, SupervisorTerminationReason(..)
, SupervisorTerminationReasonFor
, addWorker
, performGlobalProgressUpdate
, receiveProgressUpdate
, receiveStolenWorkload
, receiveWorkerFailure
, receiveWorkerFinished
, receiveWorkerFinishedAndRemoved
, receiveWorkerFinishedWithRemovalFlag
, removeWorker
, removeWorkerIfPresent
, abortSupervisor
, beginSupervisorOccupied
, disableSupervisorDebugMode
, enableSupervisorDebugMode
, endSupervisorOccupied
, setSupervisorDebugMode
, setWorkloadBufferSize
, getCurrentProgress
, getCurrentStatistics
, getNumberOfWorkers
, tryGetWaitingWorker
, runSupervisor
, runSupervisorStartingFrom
, runUnrestrictedSupervisor
, runUnrestrictedSupervisorStartingFrom
) where
import Control.Applicative (Applicative)
import Control.Lens.Setter ((.~),(+=))
import Control.Monad (forever)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Reader.Class (MonadReader(..))
import Control.Monad.State.Class (MonadState(..))
import Control.Monad.Trans.Class (MonadTrans(..))
import Data.Time.Clock (diffUTCTime,getCurrentTime)
import Data.Composition ((.*),(.**))
import qualified System.Log.Logger as Logger
import System.Log.Logger (Priority(DEBUG))
import System.Log.Logger.TH
import LogicGrowsOnTrees.Parallel.Common.Worker (ProgressUpdateFor,StolenWorkloadFor)
import LogicGrowsOnTrees.Parallel.ExplorationMode
import LogicGrowsOnTrees.Path (WalkError(..))
import qualified LogicGrowsOnTrees.Parallel.Common.Supervisor.Implementation as Implementation
import LogicGrowsOnTrees.Parallel.Common.Supervisor.Implementation
( AbortMonad()
, ContextMonad()
, FunctionOfTimeStatistics(..)
, IndependentMeasurementsStatistics(..)
, RunStatistics(..)
, SupervisorCallbacks(..)
, SupervisorFullConstraint
, SupervisorMonadConstraint
, SupervisorOutcome(..)
, SupervisorOutcomeFor
, SupervisorTerminationReason(..)
, SupervisorTerminationReasonFor
, SupervisorWorkerIdConstraint
, current_time
, liftContextToAbort
, liftUserToAbort
, number_of_calls
, time_spent_in_supervisor_monad
)
deriveLoggers "Logger" [DEBUG]
newtype SupervisorMonad exploration_mode worker_id m α =
SupervisorMonad {
unwrapSupervisorMonad :: AbortMonad exploration_mode worker_id m α
} deriving (Applicative,Functor,Monad,MonadIO)
instance MonadTrans (SupervisorMonad exploration_mode worker_id) where
lift = SupervisorMonad . liftUserToAbort
instance MonadReader e m ⇒ MonadReader e (SupervisorMonad exploration_mode worker_id m) where
ask = lift ask
local f = SupervisorMonad . Implementation.localWithinAbort f . unwrapSupervisorMonad
instance MonadState s m ⇒ MonadState s (SupervisorMonad exploration_mode worker_id m) where
get = lift get
put = lift . put
data SupervisorProgram exploration_mode worker_id m =
∀ α. BlockingProgram (SupervisorMonad exploration_mode worker_id m ()) (m α) (α → SupervisorMonad exploration_mode worker_id m ())
| ∀ α. PollingProgram (SupervisorMonad exploration_mode worker_id m ()) (m (Maybe α)) (α → SupervisorMonad exploration_mode worker_id m ())
| UnrestrictedProgram (∀ α. SupervisorMonad exploration_mode worker_id m α)
class WrappableIntoSupervisorMonad w where
wrapIntoSupervisorMonad :: MonadIO m ⇒ w exploration_mode worker_id m α → SupervisorMonad exploration_mode worker_id m α
instance WrappableIntoSupervisorMonad AbortMonad where
wrapIntoSupervisorMonad action = do
time_at_entrance ← liftIO getCurrentTime
result ← SupervisorMonad . local (current_time .~ time_at_entrance) $ do
number_of_calls += 1
debugM "Entering SupervisorMonad"
result ← action
debugM "Exiting SupervisorMonad"
liftIO getCurrentTime >>= (time_spent_in_supervisor_monad +=) . (flip diffUTCTime time_at_entrance)
return result
return result
instance WrappableIntoSupervisorMonad ContextMonad where
wrapIntoSupervisorMonad = wrapIntoSupervisorMonad . liftContextToAbort
addWorker ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
worker_id →
SupervisorMonad exploration_mode worker_id m ()
addWorker = wrapIntoSupervisorMonad . Implementation.addWorker
performGlobalProgressUpdate ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
SupervisorMonad exploration_mode worker_id m ()
performGlobalProgressUpdate = wrapIntoSupervisorMonad Implementation.performGlobalProgressUpdate
receiveProgressUpdate ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
worker_id →
ProgressUpdateFor exploration_mode →
SupervisorMonad exploration_mode worker_id m ()
receiveProgressUpdate = wrapIntoSupervisorMonad .* Implementation.receiveProgressUpdate
receiveStolenWorkload ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
worker_id →
Maybe (StolenWorkloadFor exploration_mode) →
SupervisorMonad exploration_mode worker_id m ()
receiveStolenWorkload = wrapIntoSupervisorMonad .* Implementation.receiveStolenWorkload
receiveWorkerFailure :: SupervisorFullConstraint worker_id m ⇒ worker_id → String → SupervisorMonad exploration_mode worker_id m α
receiveWorkerFailure worker_id message = do
current_progress ← getCurrentProgress
wrapIntoSupervisorMonad
.
Implementation.abortSupervisorWithReason
.
SupervisorFailure current_progress worker_id
$
if message == show TreeEndedBeforeEndOfWalk ||
message == show PastTreeIsInconsistentWithPresentTree
then "The given checkpoint is not consistent with the given tree."
else message
receiveWorkerFinished ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
worker_id →
WorkerFinishedProgressFor exploration_mode →
SupervisorMonad exploration_mode worker_id m ()
receiveWorkerFinished = receiveWorkerFinishedWithRemovalFlag False
receiveWorkerFinishedAndRemoved ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
worker_id →
WorkerFinishedProgressFor exploration_mode →
SupervisorMonad exploration_mode worker_id m ()
receiveWorkerFinishedAndRemoved = receiveWorkerFinishedWithRemovalFlag True
receiveWorkerFinishedWithRemovalFlag ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
Bool →
worker_id →
WorkerFinishedProgressFor exploration_mode →
SupervisorMonad exploration_mode worker_id m ()
receiveWorkerFinishedWithRemovalFlag = wrapIntoSupervisorMonad .** Implementation.receiveWorkerFinishedWithRemovalFlag
removeWorker ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
worker_id →
SupervisorMonad exploration_mode worker_id m ()
removeWorker = wrapIntoSupervisorMonad . Implementation.removeWorker
removeWorkerIfPresent ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
worker_id →
SupervisorMonad exploration_mode worker_id m ()
removeWorkerIfPresent = wrapIntoSupervisorMonad . Implementation.removeWorkerIfPresent
abortSupervisor :: SupervisorFullConstraint worker_id m ⇒ SupervisorMonad exploration_mode worker_id m α
abortSupervisor = wrapIntoSupervisorMonad Implementation.abortSupervisor
beginSupervisorOccupied :: SupervisorMonadConstraint m ⇒ SupervisorMonad exploration_mode worker_id m ()
beginSupervisorOccupied = changeSupervisorOccupiedStatus True
changeSupervisorOccupiedStatus :: SupervisorMonadConstraint m ⇒ Bool → SupervisorMonad exploration_mode worker_id m ()
changeSupervisorOccupiedStatus = wrapIntoSupervisorMonad . Implementation.changeSupervisorOccupiedStatus
endSupervisorOccupied :: SupervisorMonadConstraint m ⇒ SupervisorMonad exploration_mode worker_id m ()
endSupervisorOccupied = changeSupervisorOccupiedStatus False
setWorkloadBufferSize :: SupervisorMonadConstraint m ⇒ Int → SupervisorMonad exploration_mode worker_id m ()
setWorkloadBufferSize = wrapIntoSupervisorMonad . Implementation.setWorkloadBufferSize
getCurrentProgress ::
( SupervisorMonadConstraint m
) ⇒ SupervisorMonad exploration_mode worker_id m (ProgressFor exploration_mode)
getCurrentProgress = wrapIntoSupervisorMonad Implementation.getCurrentProgress
getCurrentStatistics ::
SupervisorFullConstraint worker_id m ⇒
SupervisorMonad exploration_mode worker_id m RunStatistics
getCurrentStatistics = SupervisorMonad Implementation.getCurrentStatistics
getNumberOfWorkers :: SupervisorMonadConstraint m ⇒ SupervisorMonad exploration_mode worker_id m Int
getNumberOfWorkers = wrapIntoSupervisorMonad Implementation.getNumberOfWorkers
tryGetWaitingWorker ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
SupervisorMonad exploration_mode worker_id m (Maybe worker_id)
tryGetWaitingWorker = wrapIntoSupervisorMonad Implementation.tryGetWaitingWorker
disableSupervisorDebugMode :: SupervisorMonadConstraint m ⇒ SupervisorMonad exploration_mode worker_id m ()
disableSupervisorDebugMode = setSupervisorDebugMode False
enableSupervisorDebugMode :: SupervisorMonadConstraint m ⇒ SupervisorMonad exploration_mode worker_id m ()
enableSupervisorDebugMode = setSupervisorDebugMode True
setSupervisorDebugMode :: SupervisorMonadConstraint m ⇒ Bool → SupervisorMonad exploration_mode worker_id m ()
setSupervisorDebugMode = wrapIntoSupervisorMonad . Implementation.setSupervisorDebugMode
runSupervisor ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
ExplorationMode exploration_mode →
SupervisorCallbacks exploration_mode worker_id m →
SupervisorProgram exploration_mode worker_id m →
m (SupervisorOutcomeFor exploration_mode worker_id)
runSupervisor exploration_mode = runSupervisorStartingFrom exploration_mode (initialProgress exploration_mode)
runSupervisorStartingFrom ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
ExplorationMode exploration_mode →
ProgressFor exploration_mode →
SupervisorCallbacks exploration_mode worker_id m →
SupervisorProgram exploration_mode worker_id m →
m (SupervisorOutcomeFor exploration_mode worker_id)
runSupervisorStartingFrom exploration_mode starting_progress callbacks program =
Implementation.runSupervisorStartingFrom
exploration_mode
starting_progress
callbacks
(unwrapSupervisorMonad . runSupervisorProgram $ program)
runSupervisorProgram ::
SupervisorMonadConstraint m ⇒
SupervisorProgram exploration_mode worker_id m →
SupervisorMonad exploration_mode worker_id m α
runSupervisorProgram program =
case program of
BlockingProgram initialize getRequest processRequest → initialize >> forever (do
debugM "Supervisor waiting for request."
request ← lift getRequest
debugM "Supervisor request has arrived; processing request..."
beginSupervisorOccupied
processRequest request
endSupervisorOccupied
debugM "...Supervisor finished processing request."
)
PollingProgram initialize getMaybeRequest processRequest → initialize >> forever (do
maybe_request ← lift getMaybeRequest
case maybe_request of
Nothing → endSupervisorOccupied
Just request → do
beginSupervisorOccupied
processRequest request
)
UnrestrictedProgram run → run
runUnrestrictedSupervisor ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
ExplorationMode exploration_mode →
SupervisorCallbacks exploration_mode worker_id m →
(∀ α. SupervisorMonad exploration_mode worker_id m α) →
m (SupervisorOutcomeFor exploration_mode worker_id)
runUnrestrictedSupervisor exploration_mode callbacks =
runSupervisorStartingFrom exploration_mode (initialProgress exploration_mode) callbacks
.
UnrestrictedProgram
runUnrestrictedSupervisorStartingFrom ::
( SupervisorMonadConstraint m
, SupervisorWorkerIdConstraint worker_id
) ⇒
ExplorationMode exploration_mode →
ProgressFor exploration_mode →
SupervisorCallbacks exploration_mode worker_id m →
(∀ α. SupervisorMonad exploration_mode worker_id m α) →
m (SupervisorOutcomeFor exploration_mode worker_id)
runUnrestrictedSupervisorStartingFrom exploration_mode starting_progress callbacks =
runSupervisorStartingFrom exploration_mode starting_progress callbacks
.
UnrestrictedProgram