module LogicGrowsOnTrees.Parallel.Main
(
Driver(..)
, DriverParameters(..)
, RunOutcome(..)
, RunOutcomeFor
, RunStatistics(..)
, TerminationReason(..)
, TerminationReasonFor
, mainForExploreTree
, mainForExploreTreeIO
, mainForExploreTreeImpure
, mainForExploreTreeUntilFirst
, mainForExploreTreeIOUntilFirst
, mainForExploreTreeImpureUntilFirst
, mainForExploreTreeUntilFoundUsingPull
, mainForExploreTreeIOUntilFoundUsingPull
, mainForExploreTreeImpureUntilFoundUsingPull
, mainForExploreTreeUntilFoundUsingPush
, mainForExploreTreeIOUntilFoundUsingPush
, mainForExploreTreeImpureUntilFoundUsingPush
, genericMain
, simpleMainForExploreTree
, simpleMainForExploreTreeIO
, simpleMainForExploreTreeImpure
, simpleMainForExploreTreeUntilFirst
, simpleMainForExploreTreeIOUntilFirst
, simpleMainForExploreTreeImpureUntilFirst
, simpleMainForExploreTreeUntilFoundUsingPull
, simpleMainForExploreTreeIOUntilFoundUsingPull
, simpleMainForExploreTreeImpureUntilFoundUsingPull
, simpleMainForExploreTreeUntilFoundUsingPush
, simpleMainForExploreTreeIOUntilFoundUsingPush
, simpleMainForExploreTreeImpureUntilFoundUsingPush
, extractRunOutcomeFromSupervisorOutcome
, mainParser
) where
import Prelude hiding (readFile,writeFile)
import Control.Applicative ((<$>),(<*>),pure)
import Control.Concurrent (ThreadId,killThread,threadDelay)
import Control.Exception (finally,handleJust,onException)
import Control.Monad (forever,liftM,mplus,when)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Tools (ifM)
import Data.ByteString.Lazy (readFile,writeFile)
import Data.Char (toLower)
import Data.Composition ((.*))
import Data.Derive.Serialize
import Data.DeriveTH
import Data.Functor.Identity (Identity)
import Data.Maybe (catMaybes)
import Data.Monoid (Monoid(..))
import Data.Prefix.Units (FormatMode(FormatSiAll),formatValue,unitName)
import Data.Serialize
import System.Console.CmdTheLine
import System.Directory (doesFileExist,removeFile,renameFile)
import System.Environment (getProgName)
import System.IO (hPutStr,hPutStrLn,stderr)
import System.IO.Error (isDoesNotExistError)
import qualified System.Log.Logger as Logger
import System.Log.Logger (Priority(..),setLevel,rootLoggerName,updateGlobalLogger)
import System.Log.Logger.TH
import Text.Printf (printf)
import LogicGrowsOnTrees (Tree,TreeIO,TreeT)
import LogicGrowsOnTrees.Checkpoint
import LogicGrowsOnTrees.Parallel.Common.RequestQueue
import LogicGrowsOnTrees.Parallel.Common.Supervisor
( FunctionOfTimeStatistics(..)
, IndependentMeasurementsStatistics(..)
, RunStatistics(..)
, SupervisorTerminationReason(..)
, SupervisorOutcome(..)
)
import LogicGrowsOnTrees.Parallel.ExplorationMode
import LogicGrowsOnTrees.Parallel.Purity
deriveLoggers "Logger" [INFO,NOTICE]
data CheckpointConfiguration = CheckpointConfiguration
{ checkpoint_path :: FilePath
, checkpoint_interval :: Float
} deriving (Eq,Show)
data LoggingConfiguration = LoggingConfiguration
{ log_level :: Priority
} deriving (Eq,Show)
instance Serialize LoggingConfiguration where
put = put . show . log_level
get = LoggingConfiguration . read <$> get
data StatisticsConfiguration = StatisticsConfiguration
{ show_wall_times :: !Bool
, show_supervisor_occupation :: !Bool
, show_supervisor_monad_occupation :: !Bool
, show_supervisor_calls :: !Bool
, show_worker_occupation :: !Bool
, show_worker_wait_times :: !Bool
, show_steal_wait_times :: !Bool
, show_numbers_of_waiting_workers :: !Bool
, show_numbers_of_available_workloads :: !Bool
, show_instantaneous_workload_request_rates :: !Bool
, show_instantaneous_workload_steal_times :: !Bool
} deriving (Eq,Show)
data SupervisorConfiguration = SupervisorConfiguration
{ maybe_checkpoint_configuration :: Maybe CheckpointConfiguration
, maybe_workload_buffer_size_configuration :: Maybe Int
, statistics_configuration :: StatisticsConfiguration
} deriving (Eq,Show)
data SharedConfiguration tree_configuration = SharedConfiguration
{ logging_configuration :: LoggingConfiguration
, tree_configuration :: tree_configuration
} deriving (Eq,Show)
$( derive makeSerialize ''SharedConfiguration )
data Driver
result_monad
shared_configuration
supervisor_configuration
m n
exploration_mode
= ∀ controller_monad.
( RequestQueueMonad (controller_monad exploration_mode)
, ExplorationModeFor (controller_monad exploration_mode) ~ exploration_mode
) ⇒
Driver (
( Serialize (ProgressFor exploration_mode)
, MonadIO result_monad
) ⇒
DriverParameters
shared_configuration
supervisor_configuration
m n
exploration_mode
controller_monad
→ result_monad ()
)
data DriverParameters
shared_configuration
supervisor_configuration
m n
exploration_mode
controller_monad =
DriverParameters
{
shared_configuration_term :: Term shared_configuration
, supervisor_configuration_term :: Term supervisor_configuration
, program_info :: TermInfo
, initializeGlobalState :: shared_configuration → IO ()
, getStartingProgress :: shared_configuration → supervisor_configuration → IO (ProgressFor exploration_mode)
, notifyTerminated :: shared_configuration → supervisor_configuration → RunOutcomeFor exploration_mode → IO ()
, constructExplorationMode :: shared_configuration → ExplorationMode exploration_mode
, constructTree :: shared_configuration → TreeT m (ResultFor exploration_mode)
, purity :: Purity m n
, constructController :: shared_configuration → supervisor_configuration → controller_monad exploration_mode ()
}
data RunOutcome progress final_result = RunOutcome
{
runStatistics :: RunStatistics
, runTerminationReason :: TerminationReason progress final_result
} deriving (Eq,Show)
type RunOutcomeFor exploration_mode = RunOutcome (ProgressFor exploration_mode) (FinalResultFor exploration_mode)
data TerminationReason progress final_result =
Aborted progress
| Completed final_result
| Failure progress String
deriving (Eq,Show)
type TerminationReasonFor exploration_mode = TerminationReason (ProgressFor exploration_mode) (FinalResultFor exploration_mode)
instance ArgVal Priority where
converter = enum $
[DEBUG,INFO,NOTICE,WARNING,ERROR,CRITICAL,ALERT,EMERGENCY]
>>=
\level → let name = show level
in return (name,level) `mplus` return (map toLower name,level)
mainForExploreTree ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration Identity IO (AllMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) result → IO ())
→
(tree_configuration → Tree result) →
result_monad ()
mainForExploreTree = genericMain (const AllMode) Pure
mainForExploreTreeIO ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration IO IO (AllMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) result → IO ())
→
(tree_configuration → TreeIO result) →
result_monad ()
mainForExploreTreeIO = genericMain (const AllMode) io_purity
mainForExploreTreeImpure ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration m m (AllMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) result → IO ())
→
(tree_configuration → TreeT m result) →
result_monad ()
mainForExploreTreeImpure = genericMain (const AllMode) . ImpureAtopIO
mainForExploreTreeUntilFirst ::
(Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration Identity IO (FirstMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
(tree_configuration → Tree result) →
result_monad ()
mainForExploreTreeUntilFirst = genericMain (const FirstMode) Pure
mainForExploreTreeIOUntilFirst ::
(Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration IO IO (FirstMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
(tree_configuration → TreeIO result) →
result_monad ()
mainForExploreTreeIOUntilFirst = genericMain (const FirstMode) io_purity
mainForExploreTreeImpureUntilFirst ::
(Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration m m (FirstMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
(tree_configuration → TreeT m result) →
result_monad ()
mainForExploreTreeImpureUntilFirst = genericMain (const FirstMode) . ImpureAtopIO
mainForExploreTreeUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(tree_configuration → result → Bool) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration Identity IO (FoundModeUsingPull result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → Tree result) →
result_monad ()
mainForExploreTreeUntilFoundUsingPull constructCondition = genericMain (FoundModeUsingPull . constructCondition) Pure
mainForExploreTreeIOUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(tree_configuration → result → Bool) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration IO IO (FoundModeUsingPull result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → TreeIO result) →
result_monad ()
mainForExploreTreeIOUntilFoundUsingPull constructCondition = genericMain (FoundModeUsingPull . constructCondition) io_purity
mainForExploreTreeImpureUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(tree_configuration → result → Bool) →
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration m m (FoundModeUsingPull result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → TreeT m result) →
result_monad ()
mainForExploreTreeImpureUntilFoundUsingPull constructCondition = genericMain (FoundModeUsingPull . constructCondition) . ImpureAtopIO
mainForExploreTreeUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(tree_configuration → result → Bool) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration Identity IO (FoundModeUsingPush result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → Tree result) →
result_monad ()
mainForExploreTreeUntilFoundUsingPush constructCondition = genericMain (FoundModeUsingPush . constructCondition) Pure
mainForExploreTreeIOUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(tree_configuration → result → Bool) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration IO IO (FoundModeUsingPush result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → TreeIO result) →
result_monad ()
mainForExploreTreeIOUntilFoundUsingPush constructCondition = genericMain (FoundModeUsingPush . constructCondition) io_purity
mainForExploreTreeImpureUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(tree_configuration → result → Bool) →
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration m m (FoundModeUsingPush result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → TreeT m result) →
result_monad ()
mainForExploreTreeImpureUntilFoundUsingPush constructCondition = genericMain (FoundModeUsingPush . constructCondition) . ImpureAtopIO
genericMain ::
( MonadIO result_monad
, ResultFor exploration_mode ~ result
, Serialize (ProgressFor exploration_mode)
) ⇒
(tree_configuration → ExplorationMode exploration_mode)
→
Purity m n →
Driver
result_monad
(SharedConfiguration tree_configuration)
SupervisorConfiguration
m n
exploration_mode
→
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcomeFor exploration_mode → IO ())
→
(tree_configuration → TreeT m result) →
result_monad ()
genericMain constructExplorationMode_ purity (Driver run) tree_configuration_term program_info notifyTerminated_ constructTree_ =
run DriverParameters{..}
where
constructExplorationMode = constructExplorationMode_ . tree_configuration
shared_configuration_term = makeSharedConfigurationTerm tree_configuration_term
supervisor_configuration_term =
SupervisorConfiguration
<$> checkpoint_configuration_term
<*> maybe_workload_buffer_size_configuration_term
<*> statistics_configuration_term
initializeGlobalState SharedConfiguration{logging_configuration=LoggingConfiguration{..}} =
updateGlobalLogger rootLoggerName (setLevel log_level)
constructTree = constructTree_ . tree_configuration
getStartingProgress shared_configuration SupervisorConfiguration{..} =
case maybe_checkpoint_configuration of
Nothing → (infoM "Checkpointing is NOT enabled") >> return initial_progress
Just CheckpointConfiguration{..} → do
noticeM $ "Checkpointing enabled"
noticeM $ "Checkpoint file is " ++ checkpoint_path
noticeM $ "Checkpoint interval is " ++ show checkpoint_interval ++ " seconds"
ifM (doesFileExist checkpoint_path)
(noticeM "Loading existing checkpoint file" >> either error id . decodeLazy <$> readFile checkpoint_path)
(return initial_progress)
where
initial_progress = initialProgress . constructExplorationMode $ shared_configuration
notifyTerminated SharedConfiguration{..} SupervisorConfiguration{..} run_outcome@RunOutcome{..} =
case maybe_checkpoint_configuration of
Nothing → doEndOfRun
Just CheckpointConfiguration{checkpoint_path} →
do doEndOfRun
noticeM "Deleting any remaining checkpoint file"
removeFileIfExists checkpoint_path
`finally`
case runTerminationReason of
Aborted checkpoint → writeCheckpointFile checkpoint_path checkpoint
Failure checkpoint _ → writeCheckpointFile checkpoint_path checkpoint
_ → return ()
where
doEndOfRun = do
showStatistics statistics_configuration runStatistics
notifyTerminated_ tree_configuration run_outcome
constructController = const controllerLoop
simpleMainForExploreTree ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration Identity IO (AllMode result) →
(RunOutcome (Progress result) result → IO ())
→
Tree result →
result_monad ()
simpleMainForExploreTree = dispatchToMainFunction mainForExploreTree
simpleMainForExploreTreeIO ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration IO IO (AllMode result) →
(RunOutcome (Progress result) result → IO ())
→
TreeIO result →
result_monad ()
simpleMainForExploreTreeIO = dispatchToMainFunction mainForExploreTreeIO
simpleMainForExploreTreeImpure ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration m m (AllMode result) →
(RunOutcome (Progress result) result → IO ())
→
TreeT m result →
result_monad ()
simpleMainForExploreTreeImpure = dispatchToMainFunction . mainForExploreTreeImpure
simpleMainForExploreTreeUntilFirst ::
(Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration Identity IO (FirstMode result) →
(RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
Tree result →
result_monad ()
simpleMainForExploreTreeUntilFirst = dispatchToMainFunction mainForExploreTreeUntilFirst
simpleMainForExploreTreeIOUntilFirst ::
(Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration IO IO (FirstMode result) →
(RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
TreeIO result →
result_monad ()
simpleMainForExploreTreeIOUntilFirst = dispatchToMainFunction mainForExploreTreeIOUntilFirst
simpleMainForExploreTreeImpureUntilFirst ::
(Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration m m (FirstMode result) →
(RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
TreeT m result →
result_monad ()
simpleMainForExploreTreeImpureUntilFirst = dispatchToMainFunction . mainForExploreTreeImpureUntilFirst
simpleMainForExploreTreeUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(result → Bool) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration Identity IO (FoundModeUsingPull result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
Tree result →
result_monad ()
simpleMainForExploreTreeUntilFoundUsingPull = dispatchToMainFunction . mainForExploreTreeUntilFoundUsingPull . const
simpleMainForExploreTreeIOUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(result → Bool) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration IO IO (FoundModeUsingPull result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
TreeIO result →
result_monad ()
simpleMainForExploreTreeIOUntilFoundUsingPull = dispatchToMainFunction . mainForExploreTreeIOUntilFoundUsingPull . const
simpleMainForExploreTreeImpureUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(result → Bool) →
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration m m (FoundModeUsingPull result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
TreeT m result →
result_monad ()
simpleMainForExploreTreeImpureUntilFoundUsingPull = (dispatchToMainFunction .* mainForExploreTreeImpureUntilFoundUsingPull) . const
simpleMainForExploreTreeUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(result → Bool) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration Identity IO (FoundModeUsingPush result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
Tree result →
result_monad ()
simpleMainForExploreTreeUntilFoundUsingPush = dispatchToMainFunction . mainForExploreTreeUntilFoundUsingPush . const
simpleMainForExploreTreeIOUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(result → Bool) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration IO IO (FoundModeUsingPush result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
TreeIO result →
result_monad ()
simpleMainForExploreTreeIOUntilFoundUsingPush = dispatchToMainFunction . mainForExploreTreeIOUntilFoundUsingPush . const
simpleMainForExploreTreeImpureUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(result → Bool) →
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration m m (FoundModeUsingPush result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
TreeT m result →
result_monad ()
simpleMainForExploreTreeImpureUntilFoundUsingPush = (dispatchToMainFunction .* mainForExploreTreeImpureUntilFoundUsingPush) . const
extractRunOutcomeFromSupervisorOutcome ::
Show worker_id ⇒
SupervisorOutcome fv ip worker_id →
RunOutcome ip fv
extractRunOutcomeFromSupervisorOutcome SupervisorOutcome{..} = RunOutcome{..}
where
runTerminationReason =
case supervisorTerminationReason of
SupervisorAborted remaining_progress → Aborted remaining_progress
SupervisorCompleted result → Completed result
SupervisorFailure remainig_progress worker_id message →
Failure remainig_progress $ "Worker " ++ show worker_id ++ " failed with message: " ++ message
runStatistics = supervisorRunStatistics
mainParser :: Term α → TermInfo → IO α
mainParser term term_info =
(if null (termName term_info)
then getProgName >>= \progname → return $ term_info {termName = progname}
else return term_info
) >>= exec . (term,)
default_terminfo :: TermInfo
default_terminfo = defTI { termDoc = "LogicGrowsOnTrees program" }
dispatchToMainFunction f driver notifyTerminated tree =
f driver
(pure ())
default_terminfo
(const notifyTerminated)
(const tree)
checkpoint_configuration_term :: Term (Maybe CheckpointConfiguration)
checkpoint_configuration_term =
maybe (const Nothing) (Just .* CheckpointConfiguration)
<$> value (flip opt (
(optInfo ["c","checkpoint-file"])
{ optName = "FILEPATH"
, optDoc = "This enables periodic checkpointing with the given path specifying the location of the checkpoint file; if the file already exists then it will be loaded as the initial starting point for the search."
}
) Nothing)
<*> value (flip opt (
(optInfo ["i","checkpoint-interval"])
{ optName = "SECONDS"
, optDoc = "This specifies the time between checkpoints (in seconds, decimals allowed); it is ignored if checkpoint file is not specified."
}
) 60)
logging_configuration_term :: Term LoggingConfiguration
logging_configuration_term =
LoggingConfiguration
<$> value (flip opt (
(optInfo ["l","log-level"])
{ optName = "LEVEL"
, optDoc = "This specifies the upper bound (inclusive) on the importance of the messages that will be logged; it must be one of (in increasing order of importance): DEBUG, INFO, NOTICE, WARNING, ERROR, CRITICAL, ALERT, or EMERGENCY."
}
) WARNING)
statistics_configuration_term :: Term StatisticsConfiguration
statistics_configuration_term =
(\show_all → if show_all then const (StatisticsConfiguration True True True True True True True True True True True) else id)
<$> value (flag ((optInfo ["show-all"]) { optDoc ="This option will cause *all* run statistic to be printed to standard error after the program terminates." }))
<*> (StatisticsConfiguration
<$> value (flag ((optInfo ["show-walltimes"]) { optDoc ="This option will cause the starting, ending, and duration wall time of the run to be printed to standard error after the program terminates." }))
<*> value (flag ((optInfo ["show-supervisor-occupation"]) { optDoc ="This option will cause the supervisor occupation percentage to be printed to standard error after the program terminates." }))
<*> value (flag ((optInfo ["show-supervisor-monad-occupation"]) { optDoc ="This option will cause the supervisor monad occupation percentage to be printed to standard error after the program terminates." }))
<*> value (flag ((optInfo ["show-supervisor-calls"]) { optDoc ="This option will cause the number of supervisor calls and average time per supervisor call to be printed to standard error after the program terminates." }))
<*> value (flag ((optInfo ["show-worker-occupation"]) { optDoc ="This option will cause the worker occupation percentage to be printed to standard error after the program terminates." }))
<*> value (flag ((optInfo ["show-worker-wait-times"]) { optDoc ="This option will cause statistics about the worker wait times to be printed to standard error after the program terminates." }))
<*> value (flag ((optInfo ["show-steal-wait-times"]) { optDoc ="This option will cause statistics about the steal wait times to be printed to standard error after the program terminates." }))
<*> value (flag ((optInfo ["show-numbers-of-waiting-workers"]) { optDoc ="This option will cause statistics about the number of waiting workers to be printed to standard error after the program terminates." }))
<*> value (flag ((optInfo ["show-numbers-of-available-workloads"]) { optDoc ="This option will cause statistics about the number of available workloads to be printed to standard error after the program terminates." }))
<*> value (flag ((optInfo ["show-workload-request-rate"]) { optDoc ="This option will cause statistics about the (roughly) instantaneous rate at which workloads are requested by finished works to be printed to standard error after the program terminates." }))
<*> value (flag ((optInfo ["show-workload-steal-time"]) { optDoc ="This option will cause statistics about the (roughly) instantaneous amount of time that it took to steal a workload to be printed to standard error after the program terminates." }))
)
maybe_workload_buffer_size_configuration_term :: Term (Maybe Int)
maybe_workload_buffer_size_configuration_term =
value (opt Nothing ((optInfo ["buffer-size"]) { optName = "SIZE", optDoc = "This option sets the size of the workload buffer, which contains stolen workloads that are held at the supervisor so that if a worker needs a new workload it can be given one immediately rather than having to wait for a new workload to be stolen. This setting should be large enough that a request for a new workload can always be answered immediately using a workload from the buffer, which is roughly a function of the product of the number of workloads requested per second and the time needed to steal a new workload (both of which are server statistics than you can request to see upon completions). If you are not having problems with scaling, then you can ignore this option (it defaults to 4)." }))
makeSharedConfigurationTerm :: Term tree_configuration → Term (SharedConfiguration tree_configuration)
makeSharedConfigurationTerm tree_configuration_term =
SharedConfiguration
<$> logging_configuration_term
<*> tree_configuration_term
checkpointLoop ::
( RequestQueueMonad m
, Serialize (ProgressFor (ExplorationModeFor m))
) ⇒ CheckpointConfiguration → m α
checkpointLoop CheckpointConfiguration{..} = forever $ do
liftIO $ threadDelay delay
requestProgressUpdate >>= writeCheckpointFile checkpoint_path
where
delay = round $ checkpoint_interval * 1000000
controllerLoop ::
( RequestQueueMonad m
, Serialize (ProgressFor (ExplorationModeFor m))
) ⇒ SupervisorConfiguration → m ()
controllerLoop SupervisorConfiguration{..} = do
maybe (return ()) setWorkloadBufferSize $ maybe_workload_buffer_size_configuration
maybe_checkpoint_thread_id ← maybeForkIO checkpointLoop maybe_checkpoint_configuration
case catMaybes
[maybe_checkpoint_thread_id
]
of [] → return ()
thread_ids → liftIO $
(forever $ threadDelay 3600000000)
`finally`
(mapM_ killThread thread_ids)
maybeForkIO :: RequestQueueMonad m ⇒ (α → m ()) → Maybe α → m (Maybe ThreadId)
maybeForkIO loop = maybe (return Nothing) (liftM Just . fork . loop)
removeFileIfExists :: FilePath → IO ()
removeFileIfExists path =
handleJust
(\e → if isDoesNotExistError e then Nothing else Just ())
(\_ → return ())
(removeFile path)
showStatistics :: MonadIO m ⇒ StatisticsConfiguration → RunStatistics → m ()
showStatistics StatisticsConfiguration{..} RunStatistics{..} = liftIO $ do
let total_time :: Double
total_time = realToFrac runWallTime
when show_wall_times $
hPutStrLn stderr $
printf "Run started at %s, ended at %s, and took %sseconds.\n"
(show runStartTime)
(show runEndTime)
(showWithUnitPrefix total_time)
hPutStr stderr $
case (show_supervisor_occupation,show_supervisor_monad_occupation) of
(True,False) → printf "Supervior was occupied for %.2f%% of the run.\n\n" (runSupervisorOccupation*100)
(False,True) → printf "Supervisor ran inside the SupervisorMonad for %.2f%% of the run.\n\n" (runSupervisorMonadOccupation*100)
(True,True) → printf "Supervior was occupied for %.2f%% of the run, of which %.2f%% was spent inside the SupervisorMonad.\n\n" (runSupervisorOccupation*100) (runSupervisorOccupation/runSupervisorMonadOccupation*100)
_ → ""
when show_supervisor_calls $
hPutStrLn stderr $
printf "%i calls were made into the supervisor monad, and each took an average of %sseconds.\n"
runNumberOfCalls
(showWithUnitPrefix runAverageTimePerCall)
when show_worker_occupation $
hPutStrLn stderr $
printf "Workers were occupied %.2f%% of the time on average.\n"
(runWorkerOccupation*100)
when show_worker_wait_times $ do
let FunctionOfTimeStatistics{..} = runWorkerWaitTimes
hPutStrLn stderr $
if timeCount == 0
then
"At no point did a worker receive a new workload after finishing a workload."
else
if timeMax == 0
then
printf "Workers completed their task and obtained a new workload %i times and never had to wait to receive the new workload."
timeCount
else
printf
(unlines
["Workers completed their task and obtained a new workload %i times with an average of one every %sseconds or %.1g enqueues/second."
,"The minimum waiting time was %sseconds, and the maximum waiting time was %sseconds."
,"On average, a worker had to wait %sseconds +/- %sseconds (std. dev) for a new workload."
]
)
timeCount
(showWithUnitPrefix $ total_time / fromIntegral timeCount)
(fromIntegral timeCount / total_time)
(showWithUnitPrefix timeMin)
(showWithUnitPrefix timeMax)
(showWithUnitPrefix timeAverage)
(showWithUnitPrefix timeStdDev)
when show_steal_wait_times $ do
let IndependentMeasurementsStatistics{..} = runStealWaitTimes
hPutStrLn stderr $
if statCount == 0
then
"No workloads were stolen."
else
printf
(unlines
["Workloads were stolen %i times with an average of %sseconds between each steal or %.1g steals/second."
,"The minimum waiting time for a steal was %sseconds, and the maximum waiting time was %sseconds."
,"On average, it took %sseconds +/- %sseconds (std. dev) to steal a workload."
]
)
statCount
(showWithUnitPrefix $ total_time / fromIntegral statCount)
(fromIntegral statCount / total_time)
(showWithUnitPrefix statMin)
(showWithUnitPrefix statMax)
(showWithUnitPrefix statAverage)
(showWithUnitPrefix statStdDev)
when show_numbers_of_waiting_workers $ do
let FunctionOfTimeStatistics{..} = runWaitingWorkerStatistics
hPutStrLn stderr $
if timeMax == 0
then
printf "No worker ever had to wait for a workload to become available.\n"
else if timeMin == 0
then
printf "On average, %.1f +/ - %.1f (std. dev) workers were waiting at any given time; never more than %i.\n"
timeAverage
timeStdDev
timeMax
else
printf "On average, %.1f +/ - %.1f (std. dev) workers were waiting at any given time; never more than %i nor fewer than %i.\n"
timeAverage
timeStdDev
timeMax
timeMin
when show_numbers_of_available_workloads $ do
let FunctionOfTimeStatistics{..} = runAvailableWorkloadStatistics
hPutStrLn stderr $
if timeMax == 0
then
printf "No workload ever had to wait for an available worker.\n"
else if timeMin == 0
then
printf "On average, %.1f +/ - %.1f (std. dev) workloads were waiting for a worker at any given time; never more than %i.\n"
timeAverage
timeStdDev
timeMax
else
printf "On average, %.1f +/ - %.1f (std. dev) workloads were waiting for a worker at any given time; never more than %i nor fewer than %i.\n"
timeAverage
timeStdDev
timeMax
timeMin
when show_instantaneous_workload_request_rates $ do
let FunctionOfTimeStatistics{..} = runInstantaneousWorkloadRequestRateStatistics
hPutStrLn stderr $
printf
(unlines
["On average, the instantanenous rate at which workloads were being requested was %.1f +/ - %.1f (std. dev) requests per second; the rate never fell below %.1f nor rose above %.1f."
,"This value was obtained by exponentially smoothing the request data over a time scale of one second."
]
)
timeAverage
timeStdDev
timeMin
timeMax
when show_instantaneous_workload_steal_times $ do
let FunctionOfTimeStatistics{..} = runInstantaneousWorkloadStealTimeStatistics
hPutStrLn stderr $
printf
(unlines
["On average, the instantaneous time to steal a workload was %sseconds +/ - %sseconds (std. dev); this time interval never fell below %sseconds nor rose above %sseconds."
,"This value was obtained by exponentially smoothing the request data over a time scale of one second."
]
)
(showWithUnitPrefix timeAverage)
(showWithUnitPrefix timeStdDev)
(showWithUnitPrefix timeMin)
(showWithUnitPrefix timeMax)
where
showWithUnitPrefix :: Real n ⇒ n → String
showWithUnitPrefix 0 = "0 "
showWithUnitPrefix x = printf "%.1f %s" x_scaled (unitName unit)
where
(x_scaled :: Float,Just unit) = formatValue (Left FormatSiAll) . realToFrac $ x
writeCheckpointFile :: (Serialize ip, MonadIO m) ⇒ FilePath → ip → m ()
writeCheckpointFile checkpoint_path checkpoint = do
noticeM $ "Writing checkpoint file"
liftIO $
(do writeFile checkpoint_temp_path (encodeLazy checkpoint)
renameFile checkpoint_temp_path checkpoint_path
) `onException` (
removeFileIfExists checkpoint_temp_path
)
where
checkpoint_temp_path = checkpoint_path ++ ".tmp"