{-# LANGUAGE FlexibleContexts     #-}
{-# LANGUAGE FlexibleInstances    #-}
{-# LANGUAGE GADTs                #-}
{-# LANGUAGE OverloadedStrings    #-}
{-# LANGUAGE ScopedTypeVariables  #-}
{-# LANGUAGE Strict               #-}
{-# LANGUAGE TemplateHaskell      #-}
{-# LANGUAGE TypeFamilies         #-}
{-# LANGUAGE UndecidableInstances #-}

module Experimenter.MasterSlave
    ( WorkerStatus (..)
    , createKeepAliveFork
    , waitForSlaves
    , keepAliveTimeout
    ) where

import           Control.Lens
import           Control.Monad.IO.Class
import           Control.Monad.Logger         (NoLoggingT, logInfo, runNoLoggingT)
import           Data.IORef

import           Control.Concurrent           (forkIO, threadDelay)
import           Control.Monad.Reader
import           Control.Monad.Trans.Resource
import qualified Data.Text                    as T
import           Data.Time                    (UTCTime, diffUTCTime, getCurrentTime)
import           Database.Persist.Postgresql
import           Network.HostName
import           System.Posix.Process


import           Experimenter.DatabaseSetting
import           Experimenter.DB
import           Experimenter.Models
import           Experimenter.Result
import           Experimenter.Util


-- | Keep-alive interval, expressed in seconds.
--
-- The value is polymorphic so that it can be used both as an 'Int' (it is
-- multiplied by @1000000@ before being handed to 'threadDelay') and as a
-- time difference (it is doubled and compared against the result of
-- 'diffUTCTime' in 'waitForSlaves').
keepAliveTimeout :: Num t => t
keepAliveTimeout = 10


-- | State flag shared with the keep-alive thread started by
-- 'createKeepAliveFork'.
--
-- While the shared reference holds 'Working' the keep-alive thread keeps
-- running its update action; as soon as it reads 'Finished' it runs the
-- deletion action instead and terminates.
data WorkerStatus = Working | Finished
  deriving (Eq, Show)

-- | Start a background thread that periodically runs a keep-alive action
-- against the database.
--
-- A new 'IORef' initialised to 'Working' is created and returned
-- immediately. The forked thread opens its own PostgreSQL pool of size 1
-- using the connection string of @dbSetup@ and then loops:
--
-- * while the reference holds 'Working': run @updateFunction@ with the
--   current time, call 'transactionSave', sleep for 'keepAliveTimeout'
--   seconds and start over;
-- * once the reference holds 'Finished': run @deletionFunction@ once and
--   let the thread end.
--
-- NOTE(review): presumably the caller writes 'Finished' into the returned
-- reference when its work is done - confirm against the call sites. An
-- exception inside the forked thread is not reported to the caller.
createKeepAliveFork
  :: DatabaseSetting
  -> (UTCTime -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ())
  -> ReaderT SqlBackend (NoLoggingT (ResourceT IO)) ()
  -> IO (IORef WorkerStatus)
createKeepAliveFork dbSetup updateFunction deletionFunction = do
  ref <- newIORef Working
  void $
    forkIO $
      runNoLoggingT $
        withPostgresqlPool (connectionString dbSetup) 1 $
          liftSqlPersistMPool $ keepAlive ref
  return ref
  where
    -- One iteration of the keep-alive loop; recurses until the status
    -- stored in the reference is no longer 'Working'.
    keepAlive ref = do
      status <- liftIO $ readIORef ref
      case status of
        Working -> do
          time <- liftIO getCurrentTime
          updateFunction time
          -- Save the transaction now so the update is not held back until
          -- the whole (long-running) loop has finished.
          transactionSave
          -- 'threadDelay' expects microseconds.
          liftIO $ threadDelay (1000000 * keepAliveTimeout)
          keepAlive ref
        Finished -> deletionFunction

-- | Wait until no other process holds an execution lock on any experiment
-- belonging to @exps@.
--
-- Locks whose host name and process ID both equal those of the current
-- process are ignored. The function then polls the database:
--
-- * If no foreign lock is left, the 'ExpProgress' rows of the affected
--   experiments are deleted and 'True' is returned.
-- * If foreign locks exist but none of them has an alive sign that is at
--   most @2 * 'keepAliveTimeout'@ seconds old, 'False' is returned.
-- * Otherwise the slaves that are still alive are logged, the thread
--   sleeps for 'keepAliveTimeout' seconds and the check is repeated.
waitForSlaves :: (MonadIO m) => Experiments a -> DB m Bool
waitForSlaves exps = do
  pid <- liftIO getProcessID
  hostName <- T.pack <$> liftIO getHostName
  -- A lock is foreign unless both host name and process ID match ours.
  let notSelf (ExpExecutionLock _ h p _) = not (h == hostName && p == fromIntegral pid)
  expIds <- selectKeysList [ExpExps ==. expsId] []
  waitForSlaves' notSelf expIds
  where
    expsId = exps ^. experimentsKey
    -- Polling loop; re-reads the locks from the database on every pass.
    waitForSlaves' notSelf expIds = do
      locks <- filter notSelf . fmap entityVal <$> selectList [ExpExecutionLockExp <-. expIds] []
      if null locks
        then do
          deleteWhere [ExpProgressExp <-. expIds]
          return True
        else do
          time <- liftIO getCurrentTime
          -- A slave counts as alive if its last sign is at most two
          -- keep-alive intervals old.
          let workingSlaves = filter (\l -> diffUTCTime time (l ^. expExecutionLockLastAliveSign) <= 2 * keepAliveTimeout) locks
          if null workingSlaves
            then return False -- a slave must have died as it didn't delete the lock
            else do
              $(logInfo) "Waiting for slaves. List of slaves currently working: "
              -- Only list slaves that are actually alive; stale locks do
              -- not belong to a working slave.
              mapM_ printInfoSlave workingSlaves
              liftIO $ threadDelay (1000000 * keepAliveTimeout)
              waitForSlaves' notSelf expIds
    -- Log host and process ID of the slave owning the given lock.
    printInfoSlave (ExpExecutionLock _ host pid _) =
      $(logInfo) $ "Slave from host " <> tshow host <> " with process ID " <> tshow pid