module LogicGrowsOnTrees.Parallel.Adapter.Processes
(
driver
, ProcessesControllerMonad
, abort
, changeNumberOfWorkersAsync
, changeNumberOfWorkers
, fork
, getCurrentProgressAsync
, getCurrentProgress
, getNumberOfWorkersAsync
, getNumberOfWorkers
, requestProgressUpdateAsync
, requestProgressUpdate
, setNumberOfWorkersAsync
, setNumberOfWorkers
, setWorkloadBufferSize
, RunOutcome(..)
, RunStatistics(..)
, TerminationReason(..)
, runSupervisor
, runWorker
, runWorkerUsingHandles
, runExplorer
, getProgFilepath
) where
import Prelude hiding (catch)
import Control.Applicative ((<$>),(<*>),Applicative,liftA2)
import Control.Arrow (second)
import Control.Concurrent (ThreadId,forkIO,getNumCapabilities,killThread)
import Control.Exception (AsyncException(ThreadKilled,UserInterrupt),SomeException,catch,catchJust,fromException)
import Control.Monad (forever,liftM2,unless,void)
import Control.Monad.CatchIO (MonadCatchIO)
import Control.Monad.IO.Class (MonadIO,liftIO)
import Control.Monad.Trans.State.Strict (get,modify)
import qualified Data.Foldable as Fold
import Data.Function (fix)
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap
import Data.Maybe (fromJust,fromMaybe)
import Data.Monoid (Monoid(mempty))
import Data.Serialize (Serialize)
import System.Console.CmdTheLine
import System.Environment (getArgs,getProgName)
import System.Environment.FindBin (getProgPath)
import System.FilePath ((</>))
import System.IO (Handle,hGetLine,stdin,stdout)
import System.IO.Error (isEOFError)
import qualified System.Log.Logger as Logger
import System.Log.Logger (Priority(DEBUG,INFO,ERROR))
import System.Log.Logger.TH
import System.Process (CreateProcess(..),CmdSpec(RawCommand),StdStream(..),ProcessHandle,createProcess,interruptProcessGroupOf)
import LogicGrowsOnTrees (Tree,TreeIO,TreeT)
import LogicGrowsOnTrees.Checkpoint
import LogicGrowsOnTrees.Parallel.Common.Message
import qualified LogicGrowsOnTrees.Parallel.Common.Process as Process
import LogicGrowsOnTrees.Parallel.Common.Process
import LogicGrowsOnTrees.Parallel.Common.RequestQueue
import LogicGrowsOnTrees.Parallel.Common.Worker
import LogicGrowsOnTrees.Parallel.Common.Workgroup hiding (C,unwrapC)
import LogicGrowsOnTrees.Parallel.ExplorationMode
import LogicGrowsOnTrees.Parallel.Main
(Driver(..)
,DriverParameters(..)
,RunOutcome
,RunOutcomeFor
,RunStatistics(..)
,TerminationReason(..)
,mainParser
)
import LogicGrowsOnTrees.Parallel.Purity
import LogicGrowsOnTrees.Utils.Handle
import LogicGrowsOnTrees.Utils.Word_
import LogicGrowsOnTrees.Workload
deriveLoggers "Logger" [DEBUG,INFO,ERROR]
driver ::
( Serialize shared_configuration
, Serialize (ProgressFor exploration_mode)
, Serialize (WorkerFinishedProgressFor exploration_mode)
) ⇒
Driver IO shared_configuration supervisor_configuration m n exploration_mode
driver = Driver $ \DriverParameters{..} → do
runExplorer
constructExplorationMode
purity
(mainParser (liftA2 (,) shared_configuration_term (liftA2 (,) number_of_processes_term supervisor_configuration_term)) program_info)
initializeGlobalState
constructTree
(curry $ uncurry getStartingProgress . second snd)
(\shared_configuration (Word_ number_of_processes,supervisor_configuration) → do
setNumberOfWorkers number_of_processes
constructController shared_configuration supervisor_configuration
)
>>=
maybe (return ()) (notifyTerminated <$> fst . fst <*> snd . snd . fst <*> snd)
where
number_of_processes_term = required (flip opt (
(optInfo ["n","number-of-processes"])
{ optName = "#"
, optDoc = "This *required* option specifies the number of worker processes to spawn."
}
) Nothing )
newtype ProcessesControllerMonad exploration_mode α =
C { unwrapC :: WorkgroupControllerMonad (IntMap Worker) exploration_mode α
} deriving (Applicative,Functor,Monad,MonadCatchIO,MonadIO,RequestQueueMonad,WorkgroupRequestQueueMonad)
instance HasExplorationMode (ProcessesControllerMonad exploration_mode) where
type ExplorationModeFor (ProcessesControllerMonad exploration_mode) = exploration_mode
runSupervisor ::
( Serialize (ProgressFor exploration_mode)
, Serialize (WorkerFinishedProgressFor exploration_mode)
) ⇒
ExplorationMode exploration_mode →
String →
[String] →
(Handle → IO ()) →
ProgressFor exploration_mode →
ProcessesControllerMonad exploration_mode () →
IO (RunOutcomeFor exploration_mode)
runSupervisor
exploration_mode
worker_filepath
worker_arguments
sendConfigurationTo
starting_progress
(C controller)
= do
request_queue ← newRequestQueue
runWorkgroup
exploration_mode
mempty
(\message_receivers@MessageForSupervisorReceivers{..} →
let createWorker worker_id = do
debugM $ "Launching worker process: " ++ worker_filepath ++ " " ++ unwords worker_arguments
(Just write_handle,Just read_handle,Just error_handle,process_handle) ← liftIO . createProcess $
CreateProcess
{ cmdspec = RawCommand worker_filepath worker_arguments
, cwd = Nothing
, env = Nothing
, std_in = CreatePipe
, std_out = CreatePipe
, std_err = CreatePipe
, close_fds = True
, create_group = True
}
liftIO $ do
_ ← forkIO $
catchJust
(\e → if isEOFError e then Just () else Nothing)
(forever $ hGetLine error_handle >>= \line → debugM $ "[" ++ show worker_id ++ "] " ++ line)
(const $ return ())
`catch`
(\(e::SomeException) → errorM $ "Error reading stderr for worker " ++ show worker_id ++ ": " ++ show e)
_ ← forkIO $
receiveAndProcessMessagesFromWorkerUsingHandle message_receivers read_handle worker_id
`catch`
(\(e::SomeException) →
case fromException e of
Just ThreadKilled → return ()
Just UserInterrupt → return ()
_ → do
debugM $ "Worker " ++ show worker_id ++ " failed with exception: " ++ show e
interruptProcessGroupOf process_handle
receiveFailureFromWorker worker_id (show e)
)
sendConfigurationTo write_handle
modify . IntMap.insert worker_id $ Worker{..}
destroyWorker worker_id worker_is_active = do
debugM $ "Sending QuitWorker to " ++ show worker_id ++ "..."
get >>=
liftIO
.
flip send QuitWorker
.
write_handle
.
fromJustOrBust ("destroyWorker failed to get record for " ++ show worker_id)
.
IntMap.lookup worker_id
debugM $ "Finished sending QuitWorker to " ++ show worker_id ++ "."
modify $ IntMap.delete worker_id
killAllWorkers _ =
debugM "Killing all workers..." >>
get >>= liftIO . Fold.mapM_ (flip send QuitWorker . write_handle) >>
debugM "Done killing all workers."
sendMessageToWorker message worker_id =
get >>=
liftIO
.
maybe (return ()) (
flip send message
.
write_handle
)
.
IntMap.lookup worker_id
sendProgressUpdateRequestTo = sendMessageToWorker RequestProgressUpdate
sendWorkloadStealRequestTo = sendMessageToWorker RequestWorkloadSteal
sendWorkloadTo worker_id workload = sendMessageToWorker (StartWorkload workload) worker_id
in WorkgroupCallbacks{..}
)
starting_progress
controller
runExplorer ::
( Serialize shared_configuration
, Serialize (ProgressFor exploration_mode)
, Serialize (WorkerFinishedProgressFor exploration_mode)
) ⇒
(shared_configuration → ExplorationMode exploration_mode)
→
Purity m n →
IO (shared_configuration,supervisor_configuration)
→
(shared_configuration → IO ())
→
(shared_configuration → TreeT m (ResultFor exploration_mode))
→
(shared_configuration → supervisor_configuration → IO (ProgressFor exploration_mode))
→
(shared_configuration → supervisor_configuration → ProcessesControllerMonad exploration_mode ())
→
IO (Maybe ((shared_configuration,supervisor_configuration),RunOutcomeFor exploration_mode))
runExplorer
constructExplorationMode
purity
getConfiguration
initializeGlobalState
constructTree
getStartingProgress
constructController
= getArgs >>= \args →
if args == sentinel
then do
shared_configuration ← receive stdin
initializeGlobalState shared_configuration
runWorkerUsingHandles
(constructExplorationMode shared_configuration)
purity
(constructTree shared_configuration)
stdin
stdout
return Nothing
else do
configuration@(shared_configuration,supervisor_configuration) ← getConfiguration
initializeGlobalState shared_configuration
program_filepath ← getProgFilepath
starting_progress ← getStartingProgress shared_configuration supervisor_configuration
termination_result ←
runSupervisor
(constructExplorationMode shared_configuration)
program_filepath
sentinel
(flip send shared_configuration)
starting_progress
(constructController shared_configuration supervisor_configuration)
return $ Just (configuration,termination_result)
where
sentinel = ["explorer","worker","bee"]
getProgFilepath :: IO String
getProgFilepath = liftM2 (</>) getProgPath getProgName
data Worker = Worker
{ read_handle :: Handle
, write_handle :: Handle
, process_handle :: ProcessHandle
}
fromJustOrBust message = fromMaybe (error message)