module LogicGrowsOnTrees.Parallel.Main
(
Driver(..)
, DriverParameters(..)
, RunOutcome(..)
, RunOutcomeFor
, RunStatistics(..)
, TerminationReason(..)
, TerminationReasonFor
, mainForExploreTree
, mainForExploreTreeIO
, mainForExploreTreeImpure
, mainForExploreTreeUntilFirst
, mainForExploreTreeIOUntilFirst
, mainForExploreTreeImpureUntilFirst
, mainForExploreTreeUntilFoundUsingPull
, mainForExploreTreeIOUntilFoundUsingPull
, mainForExploreTreeImpureUntilFoundUsingPull
, mainForExploreTreeUntilFoundUsingPush
, mainForExploreTreeIOUntilFoundUsingPush
, mainForExploreTreeImpureUntilFoundUsingPush
, genericMain
, simpleMainForExploreTree
, simpleMainForExploreTreeIO
, simpleMainForExploreTreeImpure
, simpleMainForExploreTreeUntilFirst
, simpleMainForExploreTreeIOUntilFirst
, simpleMainForExploreTreeImpureUntilFirst
, simpleMainForExploreTreeUntilFoundUsingPull
, simpleMainForExploreTreeIOUntilFoundUsingPull
, simpleMainForExploreTreeImpureUntilFoundUsingPull
, simpleMainForExploreTreeUntilFoundUsingPush
, simpleMainForExploreTreeIOUntilFoundUsingPush
, simpleMainForExploreTreeImpureUntilFoundUsingPush
, extractRunOutcomeFromSupervisorOutcome
, mainMan
, mainParser
) where
import Prelude hiding (catch,readFile,writeFile)
import Control.Applicative ((<$>),(<*>),pure)
import Control.Arrow ((&&&))
import Control.Concurrent (threadDelay)
import Control.Exception (AsyncException,SomeException,finally,fromException,handleJust,onException)
import Control.Monad (forM_,forever,join,liftM2,mplus,when,unless,void)
import Control.Monad.CatchIO (catch)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Tools (ifM)
import qualified Control.Monad.Trans.State.Strict as State
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.State.Strict (evalStateT)
import Data.ByteString.Lazy (readFile,writeFile)
import Data.Char (toLower)
import Data.Composition ((.*),(.**))
import Data.Derive.Serialize
import Data.DeriveTH
import Data.Function (on)
import Data.Functor.Identity (Identity)
import Data.IORef (IORef,newIORef,readIORef,writeIORef)
import Data.List (find,intercalate,nub)
import Data.List.Split (splitOn)
import Data.Maybe (isJust)
import Data.Monoid (Monoid(..))
import Data.Ord (comparing)
import Data.Prefix.Units (FormatMode(FormatSiAll),formatValue,unitName)
import Data.Serialize
import Data.Time.Clock (NominalDiffTime)
import System.Console.CmdTheLine
import System.Directory (doesFileExist,removeFile,renameFile)
import System.Environment (getProgName)
import System.IO (hFlush,hPutStrLn,stderr,stdout)
import System.IO.Error (isDoesNotExistError)
import qualified System.Log.Logger as Logger
import System.Log.Formatter (simpleLogFormatter)
import System.Log.Handler (setFormatter)
import System.Log.Handler.Simple (streamHandler)
import System.Log.Logger (Priority(..),logM,rootLoggerName,setHandlers,setLevel,updateGlobalLogger)
import System.Log.Logger.TH
import Text.PrettyPrint (text)
import Text.Printf (printf)
import LogicGrowsOnTrees (Tree,TreeIO,TreeT)
import LogicGrowsOnTrees.Checkpoint
import LogicGrowsOnTrees.Parallel.Common.RequestQueue
import LogicGrowsOnTrees.Parallel.Common.Supervisor
( FunctionOfTimeStatistics(..)
, IndependentMeasurementsStatistics(..)
, RunStatistics(..)
, SupervisorTerminationReason(..)
, SupervisorOutcome(..)
)
import LogicGrowsOnTrees.Parallel.ExplorationMode
import LogicGrowsOnTrees.Parallel.Purity
deriveLoggers "Logger" [INFO,NOTICE,ERROR]
type Tense = String → String → String
data Statistic = Statistic
{ statisticLongName :: String
, statisticShortName :: String
, statisticDescription :: String
, statisticApplication :: Tense → RunStatistics → String
}
instance Eq Statistic where
(==) = (==) `on` statisticLongName
instance Ord Statistic where
compare = comparing statisticLongName
data CheckpointConfiguration = CheckpointConfiguration
{ maybe_checkpoint_path :: Maybe FilePath
, checkpoint_interval :: Float
} deriving (Eq,Show)
data LoggingConfiguration = LoggingConfiguration
{ log_level :: Priority
, maybe_log_format :: Maybe String
} deriving (Eq,Show)
instance Serialize Priority where
put = put . show
get = read <$> get
$( derive makeSerialize ''LoggingConfiguration )
data StatisticsConfiguration = StatisticsConfiguration
{ end_stats_configuration :: [[Statistic]]
, log_end_stats_configuration :: Bool
, log_stats_configuration :: [[Statistic]]
, log_stats_level_configuration :: Priority
, log_stats_interval_configuration :: Float
}
data SupervisorConfiguration = SupervisorConfiguration
{ checkpoint_configuration :: CheckpointConfiguration
, maybe_workload_buffer_size_configuration :: Maybe Int
, statistics_configuration :: StatisticsConfiguration
, show_cpu_time :: Bool
}
data SharedConfiguration tree_configuration = SharedConfiguration
{ logging_configuration :: LoggingConfiguration
, tree_configuration :: tree_configuration
} deriving (Eq,Show)
$( derive makeSerialize ''SharedConfiguration )
data ProgressAndCPUTime progress = ProgressAndCPUTime progress Rational
$( derive makeSerialize ''ProgressAndCPUTime )
data Driver
result_monad
shared_configuration
supervisor_configuration
m n
exploration_mode
= ∀ controller_monad.
( RequestQueueMonad (controller_monad exploration_mode)
, ExplorationModeFor (controller_monad exploration_mode) ~ exploration_mode
) ⇒
Driver (
( Serialize (ProgressFor exploration_mode)
, MonadIO result_monad
) ⇒
DriverParameters
shared_configuration
supervisor_configuration
m n
exploration_mode
controller_monad
→ result_monad ()
)
data DriverParameters
shared_configuration
supervisor_configuration
m n
exploration_mode
controller_monad =
DriverParameters
{
shared_configuration_term :: Term shared_configuration
, supervisor_configuration_term :: Term supervisor_configuration
, program_info :: TermInfo
, initializeGlobalState :: shared_configuration → IO ()
, getStartingProgress :: shared_configuration → supervisor_configuration → IO (ProgressFor exploration_mode)
, notifyTerminated :: shared_configuration → supervisor_configuration → RunOutcomeFor exploration_mode → IO ()
, constructExplorationMode :: shared_configuration → ExplorationMode exploration_mode
, constructTree :: shared_configuration → TreeT m (ResultFor exploration_mode)
, purity :: Purity m n
, constructController :: shared_configuration → supervisor_configuration → controller_monad exploration_mode ()
}
data RunOutcome progress final_result = RunOutcome
{
runStatistics :: RunStatistics
, runTerminationReason :: TerminationReason progress final_result
} deriving (Eq,Show)
type RunOutcomeFor exploration_mode = RunOutcome (ProgressFor exploration_mode) (FinalResultFor exploration_mode)
data TerminationReason progress final_result =
Aborted progress
| Completed final_result
| Failure progress String
deriving (Eq,Show)
type TerminationReasonFor exploration_mode = TerminationReason (ProgressFor exploration_mode) (FinalResultFor exploration_mode)
instance ArgVal Priority where
converter = enum $
[DEBUG,INFO,NOTICE,WARNING,ERROR,CRITICAL,ALERT,EMERGENCY]
>>=
\level → let name = show level
in return (name,level) `mplus` return (map toLower name,level)
instance ArgVal [Statistic] where
converter = (parse,pretty)
where
parse = go [] . splitOn ","
where
go stats [] = Right . reverse . nub $ stats
go _ ("all":_) = Right statistics
go stats (name:rest) =
case find (\Statistic{..} → statisticLongName == name || statisticShortName == name) statistics of
Just stat → go (stat:stats) rest
Nothing → Left . text $ "unrecognized statistic '" ++ show name ++ "'"
pretty stats
| length stats == length statistics = text "all"
| otherwise = text . intercalate "," . map statisticLongName $ stats
mainForExploreTree ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration Identity IO (AllMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) result → IO ())
→
(tree_configuration → Tree result) →
result_monad ()
mainForExploreTree = genericMain (const AllMode) Pure
mainForExploreTreeIO ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration IO IO (AllMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) result → IO ())
→
(tree_configuration → TreeIO result) →
result_monad ()
mainForExploreTreeIO = genericMain (const AllMode) io_purity
mainForExploreTreeImpure ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration m m (AllMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) result → IO ())
→
(tree_configuration → TreeT m result) →
result_monad ()
mainForExploreTreeImpure = genericMain (const AllMode) . ImpureAtopIO
mainForExploreTreeUntilFirst ::
(Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration Identity IO (FirstMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
(tree_configuration → Tree result) →
result_monad ()
mainForExploreTreeUntilFirst = genericMain (const FirstMode) Pure
mainForExploreTreeIOUntilFirst ::
(Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration IO IO (FirstMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
(tree_configuration → TreeIO result) →
result_monad ()
mainForExploreTreeIOUntilFirst = genericMain (const FirstMode) io_purity
mainForExploreTreeImpureUntilFirst ::
(Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration m m (FirstMode result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
(tree_configuration → TreeT m result) →
result_monad ()
mainForExploreTreeImpureUntilFirst = genericMain (const FirstMode) . ImpureAtopIO
mainForExploreTreeUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(tree_configuration → result → Bool) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration Identity IO (FoundModeUsingPull result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → Tree result) →
result_monad ()
mainForExploreTreeUntilFoundUsingPull constructCondition = genericMain (FoundModeUsingPull . constructCondition) Pure
mainForExploreTreeIOUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(tree_configuration → result → Bool) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration IO IO (FoundModeUsingPull result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → TreeIO result) →
result_monad ()
mainForExploreTreeIOUntilFoundUsingPull constructCondition = genericMain (FoundModeUsingPull . constructCondition) io_purity
mainForExploreTreeImpureUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(tree_configuration → result → Bool) →
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration m m (FoundModeUsingPull result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → TreeT m result) →
result_monad ()
mainForExploreTreeImpureUntilFoundUsingPull constructCondition = genericMain (FoundModeUsingPull . constructCondition) . ImpureAtopIO
mainForExploreTreeUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(tree_configuration → result → Bool) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration Identity IO (FoundModeUsingPush result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → Tree result) →
result_monad ()
mainForExploreTreeUntilFoundUsingPush constructCondition = genericMain (FoundModeUsingPush . constructCondition) Pure
mainForExploreTreeIOUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(tree_configuration → result → Bool) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration IO IO (FoundModeUsingPush result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → TreeIO result) →
result_monad ()
mainForExploreTreeIOUntilFoundUsingPush constructCondition = genericMain (FoundModeUsingPush . constructCondition) io_purity
mainForExploreTreeImpureUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(tree_configuration → result → Bool) →
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration tree_configuration) SupervisorConfiguration m m (FoundModeUsingPush result) →
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
(tree_configuration → TreeT m result) →
result_monad ()
mainForExploreTreeImpureUntilFoundUsingPush constructCondition = genericMain (FoundModeUsingPush . constructCondition) . ImpureAtopIO
simpleMainForExploreTree ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration Identity IO (AllMode result) →
(RunOutcome (Progress result) result → IO ())
→
Tree result →
result_monad ()
simpleMainForExploreTree = dispatchToMainFunction mainForExploreTree
simpleMainForExploreTreeIO ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration IO IO (AllMode result) →
(RunOutcome (Progress result) result → IO ())
→
TreeIO result →
result_monad ()
simpleMainForExploreTreeIO = dispatchToMainFunction mainForExploreTreeIO
simpleMainForExploreTreeImpure ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration m m (AllMode result) →
(RunOutcome (Progress result) result → IO ())
→
TreeT m result →
result_monad ()
simpleMainForExploreTreeImpure = dispatchToMainFunction . mainForExploreTreeImpure
simpleMainForExploreTreeUntilFirst ::
(Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration Identity IO (FirstMode result) →
(RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
Tree result →
result_monad ()
simpleMainForExploreTreeUntilFirst = dispatchToMainFunction mainForExploreTreeUntilFirst
simpleMainForExploreTreeIOUntilFirst ::
(Serialize result, MonadIO result_monad) ⇒
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration IO IO (FirstMode result) →
(RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
TreeIO result →
result_monad ()
simpleMainForExploreTreeIOUntilFirst = dispatchToMainFunction mainForExploreTreeIOUntilFirst
simpleMainForExploreTreeImpureUntilFirst ::
(Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration m m (FirstMode result) →
(RunOutcome Checkpoint (Maybe (Progress result)) → IO ())
→
TreeT m result →
result_monad ()
simpleMainForExploreTreeImpureUntilFirst = dispatchToMainFunction . mainForExploreTreeImpureUntilFirst
simpleMainForExploreTreeUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(result → Bool) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration Identity IO (FoundModeUsingPull result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
Tree result →
result_monad ()
simpleMainForExploreTreeUntilFoundUsingPull = dispatchToMainFunction . mainForExploreTreeUntilFoundUsingPull . const
simpleMainForExploreTreeIOUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(result → Bool) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration IO IO (FoundModeUsingPull result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
TreeIO result →
result_monad ()
simpleMainForExploreTreeIOUntilFoundUsingPull = dispatchToMainFunction . mainForExploreTreeIOUntilFoundUsingPull . const
simpleMainForExploreTreeImpureUntilFoundUsingPull ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(result → Bool) →
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration m m (FoundModeUsingPull result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
TreeT m result →
result_monad ()
simpleMainForExploreTreeImpureUntilFoundUsingPull = (dispatchToMainFunction .* mainForExploreTreeImpureUntilFoundUsingPull) . const
simpleMainForExploreTreeUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(result → Bool) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration Identity IO (FoundModeUsingPush result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
Tree result →
result_monad ()
simpleMainForExploreTreeUntilFoundUsingPush = dispatchToMainFunction . mainForExploreTreeUntilFoundUsingPush . const
simpleMainForExploreTreeIOUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad) ⇒
(result → Bool) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration IO IO (FoundModeUsingPush result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
TreeIO result →
result_monad ()
simpleMainForExploreTreeIOUntilFoundUsingPush = dispatchToMainFunction . mainForExploreTreeIOUntilFoundUsingPush . const
simpleMainForExploreTreeImpureUntilFoundUsingPush ::
(Monoid result, Serialize result, MonadIO result_monad, Functor m, MonadIO m) ⇒
(result → Bool) →
(∀ β. m β → IO β) →
Driver result_monad (SharedConfiguration ()) SupervisorConfiguration m m (FoundModeUsingPush result) →
(RunOutcome (Progress result) (Either result (Progress result)) → IO ())
→
TreeT m result →
result_monad ()
simpleMainForExploreTreeImpureUntilFoundUsingPush = (dispatchToMainFunction .* mainForExploreTreeImpureUntilFoundUsingPush) . const
genericMain ::
( MonadIO result_monad
, ResultFor exploration_mode ~ result
, Serialize (ProgressFor exploration_mode)
) ⇒
(tree_configuration → ExplorationMode exploration_mode)
→
Purity m n →
Driver
result_monad
(SharedConfiguration tree_configuration)
SupervisorConfiguration
m n
exploration_mode
→
Term tree_configuration →
TermInfo
→
(tree_configuration → RunOutcomeFor exploration_mode → IO ())
→
(tree_configuration → TreeT m result) →
result_monad ()
genericMain constructExplorationMode_ purity (Driver run) tree_configuration_term program_info_ notifyTerminated_user constructTree_ = do
tracker_ref ← liftIO . newIORef $ error "tracker was not set"
let constructController = constructController_ tracker_ref
notifyTerminated = notifyTerminated_main tracker_ref
getStartingProgress = getStartingProgress_ tracker_ref
run DriverParameters{..}
where
constructExplorationMode = constructExplorationMode_ . tree_configuration
shared_configuration_term = makeSharedConfigurationTerm tree_configuration_term
supervisor_configuration_term =
SupervisorConfiguration
<$> checkpoint_configuration_term
<*> maybe_workload_buffer_size_configuration_term
<*> statistics_configuration_term
<*> show_cpu_time_term
program_info = program_info_ { man = mainMan }
initializeGlobalState SharedConfiguration{logging_configuration=LoggingConfiguration{..}} = do
case maybe_log_format of
Nothing → return ()
Just log_format → do
handler ← flip setFormatter (simpleLogFormatter log_format) <$> streamHandler stdout log_level
updateGlobalLogger rootLoggerName $ setHandlers [handler]
updateGlobalLogger rootLoggerName (setLevel log_level)
constructTree = constructTree_ . tree_configuration
getStartingProgress_
tracker_ref
shared_configuration
SupervisorConfiguration{checkpoint_configuration=CheckpointConfiguration{..},..}
= do
case maybe_checkpoint_path of
Nothing → do
infoM "Checkpointing is NOT enabled"
newCPUTimeTracker 0 >>= writeIORef tracker_ref
return initial_progress
Just checkpoint_path → do
infoM $ "Checkpointing enabled"
infoM $ "Checkpoint file is " ++ checkpoint_path
infoM $ "Checkpoint interval is " ++ show checkpoint_interval ++ " seconds"
ifM (doesFileExist checkpoint_path)
(do infoM "Loading existing checkpoint file"
ProgressAndCPUTime progress initial_cpu_time ← either error id . decodeLazy <$> readFile checkpoint_path
newCPUTimeTracker (realToFrac initial_cpu_time) >>= writeIORef tracker_ref
return progress
)
(newCPUTimeTracker 0 >>= writeIORef tracker_ref >> return initial_progress)
where
initial_progress = initialProgress . constructExplorationMode $ shared_configuration
notifyTerminated_main
tracker_ref
SharedConfiguration{..}
SupervisorConfiguration{checkpoint_configuration=CheckpointConfiguration{..},..}
run_outcome@RunOutcome{..}
= do
cpu_time ← readIORef tracker_ref >>= getCurrentCPUTime
case maybe_checkpoint_path of
Nothing → doEndOfRun cpu_time
Just checkpoint_path →
do doEndOfRun cpu_time
infoM "Deleting any remaining checkpoint file"
removeFileIfExists checkpoint_path
`finally`
do
case runTerminationReason of
Aborted checkpoint → writeCheckpointFile checkpoint_path checkpoint cpu_time
Failure checkpoint _ → writeCheckpointFile checkpoint_path checkpoint cpu_time
_ → return ()
where
StatisticsConfiguration{..} = statistics_configuration
doEndOfRun cpu_time = do
if log_end_stats_configuration
then writeStatisticsToLog
log_stats_level_configuration
pastTense
runStatistics
end_stats_configuration
else mapM_ (hPutStrLn stderr)
.
map snd
.
generateStatistics pastTense runStatistics
$
end_stats_configuration
when show_cpu_time . hPutStrLn stderr $
"Total CPU time used was " ++ showWithUnitPrefix cpu_time ++ "seconds."
hFlush stderr
notifyTerminated_user tree_configuration run_outcome
constructController_ = const . controllerLoop
extractRunOutcomeFromSupervisorOutcome ::
Show worker_id ⇒
SupervisorOutcome fv ip worker_id →
RunOutcome ip fv
extractRunOutcomeFromSupervisorOutcome SupervisorOutcome{..} = RunOutcome{..}
where
runTerminationReason =
case supervisorTerminationReason of
SupervisorAborted remaining_progress → Aborted remaining_progress
SupervisorCompleted result → Completed result
SupervisorFailure remainig_progress worker_id message →
Failure remainig_progress $ "Worker " ++ show worker_id ++ " failed with message: " ++ message
runStatistics = supervisorRunStatistics
mainMan :: [ManBlock]
mainMan =
[S "Log Formatting"
,P "The following are the variables you can use in the format string:"
,I "$msg" "The actual log message"
,I "$loggername" "The name of the logger"
,I "$prio" "The priority level of the message"
,I "$tid" "The thread ID"
,I "$pid" "Process ID (Not available on windows)"
,I "$time" "The current time"
,I "$utcTime" "The current time in UTC Time"
]
++
[S "Statistics",P "Each statistic has a long-form name and an abbreviated name (in parentheses) shown below; you may use either when specifying it"]
++
map (I <$> (printf "%s (%s)" <$> statisticLongName <*> statisticShortName) <*> statisticDescription) statistics
mainParser :: Term α → TermInfo → IO α
mainParser term term_info =
(if null (termName term_info)
then getProgName >>= \progname → return $ term_info {termName = progname}
else return term_info
) >>= exec . (term,)
statistics :: [Statistic]
statistics =
[Statistic "walltimes" "times"
"the starting, ending, and duration (wall) time of the run"
(\tense RunStatistics{..} →
let total_time = realToFrac runWallTime :: Float
in tense
(printf "The run started at %s, and so far it has run for %sseconds."
(show runStartTime)
(showWithUnitPrefix total_time)
)
(printf "The run started at %s, ended at %s, and took %sseconds."
(show runStartTime)
(show runEndTime)
(showWithUnitPrefix total_time)
)
)
,Statistic "supervisor-occupation" "supocc"
"the percentage of the time that the supervisor was occupied"
(\tense RunStatistics{..} →
printf "The supervior %s occupied for %.2f%% of the time so far, of which %.2f%% %s spent inside the SupervisorMonad."
(tense "has been" "was")
(runSupervisorOccupation*100)
(runSupervisorOccupation/runSupervisorMonadOccupation*100)
(tense "has been" "was")
)
,Statistic "supervisor-calls" "supcalls"
"the number of calls made to functions in the Supervisor module as well as the average time per call"
(\tense RunStatistics{..} →
printf "There %s %i calls made to functions in the Supervisor module, each of which took an average of %sseconds to complete."
(tense "have been" "were")
runNumberOfCalls
(showWithUnitPrefix runAverageTimePerCall)
)
,Statistic "worker-count" "workcnt"
"statistics about the number of participating workers"
(\_ RunStatistics{..} →
let FunctionOfTimeStatistics{..} = runWorkerCountStatistics
in if timeMax == 0
then
"No workers participated in this run."
else if timeMin == 0
then
printf "On average there were %.1f +/- %.1f (std. dev) workers participating in the run; never more than %i."
timeAverage
timeStdDev
timeMax
else
printf "On average there were %.1f +/- %.1f (std. dev) workers participating in the run; never more than %i nor fewer than %i."
timeAverage
timeStdDev
timeMax
timeMin
)
,Statistic "worker-occupation" "workocc"
"the average percentage of the time that the workers were occupied"
(\tense RunStatistics{..} →
printf "Workers %s occupied for %.2f%% of the time on average."
(tense "have been" "were")
(runWorkerOccupation*100)
)
,Statistic "worker-waiting-times" "workwait"
"statistics about the amount of time that it took for a worker to obtain a new workload after finishing a workload"
(\tense RunStatistics{..} →
let FunctionOfTimeStatistics{..} = runWorkerWaitTimes
total_time = realToFrac runWallTime :: Float
in if timeCount == 0
then
printf "At no point %s a worker receive%s a new workload after finishing its current workload."
(tense "has" "did")
(tense "d" "")
else
if timeMax == 0
then
printf "Workers %scompleted their workload and obtained a new one %i times and never once has any had to wait to receive a new workload."
(tense "have " "")
timeCount
else
printf (
intercalate "\n"
["Workers %scompleted their task and obtained a new workload %i times with an average of one every %sseconds or %.1g enqueues/second."
,"The minimum waiting time %s %sseconds, and the maximum waiting time %s %sseconds."
,"On average, a worker %shad to wait %sseconds +/- %sseconds (std. dev) for a new workload."
]
)
(tense "have " "")
timeCount
(showWithUnitPrefix $ total_time / fromIntegral timeCount)
(fromIntegral timeCount / total_time)
(tense "has been" "was")
(showWithUnitPrefix timeMin)
(tense "has been" "was")
(showWithUnitPrefix timeMax)
(tense "has " "")
(showWithUnitPrefix timeAverage)
(showWithUnitPrefix timeStdDev)
)
,Statistic "steal-waiting-times" "stealwait"
"statistics about the amount of time needed to steal a workload"
(\tense RunStatistics{..} →
let IndependentMeasurementsStatistics{..} = runStealWaitTimes
total_time = realToFrac runWallTime :: Float
in if statCount == 0
then
printf "No workloads %s stolen."
(tense "have been" "were")
else
printf (
intercalate "\n"
["Workloads %s stolen %i times with an average of %sseconds between each steal or %.1g steals/second."
,"The minimum waiting time for a steal %s %sseconds, and the maximum waiting time %s %sseconds."
,"On average, it %s %sseconds +/- %sseconds (std. dev) to steal a workload."
]
)
(tense "have been" "were")
statCount
(showWithUnitPrefix $ total_time / fromIntegral statCount)
(fromIntegral statCount / total_time)
(tense "has been" "was")
(showWithUnitPrefix statMin)
(tense "has been" "was")
(showWithUnitPrefix statMax)
(tense "has taken" "took")
(showWithUnitPrefix statAverage)
(showWithUnitPrefix statStdDev)
)
,Statistic "waiting-worker-count" "waitworkcnt"
"statistics about the number of waiting workers"
(\tense RunStatistics{..} →
let FunctionOfTimeStatistics{..} = runWaitingWorkerStatistics
in if timeMax == 0
then
printf "No worker %s to wait for a workload to become available."
(tense "has had" "ever had")
else if timeMin == 0
then
printf "On average, %.1f +/- %.1f (std. dev) workers %s waiting for a workload at any given time; never more than %i."
timeAverage
timeStdDev
(tense "have been" "were")
timeMax
else
printf "On average, %.1f +/- %.1f (std. dev) workers %s waiting for a workload at any given time; never more than %i nor fewer than %i."
timeAverage
timeStdDev
(tense "have been" "were")
timeMax
timeMin
)
,Statistic "available-workload-count" "avlwldcnt"
"This option will cause statistics about the number of available workloads to be printed to standard error after the program terminates."
(\tense RunStatistics{..} →
let FunctionOfTimeStatistics{..} = runAvailableWorkloadStatistics
in if timeMax == 0
then
printf "No workload %s to wait for an available worker."
(tense "has had" "ever had")
else if timeMin == 0
then
printf "On average, %.1f +/- %.1f (std. dev) workloads %s waiting for a worker at any given time; never more than %i."
timeAverage
timeStdDev
(tense "have been" "were")
timeMax
else
printf "On average, %.1f +/- %.1f (std. dev) workloads %s waiting for a worker at any given time; never more than %i nor fewer than %i."
timeAverage
timeStdDev
(tense "have been" "were")
timeMax
timeMin
)
,Statistic "instant-workload-request-rate" "instworkreq"
"statistics about the (roughly) instantaneous rate at which workloads were requested by finished workers (obtained via exponential smoothing over a time scale of one second)"
(\tense RunStatistics{..} →
let FunctionOfTimeStatistics{..} = runInstantaneousWorkloadRequestRateStatistics
in printf "On average, the instantanenous rate at which workloads %s being requested %s %.1f +/- %.1f (std. dev) requests per second; the rate %s below %.1f nor %s above %.1f."
(tense "are" "were")
(tense "is" "was")
timeAverage
timeStdDev
(tense "has never fallen" "never fell")
timeMin
(tense "risen" "rose")
timeMax
)
,Statistic "instant-workload-steal-times" "instwldsteal"
"statistics about the (roughly) instantaneous amount of time that it took to steal a workload (obtained via exponential smoothing over a time scale of one second"
(\tense RunStatistics{..} →
let FunctionOfTimeStatistics{..} = runInstantaneousWorkloadStealTimeStatistics
in printf "On average, the instantaneous time to steal a workload %s %sseconds +/- %sseconds (std. dev); this time interval %s below %sseconds nor %s above %sseconds."
(tense "has been" "was")
(showWithUnitPrefix timeAverage)
(showWithUnitPrefix timeStdDev)
(tense "has never fallen" "never fell")
(showWithUnitPrefix timeMin)
(tense "risen" "rose")
(showWithUnitPrefix timeMax)
)
]
checkpoint_configuration_term :: Term CheckpointConfiguration
checkpoint_configuration_term =
CheckpointConfiguration
<$> value (flip opt (
(optInfo ["c","checkpoint-file"])
{ optName = "FILEPATH"
, optDoc = unwords
["This enables periodic checkpointing with the given path"
,"specifying the location of the checkpoint file; if the file"
,"already exists then it will be loaded as the initial starting"
,"point for the search."
]
}
) Nothing)
<*> value (flip opt (
(optInfo ["i","interval"])
{ optName = "SECONDS"
, optDoc = unwords
["If checkpointing is enabled, this specifies how often a"
,"checkpoint will be written; if checkpointing is not enabled,"
,"then it sets how often a global progress update is performed"
,"(which matters when workers will join and leave during the"
,"run so that their partial progress is not lost). This"
,"quantity is given in seconds, and not only may it be"
,"fractional but in fact a decimal point is required as"
,"otherwise the argument parser gets confused."
]
}
) 60)
logging_configuration_term :: Term LoggingConfiguration
logging_configuration_term =
LoggingConfiguration
<$> (value . opt WARNING $
(optInfo ["l","log-level"])
{ optName = "LEVEL"
, optDoc = "This specifies the upper bound (inclusive) on the importance of the messages that will be logged; it must be one of (in increasing order of importance): DEBUG, INFO, NOTICE, WARNING, ERROR, CRITICAL, ALERT, or EMERGENCY."
}
)
<*> (value . opt Nothing $
(optInfo ["log-format"])
{ optName = "FORMAT"
, optDoc = "This specifies the format of logged messages; see the Log Formatting section for more details."
}
)
show_cpu_time_term :: Term Bool
show_cpu_time_term =
value . flag $
(optInfo ["show-cpu-time"])
{ optDoc = "Print the total CPU time when the run finishes."
}
statistics_configuration_term :: Term StatisticsConfiguration
statistics_configuration_term =
StatisticsConfiguration
<$> (value . optAll [] $
(optInfo ["s","end-stats"])
{ optName = "STATS"
, optDoc = "A comma-separated list of statistics to be printed to stderr at the end of the run; you may alternatively specify multiple statistics by using this option multiple times. (See the Statistics section for more information.)"
}
)
<*> (value . flag $
(optInfo ["log-end-stats"])
{ optDoc = "If present, then the end-of-run stats are sent to the log instead of stderr."
}
)
<*> (value . optAll [] $
(optInfo ["log-stats"])
{ optName = "STATS"
, optDoc = "A comma-separated list of statistics to be regularly logged during the run level; you may alternatively specify multiple statistics by using this option multiple times. (See the Statistics section for more information.)"
}
)
<*> (value . opt NOTICE $
(optInfo ["log-stats-level"])
{ optName = "STATS"
, optDoc = "The level at which to log the stats."
}
)
<*> (value . opt 60 $
(optInfo ["log-stats-interval"])
{ optName = "SECONDS"
, optDoc = "The time between logging statistics (in seconds, decimals allowed); it is ignored if no statistics have been enabled for logging."
}
)
maybe_workload_buffer_size_configuration_term :: Term (Maybe Int)
maybe_workload_buffer_size_configuration_term =
value (opt Nothing ((optInfo ["buffer-size"])
{ optName = "SIZE"
, optDoc = unwords
["This option sets the size of the workload buffer which contains"
,"stolen workloads that are held at the supervisor so that if a"
,"worker needs a new workload it can be given one immediately rather"
,"than having to wait for a new workload to be stolen. This setting"
,"should be large enough that a request for a new workload can"
,"always be answered immediately using a workload from the buffer,"
,"which is roughly a function of the product of the number of"
,"workloads requested per second and the time needed to steal a new"
,"workload (both of which are server statistics than you can request"
,"to see upon completions). If you are not having problems with"
,"scaling, then you can ignore this option (it defaults to 4)."
]
}
))
makeSharedConfigurationTerm :: Term tree_configuration → Term (SharedConfiguration tree_configuration)
makeSharedConfigurationTerm tree_configuration_term =
SharedConfiguration
<$> logging_configuration_term
<*> tree_configuration_term
checkpointLoop ::
( RequestQueueMonad m
, Serialize (ProgressFor (ExplorationModeFor m))
) ⇒ CPUTimeTracker → CheckpointConfiguration → m α
checkpointLoop tracker CheckpointConfiguration{..} =
case maybe_checkpoint_path of
Nothing → forever $ requestProgressUpdate >> delay
Just checkpoint_path → flip evalStateT False . forever $
(do join $
liftM2 (liftIO .* writeCheckpointFile checkpoint_path)
(lift requestProgressUpdate)
(liftIO (getCurrentCPUTime tracker))
infoM $ "Checkpoint written to " ++ show checkpoint_path
State.get >>= (flip when $ noticeM "The problem with the checkpoint has been resolved.")
State.put False
) `catch` (\(e::SomeException) →
unless (isJust . (fromException :: SomeException → Maybe AsyncException) $ e) $ do
let message = "Failed writing checkpoint to \"" ++ checkpoint_path ++ "\" with error \"" ++ show e ++ "\"; will keep retrying in case the problem gets resolved."
ifM State.get (infoM message) (errorM message)
State.put True
)
where
delay = liftIO . threadDelay $ amount
where
amount = round $ checkpoint_interval * 1000000
statisticsLoop :: RequestQueueMonad m ⇒ [[Statistic]] → Priority → Float → m α
statisticsLoop stats level interval = forever $ do
liftIO $ threadDelay delay
run_statistics ← getCurrentStatistics
liftIO $
writeStatisticsToLog
level
pastPerfectTense
run_statistics
stats
where
delay = round $ interval * 1000000
controllerLoop ::
( RequestQueueMonad m
, Serialize (ProgressFor (ExplorationModeFor m))
) ⇒ IORef CPUTimeTracker → SupervisorConfiguration → m ()
controllerLoop tracker_ref SupervisorConfiguration{statistics_configuration=StatisticsConfiguration{..},..} = do
tracker ← liftIO . readIORef $ tracker_ref
startCPUTimeTracker tracker
maybe (return ()) setWorkloadBufferSize maybe_workload_buffer_size_configuration
void . fork $ checkpointLoop tracker checkpoint_configuration
when (not . null $ log_stats_configuration) $
void . fork $ statisticsLoop log_stats_configuration log_stats_level_configuration log_stats_interval_configuration
default_terminfo :: TermInfo
default_terminfo = defTI { termDoc = "LogicGrowsOnTrees program" }
dispatchToMainFunction f driver notifyTerminated tree =
f driver
(pure ())
default_terminfo
(const notifyTerminated)
(const tree)
generateStatistics :: Tense → RunStatistics → [[Statistic]] → [(String,String)]
generateStatistics tense run_statistics =
map (
statisticLongName
&&&
(\(statisticApplication → apply) → apply tense run_statistics)
)
.
nub
.
concat
pastPerfectTense, pastTense :: Tense
pastPerfectTense x _ = x
pastTense _ x = x
removeFileIfExists :: FilePath → IO ()
removeFileIfExists path =
handleJust
(\e → if isDoesNotExistError e then Just () else Nothing)
(\_ → return ())
(removeFile path)
showWithUnitPrefix :: Real n ⇒ n → String
showWithUnitPrefix 0 = "0 "
showWithUnitPrefix x = printf "%.1f %s" x_scaled (unitName unit)
where
(x_scaled :: Float,Just unit) = formatValue (Left FormatSiAll) . realToFrac $ x
writeStatisticsToLog :: Priority → Tense → RunStatistics → [[Statistic]] → IO ()
writeStatisticsToLog level =
(\outputs →
unless (null outputs) $ do
logIt "=== BEGIN STATISTICS ==="
forM_ outputs $ \(name,output) → logIt (name ++ ": " ++ output)
logIt "=== END STATISTICS ==="
)
.**
generateStatistics
where
logIt = logM "LogicGrowsOnTrees.Parallel.Main" level
writeCheckpointFile :: (Serialize ip, MonadIO m) ⇒ FilePath → ip → NominalDiffTime → m ()
writeCheckpointFile checkpoint_path checkpoint cpu_time = do
infoM $ "Writing checkpoint file"
liftIO $
(do writeFile checkpoint_temp_path . encodeLazy $ ProgressAndCPUTime checkpoint (toRational cpu_time)
renameFile checkpoint_temp_path checkpoint_path
) `onException` (
removeFileIfExists checkpoint_temp_path
)
where
checkpoint_temp_path = checkpoint_path ++ ".tmp"