{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
module Control.Funflow.External.Executor where
import Control.Arrow (second)
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Exception.Safe
import qualified Control.Funflow.ContentStore as CS
import Control.Funflow.External
import Control.Funflow.External.Coordinator
import qualified Control.Funflow.RemoteCache as Remote
import Control.Lens
import Control.Monad (forever, mzero, unless)
import Control.Monad.Trans (lift)
import Control.Monad.Trans.Maybe
import qualified Data.Aeson as Json
import qualified Data.ByteString as BS
import Data.Foldable (for_)
import Data.Maybe (isJust, isNothing)
import Data.Monoid ((<>))
import qualified Data.Text as T
import GHC.IO.Handle (hClose)
import Katip as K
import Network.HostName
import Path
import Path.IO
import System.Clock
import System.Exit (ExitCode (..))
import System.IO (Handle, IOMode (..),
openFile, stderr, stdout)
import System.Posix.Env (getEnv)
import System.Posix.User
import System.Process
-- | Outcome of a single call to 'execute'.
data ExecutionResult =
    -- | The content store reported the output item as already complete and
    --   no process was run for it in this call.
    Cached
    -- | The content store reported the output item as pending.
  | AlreadyRunning
    -- | The process exited with 'ExitSuccess'; carries the time difference
    --   measured around the run with the monotonic clock.
  | Success TimeSpec
    -- | The process exited with 'ExitFailure'; carries the measured time
    --   difference and the exit code.
  | Failure TimeSpec Int
    -- | An 'IOException' was caught while creating or waiting on the
    --   process.
  | ExecutorFailure IOException
-- | Execute the external task described by @td@ against the given content
--   store and report what happened as an 'ExecutionResult'.
--
--   The work is done inside the callback handed to
--   'CS.withConstructIfMissing' for the item @td ^. tdOutput@; @fp@ is the
--   directory passed to that callback and is used as the process working
--   directory. (Presumably the callback only runs when the item is missing
--   -- confirm against "Control.Funflow.ContentStore".)
--
--   Any exception escaping the whole computation is logged at 'ErrorS' by
--   the local @logError@ wrapper.
execute :: CS.ContentStore -> TaskDescription -> KatipContextT IO ExecutionResult
execute store td = logError $ do
  status <- CS.withConstructIfMissing store Remote.NoCache (td ^. tdOutput) $ \fp -> do
    -- Metadata files that receive the child's stdout and stderr.
    (fpOut, hOut) <- lift $
      CS.createMetadataFile store (td ^. tdOutput) [relfile|stdout|]
    (fpErr, hErr) <- lift $
      CS.createMetadataFile store (td ^. tdOutput) [relfile|stderr|]
    let
      -- Wraps the process-waiting action so the metadata files are echoed
      -- to this executor's own stderr/stdout while it runs.
      -- NOTE(review): as written, stdout is echoed only when an output
      -- capture file IS configured ('isNothing' selects the stderr-only
      -- branch). Confirm this condition is not inverted.
      withFollowOutput
        | td ^. tdTask . etWriteToStdOut . to outputCaptureToRelFile
                                         . to isNothing
        = withFollowFile fpErr stderr
        | otherwise
        = withFollowFile fpErr stderr
        . withFollowFile fpOut stdout
      cmd = T.unpack $ td ^. tdTask . etCommand
      -- Process specification: runs @cmd@ in @fp@ with stdout/stderr
      -- redirected to the metadata file handles.
      procSpec params textEnv = (proc cmd $ T.unpack <$> params)
        { cwd = Just (fromAbsDir fp)
        , close_fds = True
        , std_err = UseHandle hErr
        , std_out = UseHandle hOut
        , env = map (bimap T.unpack T.unpack) <$> textEnv
        }
      -- How the pieces of a parameter are rendered to text. An environment
      -- variable that 'getEnv' cannot find yields 'Nothing' in 'MaybeT'.
      convParam = ConvParam
        { convPath = pure . CS.itemPath store
        , convEnv = \e -> T.pack <$> MaybeT (getEnv $ T.unpack e)
        , convUid = lift getEffectiveUserID
        , convGid = lift getEffectiveGroupID
        , convOut = pure fp
        }
    -- 'Nothing' here means at least one parameter could not be rendered.
    mbParams <- lift $ runMaybeT $
      traverse (paramToText convParam) (td ^. tdTask . etParams)
    -- 'Nothing' means "inherit the environment"; an explicit environment
    -- that cannot be fully rendered aborts via 'fail'.
    mbEnv <- case td ^. tdTask . etEnv of
      EnvInherit -> pure Nothing
      EnvExplicit x ->
        (lift . runMaybeT $ traverse (sequence . second (paramToText convParam)) x)
        >>= \case
          Nothing -> fail "A parameter was not ready"
          jp -> return jp
    params <- case mbParams of
      Nothing -> fail "A parameter was not ready"
      Just params -> return params
    let
      -- Store items referenced by the task's parameters (list monad):
      -- only 'IPItem' paths contribute; external files and directories
      -- are dropped.
      inputItems :: [CS.Item]
      inputItems = do
        Param fields <- td ^. tdTask . etParams
        ParamPath inputPath <- fields
        case inputPath of
          IPItem item -> pure item
          IPExternalFile _ -> mzero
          IPExternalDir _ -> mzero
    CS.setInputs store (td ^. tdOutput) inputItems
    -- Record the JSON-encoded task under the "external-task" metadata key.
    CS.setMetadata store (td ^. tdOutput)
      ("external-task"::T.Text)
      (Json.encode (td ^. tdTask))
    -- The start time is taken before the process is created.
    start <- lift $ getTime Monotonic
    let theProc = procSpec params mbEnv
    -- Note: the "processId" context value is the 'show'n process
    -- specification, not an OS process id.
    katipAddNamespace "process" . katipAddContext (sl "processId" $ show theProc) $ do
      $(logTM) InfoS "Executing"
      -- 'tryIO' turns an 'IOException' from creating or running the
      -- process into a 'Left' instead of letting it propagate.
      res <- lift $ tryIO $ withCreateProcess theProc $ \_ _ _ ph ->
        withFollowOutput $ do
          exitCode <- waitForProcess ph
          hClose hErr
          hClose hOut
          end <- getTime Monotonic
          case exitCode of
            ExitSuccess -> do
              -- If an output capture file is configured, copy the
              -- recorded stdout to that path inside @fp@.
              for_ (td ^. tdTask . etWriteToStdOut . to outputCaptureToRelFile)
                $ \file -> copyFile fpOut (fp </> file)
              return $ Right (diffTimeSpec start end)
            ExitFailure i ->
              return $ Left (diffTimeSpec start end, i)
      -- Flatten to: Right elapsed | Left (Right (elapsed, exit code))
      --                           | Left (Left ioException).
      case res of
        Right (Right r) -> return $ Right r
        Right (Left e) -> return $ Left (Right e)
        Left e -> return $ Left (Left e)
  -- Translate the store's status into an 'ExecutionResult'.
  case status of
    CS.Missing (Left e) -> return (ExecutorFailure e)
    CS.Missing (Right (t, ec)) -> return (Failure t ec)
    CS.Pending () -> return AlreadyRunning
    CS.Complete (Nothing, _) -> return Cached
    CS.Complete (Just t, _) -> return (Success t)
  where
    -- Log any exception at 'ErrorS' via 'withException'.
    logError = flip withException $ \(e::SomeException) ->
      $(logTM) ErrorS . ls $ displayException e
-- | Run the executor loop with a default scribe.
--
--   Builds a handle scribe on 'stdout' (colour only when attached to a
--   terminal, filtered with @permitItem InfoS@, verbosity 'V2') and hands
--   everything to 'executeLoopWithScribe'.
executeLoop :: forall c. Coordinator c
            => c
            -> Config c
            -> CS.ContentStore
            -> IO ()
executeLoop coord cfg store = do
  stdoutScribe <- mkHandleScribe ColorIfTerminal stdout (permitItem InfoS) V2
  executeLoopWithScribe coord cfg store stdoutScribe
-- | Executor main loop, logging through the supplied scribe.
--
--   Sets up a katip log environment (closed again on exit), initialises
--   the coordinator hook from @cfg@, and then forever pops tasks and runs
--   them with 'execute'. The first argument is not inspected; it only
--   fixes the coordinator type @c@.
--
--   A 'CS.StoreError' raised during one iteration is logged at 'WarningS'
--   and the loop continues. Receiving a task that is already running is
--   treated as fatal and raised via 'error'.
executeLoopWithScribe :: forall c. Coordinator c
                      => c
                      -> Config c
                      -> CS.ContentStore
                      -> Scribe
                      -> IO ()
executeLoopWithScribe _ cfg store handleScribe =
  bracket acquireLogEnv closeScribes $ \logEnv ->
    runKatipContextT logEnv () "executeLoop" $ do
      $(logTM) InfoS "Initialising connection to coordinator."
      hook :: Hook c <- lift $ initialise cfg
      executor <- lift $ fmap Executor getHostName
      let
        -- Keep the loop alive across content store errors.
        recoverStoreErrors = handle $ \(err :: CS.StoreError) ->
          $(logTM) WarningS (ls (displayException err))
      forever . recoverStoreErrors $ do
        $(logTM) DebugS "Awaiting task from coordinator."
        popped <- withPopTask hook executor $ \task ->
          katipAddContext (sl "task" $ task ^. tdOutput) $ do
            $(logTM) DebugS "Checking task"
            -- Each branch yields (elapsed time, Left exit code | Right ()).
            execute store task >>= \case
              Cached -> do
                $(logTM) InfoS "Task was cached"
                return (0, Right ())
              Success elapsed -> do
                $(logTM) InfoS "Task completed successfully"
                return (elapsed, Right ())
              Failure elapsed code -> do
                $(logTM) WarningS "Task failed"
                return (elapsed, Left code)
              ExecutorFailure ioErr -> do
                $(logTM) ErrorS $ "Executor failed: " <> ls (displayException ioErr)
                return (0, Left 2)
              AlreadyRunning -> do
                $(logTM) ErrorS $
                  "Received an already running task from the coordinator "
                  <> showLS (task ^. tdOutput)
                error $
                  "Received an already running task from the coordinator "
                  ++ show (task ^. tdOutput)
        -- No task was available: wait one second before polling again.
        maybe (lift $ threadDelay 1000000) return popped
  where
    -- Fresh log environment with the caller's scribe registered as "stdout".
    acquireLogEnv = do
      baseEnv <- initLogEnv "FFExecutorD" "production"
      registerScribe "stdout" handleScribe defaultScribeSettings baseEnv
-- | Run @action@ while concurrently copying everything appended to
--   @infile@ onto @outhandle@ (like @tail -f@), and return the action's
--   result.
--
--   * @infile@ is opened read-only and is always closed again, including
--     when @action@ throws.
--   * The follower polls every 10ms once it has caught up with the file.
--   * After @action@ finishes, the follower drains the file one final
--     time, so output written just before completion is not lost.
--   * An 'IOException' inside the follower (for example a failing write to
--     @outhandle@) stops the following but does not abort @action@.
withFollowFile :: Path Abs File -> Handle -> IO a -> IO a
withFollowFile infile outhandle action =
  bracket (openFile (fromAbsFile infile) ReadMode) hClose $ \inhandle -> do
    mv <- newEmptyMVar
    let
      -- Forward everything currently readable; returns once a read comes
      -- back empty.
      drain = do
        some <- BS.hGetSome inhandle 4096
        unless (BS.null some) $ do
          BS.hPut outhandle some
          drain
      loop = do
        drain
        done <- isJust <$> tryTakeMVar mv
        if done
          -- The action may have produced more output between our last
          -- empty read and the completion signal: pick that up too.
          then drain
          else do
            threadDelay 10000
            loop
    snd <$> concurrently (tryIO loop) (action <* putMVar mv ())