{-# LANGUAGE LambdaCase          #-}
{-# LANGUAGE OverloadedStrings   #-}
{-# LANGUAGE QuasiQuotes         #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell     #-}
-- | Executor for external tasks.
--
--   An executor will poll for tasks on the co-ordinator, mark them as in
--   progress and then execute them.
--
--   You probably want to start with 'executeLoop'.
module Control.Funflow.External.Executor where

import           Control.Arrow                        (second)
import           Control.Concurrent                   (threadDelay)
import           Control.Concurrent.Async
import           Control.Concurrent.MVar
import           Control.Exception.Safe
import qualified Control.Funflow.ContentStore         as CS
import           Control.Funflow.External
import           Control.Funflow.External.Coordinator
import qualified Control.Funflow.RemoteCache          as Remote
import           Control.Lens
import           Control.Monad                        (forever, mzero, unless)
import           Control.Monad.Trans                  (lift)
import           Control.Monad.Trans.Maybe
import qualified Data.Aeson                           as Json
import qualified Data.ByteString                      as BS
import           Data.Foldable                        (for_)
import           Data.Maybe                           (isJust, isNothing)
import           Data.Monoid                          ((<>))
import qualified Data.Text                            as T
import           GHC.IO.Handle                        (hClose)
import           Katip                                as K
import           Network.HostName
import           Path
import           Path.IO
import           System.Clock
import           System.Exit                          (ExitCode (..))
import           System.IO                            (Handle, IOMode (..),
                                                       openFile, stderr, stdout)
import           System.Posix.Env                     (getEnv)
import           System.Posix.User
import           System.Process

data ExecutionResult =
    -- | The result already exists in the store and there is no need
    --   to execute. This is also returned if the job is already running
    --   elsewhere.
    Cached
    -- | The computation is already running elsewhere. This is probably
    --   indicative of a bug, because the coordinator should only allow one
    --   instance of a task to be running at any time.
  | AlreadyRunning
    -- | Execution completed successfully after a certain amount of time.
  | Success TimeSpec
    -- | Execution failed with the following exit code.
    --   TODO where should logs go?
  | Failure TimeSpec Int
    -- | The executor itself failed to execute the external task.
    --   E.g. because the executable was not found.
  | ExecutorFailure IOException

-- | Execute an individual task.
execute :: CS.ContentStore -> TaskDescription -> KatipContextT IO ExecutionResult
execute store td = logError $ do
     -- We should pass a Remote cacher down to it:
  status <- CS.withConstructIfMissing store Remote.NoCache (td ^. tdOutput) $ \fp -> do
    (fpOut, hOut) <- lift $
      CS.createMetadataFile store (td ^. tdOutput) [relfile|stdout|]
    (fpErr, hErr) <- lift $
      CS.createMetadataFile store (td ^. tdOutput) [relfile|stderr|]
    let
      withFollowOutput
        | td ^. tdTask . etWriteToStdOut . to outputCaptureToRelFile
              . to isNothing
        = withFollowFile fpErr stderr
        | otherwise
        = withFollowFile fpErr stderr
        . withFollowFile fpOut stdout
      cmd = T.unpack $ td ^. tdTask . etCommand
      procSpec params textEnv = (proc cmd $ T.unpack <$> params)
        { cwd = Just (fromAbsDir fp)
        , close_fds = True
        , std_err = UseHandle hErr
        , std_out = UseHandle hOut
        , env = map (bimap T.unpack T.unpack) <$> textEnv
        }
      convParam = ConvParam
        { convPath = pure . CS.itemPath store
        , convEnv = \e -> T.pack <$> MaybeT (getEnv $ T.unpack e)
        , convUid = lift getEffectiveUserID
        , convGid = lift getEffectiveGroupID
        , convOut = pure fp
        }
    mbParams <- lift $ runMaybeT $
      traverse (paramToText convParam) (td ^. tdTask . etParams)
    mbEnv <- case td ^. tdTask . etEnv of
      EnvInherit -> pure Nothing
      EnvExplicit x ->
        (lift . runMaybeT $ traverse (sequence . second (paramToText convParam)) x)
        >>= \case
          Nothing -> fail "A parameter was not ready"
          jp -> return jp
    params <- case mbParams of
      Nothing     -> fail "A parameter was not ready"
      Just params -> return params

    let
      inputItems :: [CS.Item]
      inputItems = do
        Param fields <- td ^. tdTask . etParams
        ParamPath inputPath <- fields
        case inputPath of
          IPItem item      -> pure item
          -- XXX: Store these references as well.
          IPExternalFile _ -> mzero
          IPExternalDir _  -> mzero
    CS.setInputs store (td ^. tdOutput) inputItems
    CS.setMetadata store (td ^. tdOutput)
      ("external-task"::T.Text)
      (Json.encode (td ^. tdTask))

    start <- lift $ getTime Monotonic
    let theProc = procSpec params mbEnv
    katipAddNamespace "process" . katipAddContext (sl "processId" $ show theProc) $ do
      $(logTM) InfoS "Executing"
      res <- lift $ tryIO $ withCreateProcess theProc $ \_ _ _ ph ->
        -- Error output should be displayed on our stderr stream
        withFollowOutput $ do
          exitCode <- waitForProcess ph
          hClose hErr
          hClose hOut
          end <- getTime Monotonic
          case exitCode of
            ExitSuccess   -> do
              for_ (td ^. tdTask . etWriteToStdOut . to outputCaptureToRelFile)
                $ \file -> copyFile fpOut (fp </> file)
              return $ Right (diffTimeSpec start end)
            ExitFailure i ->
              return $ Left (diffTimeSpec start end, i)
      case res of
        -- execution was successful
        Right (Right r) -> return $ Right r
        -- execution failed
        Right (Left e)  -> return $ Left (Right e)
        -- executor itself failed
        Left e          -> return $ Left (Left e)
  case status of
    CS.Missing (Left e)        -> return (ExecutorFailure e)
    CS.Missing (Right (t, ec)) -> return (Failure t ec)
    CS.Pending ()              -> return AlreadyRunning
    CS.Complete (Nothing, _)   -> return Cached
    CS.Complete (Just t, _)    -> return (Success t)
  where
    logError = flip withException $ \(e::SomeException) ->
      $(logTM) ErrorS . ls $ displayException e

-- | Execute tasks forever
executeLoop :: forall c. Coordinator c
            => c
            -> Config c
            -> CS.ContentStore
            -> IO ()
executeLoop coord cfg store =
  executeLoopWithScribe coord cfg store =<<
    mkHandleScribe ColorIfTerminal stdout (permitItem InfoS) V2

-- | Same as 'executeLoop', but allows specifying a custom 'Scribe' for logging
executeLoopWithScribe :: forall c. Coordinator c
                      => c
                      -> Config c
                      -> CS.ContentStore
                      -> Scribe
                      -> IO ()
executeLoopWithScribe _ cfg store handleScribe = do
  let mkLogEnv = registerScribe "stdout" handleScribe defaultScribeSettings =<< initLogEnv "FFExecutorD" "production"
  bracket mkLogEnv closeScribes $ \le -> do
    let initialContext = ()
        initialNamespace = "executeLoop"

    runKatipContextT le initialContext initialNamespace $ do
      $(logTM) InfoS "Initialising connection to coordinator."
      hook :: Hook c <- lift $ initialise cfg
      executor <- lift $ Executor <$> getHostName

      let -- Known failures that do not affect the executors ability
          -- to execute further tasks will be logged and ignored.
          handleFailures = handle $ \(e::CS.StoreError) ->
            -- Certain store errors can occur if an item is forcibly removed
            -- while the executor is constructing it or picked up a
            -- corresponding outdated task from the queue.
            -- XXX: The store should distinguish between recoverable
            --   and unrecoverable errors.
            $(logTM) WarningS . ls $ displayException e

      forever $ handleFailures $ do
        $(logTM) DebugS "Awaiting task from coordinator."
        mb <- withPopTask hook executor $ \task ->
          katipAddContext (sl "task" $ task ^. tdOutput) $ do
            $(logTM) DebugS "Checking task"
            res <- execute store task
            case res of
              Cached      -> do
                $(logTM) InfoS "Task was cached"
                return (0, Right ())
              Success t   -> do
                $(logTM) InfoS "Task completed successfully"
                return (t, Right ())
              Failure t i -> do
                $(logTM) WarningS "Task failed"
                return (t, Left i)
              ExecutorFailure e -> do
                $(logTM) ErrorS $ "Executor failed: " <> ls (displayException e)
                return (0, Left 2)
              AlreadyRunning -> do
                -- XXX:
                --   This should not happen and indicates a programming error
                --   or invalid state.
                --   We do not want to just put the task back on the queue,
                --   as it would cause a loop.
                --   We do not want to just mark the task done, as a potential
                --   later completion of the already running external task
                --   would to mark it as done then.
                --   We cannot mark it as ongoing, as we don't have information
                --   on the executor where the task is already running.
                $(logTM) ErrorS $
                  "Received an already running task from the coordinator "
                  <> showLS (task ^. tdOutput)
                error $
                  "Received an already running task from the coordinator "
                  ++ show (task ^. tdOutput)
        case mb of
          Nothing -> lift $ threadDelay 1000000
          Just () -> return ()

-- | @withFollowFile in out@ follows the file @in@
--   and prints contents to @out@ as they appear.
--   The file must exist. Doesn't handle file truncation.
withFollowFile :: Path Abs File -> Handle -> IO a -> IO a
withFollowFile infile outhandle action = do
  mv <- newEmptyMVar
  inhandle <- openFile (fromAbsFile infile) ReadMode
  let loop = do
        some <- BS.hGetSome inhandle 4096
        if BS.null some then do
          done <- isJust <$> tryTakeMVar mv
          unless done $ do
            threadDelay 10000
            loop
        else do
          BS.hPut outhandle some
          loop
  res <- snd <$> concurrently (tryIO loop) (action <* putMVar mv ())
  hClose inhandle
  return res