{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE Strict #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE UndecidableInstances #-}
module Experimenter.MasterSlave
( WorkerStatus (..)
, createKeepAliveFork
, waitForSlaves
, keepAliveTimeout
) where
import Control.Lens
import Control.Monad.IO.Class
import Control.Monad.Logger (NoLoggingT, logInfo, runNoLoggingT)
import Data.IORef
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad.Reader
import Control.Monad.Trans.Resource
import qualified Data.Text as T
import Data.Time (UTCTime, diffUTCTime, getCurrentTime)
import Database.Persist.Postgresql
import Network.HostName
import System.Posix.Process
import Experimenter.DatabaseSetting
import Experimenter.DB
import Experimenter.Models
import Experimenter.Result
import Experimenter.Util
-- | Interval, in seconds, between two keep-alive signals of a worker.
--
-- The value is polymorphic so it can be used both as an 'Int' factor for
-- 'threadDelay' (after scaling to microseconds) and as a time difference when
-- comparing against the last alive sign of a lock.
keepAliveTimeout :: Num t => t
keepAliveTimeout = 10
-- | State flag shared between a worker and its keep-alive thread.
--
-- While the flag is 'Working' the keep-alive thread keeps refreshing its alive
-- sign; any other value makes the thread run its clean-up action and stop.
data WorkerStatus
  = Working  -- ^ The worker is still running.
  | Finished -- ^ The worker is done.
  deriving (Eq, Show)
-- | Fork a background thread that periodically reports that this worker is
-- still alive.
--
-- The thread opens its own PostgreSQL pool of size 1 from the given database
-- settings. As long as the returned reference holds 'Working' it calls the
-- update function with the current time, saves the transaction and sleeps for
-- 'keepAliveTimeout' seconds. As soon as the reference holds another value the
-- deletion function is run once and the thread ends.
--
-- The caller stops the thread by writing a non-'Working' value to the returned
-- 'IORef'; the change is noticed on the next wake-up of the thread.
createKeepAliveFork ::
     DatabaseSetting
  -- ^ Settings providing the connection string for the dedicated pool.
  -> (UTCTime -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ())
  -- ^ Action writing the alive sign for the given timestamp.
  -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
  -- ^ Clean-up action run once the worker is no longer 'Working'.
  -> IO (IORef WorkerStatus)
  -- ^ Reference controlling the lifetime of the keep-alive thread.
createKeepAliveFork dbSetup updateFunction deletionFunction = do
  ref <- newIORef Working
  void $
    forkIO
      (runNoLoggingT $
       withPostgresqlPool (connectionString dbSetup) 1 $
       liftSqlPersistMPool $ keepAlive ref)
  return ref
  where
    -- Loop that refreshes the alive sign until the status flag changes.
    keepAlive :: IORef WorkerStatus -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
    keepAlive statusRef = do
      status <- liftIO $ readIORef statusRef
      if status == Working
        then do
          time <- liftIO getCurrentTime
          updateFunction time
          -- Commit right away so other processes can see the new alive sign.
          transactionSave
          -- 'threadDelay' expects microseconds.
          liftIO $ threadDelay (1000000 * keepAliveTimeout)
          keepAlive statusRef
        else deletionFunction
-- | Block until no other process holds an execution lock on any experiment of
-- the given experiments.
--
-- Locks held by this very process (same host name and process ID) are ignored.
--
-- Returns 'True' once no foreign lock is left; in that case all progress
-- entries of the affected experiments are deleted first. Returns 'False' if
-- foreign locks exist but none of them has sent an alive sign within the last
-- @2 * 'keepAliveTimeout'@ seconds. Otherwise the live slaves are logged and
-- the check is repeated after 'keepAliveTimeout' seconds.
waitForSlaves :: (MonadIO m) => Experiments a -> DB m Bool
waitForSlaves exps = do
  pid <- liftIO getProcessID
  hostName <- T.pack <$> liftIO getHostName
  let notSelf (ExpExecutionLock _ h p _) = not (h == hostName && p == fromIntegral pid)
  expIds <- selectKeysList [ExpExps ==. expsId] []
  waitForSlaves' notSelf expIds
  where
    expsId = exps ^. experimentsKey
    -- Polling loop over the locks of the given experiment keys.
    waitForSlaves' notSelf expIds = do
      locks <- filter notSelf . fmap entityVal <$> selectList [ExpExecutionLockExp <-. expIds] []
      if null locks
        then do
          deleteWhere [ExpProgressExp <-. expIds]
          return True
        else do
          time <- liftIO getCurrentTime
          -- A lock counts as alive if its last sign is at most two keep-alive
          -- intervals old.
          let isAlive l = diffUTCTime time (l ^. expExecutionLockLastAliveSign) <= 2 * keepAliveTimeout
              workingSlaves = filter isAlive locks
          if null workingSlaves
            then return False
            else do
              $(logInfo) "Waiting for slaves. List of slaves currently working: "
              -- Only list the slaves that are actually alive; stale locks are
              -- not "currently working".
              mapM_ printInfoSlave workingSlaves
              -- 'threadDelay' expects microseconds.
              liftIO $ threadDelay (1000000 * keepAliveTimeout)
              waitForSlaves' notSelf expIds
    -- Log host and process ID of the slave owning the given lock.
    printInfoSlave (ExpExecutionLock _ host pid _) =
      $(logInfo) $ "Slave from host " <> tshow host <> " with process ID " <> tshow pid