{-# OPTIONS_GHC -fno-warn-orphans #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE StrictData #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE TypeFamilyDependencies #-}
module Control.Funflow.External.Coordinator where
import Control.Exception.Safe
import Control.Funflow.ContentHashable (ContentHash)
import Control.Funflow.External
import Control.Lens
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Monoid ((<>))
import Data.Store (Store)
import Data.Store.TH (makeStore)
import Data.Typeable (Typeable)
import Katip
import Network.HostName
import Path
import System.Clock (TimeSpec)
#if !MIN_VERSION_store(0,5,0)
instance Store TimeSpec
#endif
newtype Executor = Executor HostName
deriving (Show, Store)
data TaskStatus =
Pending
| Running ExecutionInfo
| Completed ExecutionInfo
| Failed ExecutionInfo Int
deriving Show
data TaskInfo =
KnownTask TaskStatus
| UnknownTask
deriving Show
data ExecutionInfo = ExecutionInfo {
_eiExecutor :: Executor
, _eiElapsed :: TimeSpec
} deriving Show
data TaskError
= ExternalTaskFailed
TaskDescription
TaskInfo
(Maybe (Path Abs File))
(Maybe (Path Abs File))
deriving (Show, Typeable)
instance Exception TaskError where
displayException (ExternalTaskFailed td ti mbStdout mbStderr) =
"External task failed to construct item '"
++ show (_tdOutput td)
++ "'. Task info: "
++ show ti
++ " stdout: "
++ show mbStdout
++ " stderr: "
++ show mbStderr
++ " Task: "
++ show (_tdTask td)
class Coordinator c where
type Config c
type Hook c = h | h -> c
initialise :: MonadIO m => Config c -> m (Hook c)
submitTask :: MonadIO m => Hook c -> TaskDescription -> m ()
queueSize :: MonadIO m => Hook c -> m Int
taskInfo :: MonadIO m => Hook c -> ContentHash -> m TaskInfo
popTask :: MonadIO m => Hook c -> Executor
-> m (Maybe TaskDescription)
awaitTask :: MonadIO m => Hook c -> ContentHash -> m TaskInfo
updateTaskStatus :: MonadIO m => Hook c -> ContentHash -> TaskStatus -> m ()
dropTasks :: MonadIO m => Hook c -> m ()
makeLenses ''ExecutionInfo
makeStore ''TaskStatus
makeStore ''ExecutionInfo
makeStore ''TaskInfo
startTask :: (Coordinator c, MonadIO m)
=> Hook c
-> m (Maybe TaskDescription)
startTask h = liftIO $ do
executorInfo <- Executor <$> getHostName
popTask h executorInfo
isInProgress :: (Coordinator c, MonadIO m)
=> Hook c
-> ContentHash
-> m Bool
isInProgress h ch = do
ti <- taskInfo h ch
return $ case ti of
KnownTask Pending -> True
KnownTask (Running _) -> True
_ -> False
withPopTask :: (Coordinator c, MonadIO m, MonadMask m, KatipContext m)
=> Hook c -> Executor
-> (TaskDescription -> m (TimeSpec, Either Int ()))
-> m (Maybe ())
withPopTask hook executor f =
bracketOnError
(popTask hook executor)
(\case
Nothing -> return ()
Just td ->
update td Pending
`withException`
\e -> $(logTM) ErrorS $
"Failed to place task "
<> showLS (td ^. tdOutput)
<> " back on queue: "
<> ls (displayException (e :: SomeException))
)
(\case
Nothing -> return Nothing
Just td -> f td >>= \case
(t, Left ec) -> Just <$> update td (Failed (execInfo t) ec)
(t, Right ()) -> Just <$> update td (Completed (execInfo t)))
where
update td = updateTaskStatus hook (td ^. tdOutput)
execInfo = ExecutionInfo executor