{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UnicodeSyntax #-}

{-| This adapter implements parallelism by spawning multiple processes.  The
    number of processes can be changed during the run and even be set to zero.
 -}
module LogicGrowsOnTrees.Parallel.Adapter.Processes
    (
    -- * Driver
      driver
    -- * Controller
    , ProcessesControllerMonad
    , abort
    , changeNumberOfWorkersAsync
    , changeNumberOfWorkers
    , fork
    , getCurrentProgressAsync
    , getCurrentProgress
    , getCurrentStatisticsAsync
    , getCurrentStatistics
    , getNumberOfWorkersAsync
    , getNumberOfWorkers
    , requestProgressUpdateAsync
    , requestProgressUpdate
    , setNumberOfWorkersAsync
    , setNumberOfWorkers
    , setWorkloadBufferSize
    -- * Outcome types
    , RunOutcome
    , RunStatistics(..)
    , TerminationReason(..)
    -- * Generic runner functions
    -- $runners
    , runSupervisor
    , runWorker
    , runWorkerUsingHandles
    , runExplorer
    -- * Utility functions
    , getProgFilepath
    ) where

import Prelude hiding (catch)

import Control.Applicative ((<$>),(<*>),Applicative,liftA2)
import Control.Arrow (second)
import Control.Concurrent (forkIO)
import Control.Exception (AsyncException(ThreadKilled,UserInterrupt),SomeException,catch,catchJust,fromException)
import Control.Monad (forever,liftM2)
import Control.Monad.CatchIO (MonadCatchIO)
import Control.Monad.IO.Class (MonadIO,liftIO)
import Control.Monad.Trans.State.Strict (get,modify)

import qualified Data.Foldable as Fold
import Data.IntMap (IntMap)
import qualified Data.IntMap as IntMap
import Data.Maybe (fromMaybe)
import Data.Monoid (Monoid(mempty))
import Data.Serialize (Serialize)

import System.Console.CmdTheLine
import System.Environment (getArgs,getProgName)
import System.Environment.FindBin (getProgPath)
import System.FilePath ((</>))
import System.IO (Handle,hGetLine,stdin,stdout)
import System.IO.Error (isEOFError)
import qualified System.Log.Logger as Logger
import System.Log.Logger (Priority(DEBUG,ERROR))
import System.Log.Logger.TH
import System.Process (CreateProcess(..),CmdSpec(RawCommand),StdStream(..),ProcessHandle,createProcess,interruptProcessGroupOf)

import LogicGrowsOnTrees (TreeT)
import LogicGrowsOnTrees.Parallel.Common.Message
import LogicGrowsOnTrees.Parallel.Common.Process (runWorker,runWorkerUsingHandles)
import LogicGrowsOnTrees.Parallel.Common.RequestQueue
import LogicGrowsOnTrees.Parallel.Common.Workgroup hiding (C,unwrapC)
import LogicGrowsOnTrees.Parallel.ExplorationMode
import LogicGrowsOnTrees.Parallel.Main
    (Driver(..)
    ,DriverParameters(..)
    ,RunOutcome
    ,RunOutcomeFor
    ,RunStatistics(..)
    ,TerminationReason(..)
    ,mainParser
    )
import LogicGrowsOnTrees.Parallel.Purity
import LogicGrowsOnTrees.Utils.Handle
import LogicGrowsOnTrees.Utils.Word_

--------------------------------------------------------------------------------
----------------------------------- Loggers ------------------------------------
--------------------------------------------------------------------------------

-- Template Haskell splice that generates this module's logging helpers for the
-- DEBUG and ERROR priorities (used below as 'debugM' and 'errorM').
deriveLoggers "Logger" [DEBUG,ERROR]

--------------------------------------------------------------------------------
------------------------------------ Driver ------------------------------------
--------------------------------------------------------------------------------

{-| This is the driver for the processes adapter; the number of worker
    processes is specified via the (required) command-line option "-n".

    Note that there are not separate drivers for the supervisor process and the
    worker process; instead, the same executable is used for both the supervisor
    and the worker, with a sentinel argument (or arguments) determining which
    role it should run as.
 -}
driver ::
    ( Serialize shared_configuration
    , Serialize (ProgressFor exploration_mode)
    , Serialize (WorkerFinishedProgressFor exploration_mode)
    ) =>
    Driver IO shared_configuration supervisor_configuration m n exploration_mode
driver = Driver $ \DriverParameters{..} -> do
    runExplorer
        constructExplorationMode
        purity
        -- The parsed configuration has the shape
        -- (shared,(number_of_processes,supervisor)): the process count rides
        -- along with the supervisor-specific configuration.
        (mainParser (liftA2 (,) shared_configuration_term (liftA2 (,) number_of_processes_term supervisor_configuration_term)) program_info)
        initializeGlobalState
        constructTree
        -- Drop the process count before asking for the starting progress.
        (curry $ uncurry getStartingProgress . second snd)
        (\shared_configuration (Word_ number_of_processes,supervisor_configuration) -> do
            setNumberOfWorkers number_of_processes
            constructController shared_configuration supervisor_configuration
        )
    >>=
    -- 'runExplorer' yields 'Nothing' when this process ran as a worker, so
    -- only the supervisor reports the termination of the run.
    maybe (return ()) (notifyTerminated <$> fst . fst <*> snd . snd . fst <*> snd)
  where
    -- The required "-n" / "--number-of-processes" command-line option.
    number_of_processes_term = required (flip opt (
        (optInfo ["n","number-of-processes"])
        {   optName = "#"
        ,   optDoc = "This *required* option specifies the number of worker processes to spawn."
        }
        ) Nothing )
{-# INLINE driver #-}

--------------------------------------------------------------------------------
---------------------------------- Controller ----------------------------------
--------------------------------------------------------------------------------

{-| The monad in which the processes controller will run.

    It is a newtype wrapper around 'WorkgroupControllerMonad', parameterized by
    an 'IntMap' of @Worker@ records (the per-worker bookkeeping that
    'runSupervisor' keeps, keyed by worker id); all of the listed instances are
    derived from the wrapped monad.
 -}
newtype ProcessesControllerMonad exploration_mode α =
    C { unwrapC :: WorkgroupControllerMonad (IntMap Worker) exploration_mode α
    } deriving (Applicative,Functor,Monad,MonadCatchIO,MonadIO,RequestQueueMonad,WorkgroupRequestQueueMonad)

-- The exploration mode associated with a controller is simply the
-- @exploration_mode@ type that the controller monad is parameterized by.
instance HasExplorationMode (ProcessesControllerMonad exploration_mode) where
    type ExplorationModeFor (ProcessesControllerMonad exploration_mode) = exploration_mode

--------------------------------------------------------------------------------
------------------------------- Generic runners --------------------------------
--------------------------------------------------------------------------------

{- $runners
In this section the full functionality of this module is exposed in case one
does not want the restrictions of the driver interface. If you decide to go in
this direction, then you need to decide whether you want there to be a single
executable for both the supervisor and worker with the process of determining in
which mode it should run taken care of for you, or whether you want to do this
yourself in order to give yourself more control (such as by having separate
supervisor and worker executables) at the price of more work.

If you want to use a single executable with automated handling of the
supervisor and worker roles, then use 'runExplorer'.  Otherwise, use
'runSupervisor' to run the supervisor loop and on each worker use
'runWorkerUsingHandles', passing 'stdin' and 'stdout' as the process handles.
 -}

{-| This runs the supervisor, which will spawn and kill worker processes as
    needed so that the total number is equal to the number set by the
    controller.

    Each worker is launched as a child process (in its own process group) with
    pipes attached to its standard input, output and error streams.  Messages
    are sent to the worker over its standard input and received from its
    standard output; whatever it prints to standard error is forwarded, line by
    line, to the debug log.
 -}
runSupervisor ::
    ( Serialize (ProgressFor exploration_mode)
    , Serialize (WorkerFinishedProgressFor exploration_mode)
    ) =>
    ExplorationMode exploration_mode {-^ the exploration mode -} ->
    String {-^ the path to the worker executable -} ->
    [String] {-^ the arguments to pass to the worker executable -} ->
    (Handle -> IO ()) {-^ an action that writes any information needed by the worker to the given handle -} ->
    ProgressFor exploration_mode {-^ the initial progress of the run -} ->
    ProcessesControllerMonad exploration_mode () {-^ the controller of the supervisor, which must at least set the number of workers to be positive for anything to take place -} ->
    IO (RunOutcomeFor exploration_mode) {-^ the result of the run -}
runSupervisor
    exploration_mode
    worker_filepath
    worker_arguments
    sendConfigurationTo
    starting_progress
    (C controller)
 = do
    runWorkgroup
        exploration_mode
        mempty
        (\message_receivers@MessageForSupervisorReceivers{..} ->
            let -- Spawns a worker process, starts the threads that listen to
                -- it, sends it its configuration and records it in the state.
                createWorker worker_id = do
                    debugM $ "Launching worker process: " ++ worker_filepath ++ " " ++ unwords worker_arguments
                    -- All three streams are requested as pipes, so all three
                    -- handles come back as 'Just'; they are returned in the
                    -- order stdin, stdout, stderr.
                    (Just write_handle,Just read_handle,Just error_handle,process_handle) <- liftIO . createProcess $
                        CreateProcess
                        {   cmdspec = RawCommand worker_filepath worker_arguments
                        ,   cwd = Nothing
                        ,   env = Nothing
                        ,   std_in = CreatePipe
                        ,   std_out = CreatePipe
                        ,   std_err = CreatePipe
                        ,   close_fds = True
                        ,   create_group = True
                        }
                    liftIO $ do
                        -- Forward the worker's stderr to the debug log; an
                        -- end-of-file simply ends the thread, anything else is
                        -- reported as an error.
                        _ <- forkIO $
                            catchJust
                                (\e -> if isEOFError e then Just () else Nothing)
                                (forever $ hGetLine error_handle >>= \line -> debugM $ "[" ++ show worker_id ++ "] " ++ line)
                                (const $ return ())
                             `catch`
                                (\(e::SomeException) -> errorM $ "Error reading stderr for worker " ++ show worker_id ++ ": " ++ show e)
                        -- Receive messages from the worker's stdout.  Being
                        -- killed or interrupted is a normal shutdown; any
                        -- other exception interrupts the worker's process
                        -- group and is reported as a failure of that worker.
                        _ <- forkIO $
                            receiveAndProcessMessagesFromWorkerUsingHandle message_receivers read_handle worker_id
                            `catch`
                            (\(e::SomeException) ->
                            case fromException e of
                                Just ThreadKilled -> return ()
                                Just UserInterrupt -> return ()
                                _ -> do
                                    debugM $ "Worker " ++ show worker_id ++ " failed with exception: " ++ show e
                                    interruptProcessGroupOf process_handle
                                    receiveFailureFromWorker worker_id (show e)
                            )
                        sendConfigurationTo write_handle
                    modify . IntMap.insert worker_id $ Worker{..}

                -- Asks the worker to quit and forgets about it; it is an
                -- internal error for the worker not to be in the state.
                destroyWorker worker_id _ = do
                    debugM $ "Sending QuitWorker to " ++ show worker_id ++ "..."
                    get >>=
                        liftIO
                        .
                        flip send QuitWorker
                        .
                        write_handle
                        .
                        fromJustOrBust ("destroyWorker failed to get record for " ++ show worker_id)
                        .
                        IntMap.lookup worker_id
                    debugM $ "Finished sending QuitWorker to " ++ show worker_id ++ "."
                    modify $ IntMap.delete worker_id

                -- Asks every known worker to quit.
                killAllWorkers _ =
                    debugM "Killing all workers..." >>
                    get >>= liftIO . Fold.mapM_ (flip send QuitWorker . write_handle) >>
                    debugM "Done killing all workers."

                -- Sends a message to the given worker; silently does nothing
                -- if there is no record for that worker id.
                sendMessageToWorker message worker_id =
                    get >>=
                        liftIO
                        .
                        maybe (return ()) (
                            flip send message
                            .
                            write_handle
                        )
                        .
                        IntMap.lookup worker_id

                sendProgressUpdateRequestTo = sendMessageToWorker RequestProgressUpdate
                sendWorkloadStealRequestTo = sendMessageToWorker RequestWorkloadSteal
                sendWorkloadTo worker_id workload = sendMessageToWorker (StartWorkload workload) worker_id
            in WorkgroupCallbacks{..}
        )
        starting_progress
        controller
{-# INLINE runSupervisor #-}

{-| Explores the given tree using multiple processes to achieve parallelism.

    This function grants access to all of the functionality of this adapter,
    rather than having to go through the more restricted driver interface. The
    signature of this function is very complicated because it is meant to be
    used in both the supervisor and worker;  it figures out which role it is
    supposed to play based on whether the list of command line arguments matches
    a sentinel.

    The configuration information is divided into two parts: information shared
    between the supervisor and the workers, and information that is specific to
    the supervisor and not sent to the workers. (Note that only the former needs
    to be serializable.) An action must be supplied that obtains this
    configuration information, and most of the arguments are functions that are
    given all or part of this information.
 -}
runExplorer ::
    ( Serialize shared_configuration
    , Serialize (ProgressFor exploration_mode)
    , Serialize (WorkerFinishedProgressFor exploration_mode)
    ) =>
    (shared_configuration -> ExplorationMode exploration_mode)
        {-^ a function that constructs the exploration mode given the shared
            configuration
         -} ->
    Purity m n {-^ the purity of the tree -} ->
    IO (shared_configuration,supervisor_configuration)
        {-^ an action that gets the shared and supervisor-specific configuration
            information (run only on the supervisor)
         -} ->
    (shared_configuration -> IO ())
        {-^ an action that initializes the global state of the process given the
            shared configuration (run on both supervisor and worker processes)
         -} ->
    (shared_configuration -> TreeT m (ResultFor exploration_mode))
        {-^ a function that constructs the tree from the shared configuration
            (called only on the worker)
         -} ->
    (shared_configuration -> supervisor_configuration -> IO (ProgressFor exploration_mode))
        {-^ an action that gets the starting progress given the full
            configuration information (run only on the supervisor)
         -} ->
    (shared_configuration -> supervisor_configuration -> ProcessesControllerMonad exploration_mode ())
        {-^ a function that constructs the controller for the supervisor, which
            must at least set the number of workers to be non-zero (called only
            on the supervisor)
         -} ->
    IO (Maybe ((shared_configuration,supervisor_configuration),RunOutcomeFor exploration_mode))
        {-^ if this process is the supervisor, then the outcome of the run as
            well as the configuration information wrapped in 'Just'; otherwise
            'Nothing'
         -}
runExplorer
    constructExplorationMode
    purity
    getConfiguration
    initializeGlobalState
    constructTree
    getStartingProgress
    constructController
  = getArgs >>= \args ->
    if args == sentinel
        -- Worker role: the shared configuration arrives on stdin, after which
        -- stdin and stdout carry the messages exchanged with the supervisor.
        then do
            shared_configuration <- receive stdin
            initializeGlobalState shared_configuration
            runWorkerUsingHandles
                (constructExplorationMode shared_configuration)
                purity
                (constructTree shared_configuration)
                stdin
                stdout
            return Nothing
        -- Supervisor role: workers are launched by re-running this very
        -- executable with the sentinel as its complete argument list.
        else do
            configuration@(shared_configuration,supervisor_configuration) <- getConfiguration
            initializeGlobalState shared_configuration
            program_filepath <- getProgFilepath
            starting_progress <- getStartingProgress shared_configuration supervisor_configuration
            termination_result <-
                runSupervisor
                    (constructExplorationMode shared_configuration)
                    program_filepath
                    sentinel
                    (flip send shared_configuration)
                    starting_progress
                    (constructController shared_configuration supervisor_configuration)
            return $ Just (configuration,termination_result)
  where
    -- The exact argument list that marks a process as a worker.
    sentinel = ["explorer","worker","bee"]
{-# INLINE runExplorer #-}

--------------------------------------------------------------------------------
------------------------------ Utility functions -------------------------------
--------------------------------------------------------------------------------

{-| Computes the full path to this executable by joining the directory reported
    by 'getProgPath' with the program name reported by 'getProgName'.
 -}
getProgFilepath :: IO String
getProgFilepath = do
    program_directory <- getProgPath
    program_name <- getProgName
    return (program_directory </> program_name)

--------------------------------------------------------------------------------
----------------------------------- Internal -----------------------------------
--------------------------------------------------------------------------------

-- The supervisor's record of a single spawned worker process, as built by
-- 'runSupervisor' when it launches the worker.
data Worker = Worker
    {   read_handle :: Handle -- pipe attached to the worker's stdout; messages from the worker are read here
    ,   write_handle :: Handle -- pipe attached to the worker's stdin; messages to the worker are sent here
    ,   process_handle :: ProcessHandle -- handle of the worker process returned by 'createProcess'
    }

{-| Extracts the value from a 'Just'.  If given 'Nothing' instead, the result
    is a call to 'error' with the supplied message; this is meant for lookups
    that can only fail because of an internal inconsistency.
 -}
fromJustOrBust :: String -> Maybe a -> a
fromJustOrBust message = fromMaybe (error message)