module LogicGrowsOnTrees.Parallel.Adapter.Network
(
driver
, Network()
, withNetwork
, NetworkRequestQueueMonad(..)
, NetworkControllerMonad
, abort
, fork
, getCurrentProgressAsync
, getCurrentProgress
, getCurrentStatisticsAsync
, getCurrentStatistics
, getNumberOfWorkersAsync
, getNumberOfWorkers
, requestProgressUpdateAsync
, requestProgressUpdate
, setWorkloadBufferSize
, RunOutcome(..)
, RunStatistics(..)
, TerminationReason(..)
, NetworkCallbacks(..)
, default_network_callbacks
, NetworkConfiguration(..)
, WorkerId(..)
, WrappedPortID(..)
, runSupervisor
, runWorker
, runExplorer
, showPortID
, getConfiguration
) where
import Prelude hiding (catch)
import Control.Applicative (Applicative(..))
import Control.Concurrent (ThreadId,forkIO,killThread,myThreadId,throwTo)
import Control.Exception (AsyncException(..),SomeException,bracket,catch,fromException)
import Control.Lens (use)
import Control.Lens.Operators ((%=))
import Control.Lens.TH (makeLenses)
import Control.Monad (forever,when)
import Control.Monad.CatchIO (MonadCatchIO)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Reader (ask)
import Control.Monad.Trans.State.Strict (StateT,evalStateT)
import Data.Composition ((.*))
import Data.Functor ((<$>))
import qualified Data.Map as Map
import Data.Map (Map)
import Data.Maybe (fromMaybe)
import Data.Monoid (Monoid(..))
import Data.Serialize (Serialize)
import qualified Data.Set as Set
import Data.Set (Set)
import Data.Typeable (Typeable)
import Network (HostName,PortID(..),PortNumber,accept,connectTo,listenOn,sClose,withSocketsDo)
import System.Console.CmdTheLine
import System.IO (Handle)
import qualified System.Log.Logger as Logger
import System.Log.Logger (Priority(DEBUG,INFO,NOTICE))
import System.Log.Logger.TH
import Text.PrettyPrint (text)
import LogicGrowsOnTrees
import LogicGrowsOnTrees.Parallel.Common.Message
import qualified LogicGrowsOnTrees.Parallel.Common.Process as Process
import LogicGrowsOnTrees.Parallel.Common.RequestQueue
import LogicGrowsOnTrees.Parallel.Common.Supervisor
hiding
(runSupervisor
,getCurrentProgress
,getCurrentStatistics
,getNumberOfWorkers
,setWorkloadBufferSize
)
import LogicGrowsOnTrees.Parallel.ExplorationMode
import LogicGrowsOnTrees.Parallel.Main
import LogicGrowsOnTrees.Parallel.Purity
import LogicGrowsOnTrees.Utils.Handle
deriveLoggers "Logger" [DEBUG,INFO,NOTICE]
data NetworkSecret = NetworkSecret
type Network = ?network_secret :: NetworkSecret
withNetwork :: (Network ⇒ IO α) → IO α
withNetwork action =
let ?network_secret = NetworkSecret
in withSocketsDo action
data WorkerId = WorkerId
{ workerHostName :: HostName
, workerPortNumber :: PortNumber
} deriving (Eq,Ord,Show,Typeable)
data Worker = Worker
{ workerHandle :: Handle
, workerThreadId :: ThreadId
} deriving (Eq,Show)
data NetworkState = NetworkState
{ _pending_add :: !(Set WorkerId)
, _pending_quit :: !(Set WorkerId)
, _workers :: !(Map WorkerId Worker)
}
makeLenses ''NetworkState
type NetworkStateMonad = StateT NetworkState IO
class RequestQueueMonad m ⇒ NetworkRequestQueueMonad m where
disconnectWorker :: WorkerId → m ()
newtype NetworkControllerMonad exploration_mode α =
C (RequestQueueReader exploration_mode WorkerId NetworkStateMonad α)
deriving (Applicative,Functor,Monad,MonadCatchIO,MonadIO,RequestQueueMonad)
instance HasExplorationMode (NetworkControllerMonad exploration_mode) where
type ExplorationModeFor (NetworkControllerMonad exploration_mode) = exploration_mode
instance NetworkRequestQueueMonad (NetworkControllerMonad result) where
disconnectWorker worker_id = C $ ask >>= (enqueueRequest $ do
debugM ("Disconnecting worker " ++ show worker_id)
Map.lookup worker_id <$> use workers >>=
maybe (pending_add %= Set.delete worker_id)
(liftIO . flip send QuitWorker . workerHandle)
)
data NetworkCallbacks = NetworkCallbacks
{ notifyConnected :: WorkerId → IO Bool
, notifyDisconnected :: WorkerId → IO ()
}
default_network_callbacks :: NetworkCallbacks
default_network_callbacks = NetworkCallbacks
{ notifyConnected = const (return True)
, notifyDisconnected = const (return ())
}
data NetworkConfiguration shared_configuration supervisor_configuration =
SupervisorConfiguration
{ shared_configuration :: shared_configuration
, supervisor_configuration :: supervisor_configuration
, supervisor_port :: WrappedPortID
}
| WorkerConfiguration
{ supervisor_host_name :: HostName
, supervisor_port :: WrappedPortID
}
newtype WrappedPortID = WrappedPortID { unwrapPortID :: PortID }
instance ArgVal WrappedPortID where
converter = (parsePortID,prettyPortID)
where
(parseInt,prettyInt) = converter
parsePortID =
either Left (\n →
if n >= 0 && n <= (65535 :: Int)
then Right . WrappedPortID . PortNumber . fromIntegral $ n
else Left . text $ "bad port number: must be between 0 and 65535, inclusive (was given " ++ show n ++ ")"
)
.
parseInt
prettyPortID (WrappedPortID (PortNumber port_number)) = prettyInt . fromIntegral $ port_number
prettyPortID _ = error "a non-numeric port ID somehow made its way in to the configuration"
instance ArgVal (Maybe WrappedPortID) where
converter = just
runSupervisor ::
∀ exploration_mode.
( Serialize (ProgressFor exploration_mode)
, Serialize (WorkerFinishedProgressFor exploration_mode)
, Network
) ⇒
ExplorationMode exploration_mode →
(Handle → IO ()) →
NetworkCallbacks →
PortID →
ProgressFor exploration_mode →
NetworkControllerMonad exploration_mode () →
IO (RunOutcomeFor exploration_mode)
runSupervisor
exploration_mode
initializeWorker
NetworkCallbacks{..}
port_id
starting_progress
(C controller)
= do
request_queue ← newRequestQueue
let receiveStolenWorkloadFromWorker = flip enqueueRequest request_queue .* receiveStolenWorkload
receiveProgressUpdateFromWorker = flip enqueueRequest request_queue .* receiveProgressUpdate
receiveFailureFromWorker = flip enqueueRequest request_queue .* receiveWorkerFailure
receiveFinishedFromWorker worker_id final_progress = flip enqueueRequest request_queue $ do
removal_flag ← Set.member worker_id <$> use pending_quit
infoM $ if removal_flag
then "Worker " ++ show worker_id ++ " has finished, and will be removed."
else "Worker " ++ show worker_id ++ " has finished, and will look for another workload."
receiveWorkerFinishedWithRemovalFlag removal_flag worker_id final_progress
receiveQuitFromWorker worker_id = flip enqueueRequest request_queue $ do
infoM $ "Worker " ++ show worker_id ++ " has quit."
pending_quit %= Set.delete worker_id
workers %= Map.delete worker_id
removeWorkerIfPresent worker_id
liftIO $ notifyDisconnected_ worker_id
sendMessageToWorker message worker_id = do
use workers
>>=
liftIO
.
flip send message
.
workerHandle
.
fromJustOrBust ("Error looking up " ++ show worker_id ++ " to send a message")
.
Map.lookup worker_id
broadcastMessageToWorkers message = mapM_ (sendMessageToWorker message)
broadcastProgressUpdateToWorkers = broadcastMessageToWorkers RequestProgressUpdate
broadcastWorkloadStealToWorkers = broadcastMessageToWorkers RequestWorkloadSteal
receiveCurrentProgress = receiveProgress request_queue
sendWorkloadToWorker workload worker_id = do
infoM $ "Activating worker " ++ show worker_id ++ " with workload " ++ show workload
sendMessageToWorker (StartWorkload workload) worker_id
notifyDisconnected_ worker_id@WorkerId{..} = do
noticeM $ "Worker " ++ workerHostName ++ ":" ++ show workerPortNumber ++ " has disconnected"
notifyDisconnected worker_id
let port_id_description = showPortID port_id
supervisor_thread_id ← myThreadId
acceptor_thread_id ← forkIO $
bracket
(debugM ("Acquiring lock on " ++ port_id_description) >> listenOn port_id)
(\socket → debugM ("Releasing lock on " ++ port_id_description) >> sClose socket)
(\socket → forever $
accept socket >>=
\(workerHandle,workerHostName,workerPortNumber) → do
let identifier = workerHostName ++ ":" ++ show workerPortNumber
debugM $ "Received connection from " ++ identifier
let worker_id = WorkerId{..}
sendToWorker = send workerHandle
flip enqueueRequestAndWait request_queue $ pending_add %= Set.insert worker_id
allowed_to_connect ← liftIO $ notifyConnected worker_id
if allowed_to_connect
then
do debugM $ identifier ++ " is allowed to connect."
noticeM $ "Received connection from " ++ identifier
initializeWorker workerHandle
workerThreadId ← forkIO (
receiveAndProcessMessagesFromWorkerUsingHandle
(MessageForSupervisorReceivers{..} :: MessageForSupervisorReceivers exploration_mode WorkerId)
workerHandle
worker_id
`catch`
(\e → case fromException e of
Just ThreadKilled → sendToWorker QuitWorker
Just UserInterrupt → sendToWorker QuitWorker
_ → do enqueueRequest (removeWorker worker_id) request_queue
sendToWorker QuitWorker `catch` (\(_::SomeException) → return ())
liftIO $ notifyDisconnected_ worker_id
)
)
flip enqueueRequest request_queue $ do
is_pending_add ← Set.member worker_id <$> use pending_add
when is_pending_add $ do
pending_add %= Set.delete worker_id
workers %= Map.insert worker_id Worker{..}
addWorker worker_id
else
do debugM $ identifier ++ " is *not* allowed to connect."
noticeM $ "Rejected connections from " ++ identifier
sendToWorker QuitWorker
)
`catch`
(\e → case fromException e of
Just ThreadKilled → return ()
_ → throwTo supervisor_thread_id e
)
forkControllerThread request_queue controller
flip evalStateT (NetworkState mempty mempty mempty) $ do
supervisor_outcome@SupervisorOutcome{supervisorRemainingWorkers} ←
runSupervisorStartingFrom
exploration_mode
starting_progress
SupervisorCallbacks{..}
(requestQueueProgram (return ()) request_queue)
broadcastMessageToWorkers QuitWorker supervisorRemainingWorkers
liftIO $ killThread acceptor_thread_id
killControllerThreads request_queue
return $ extractRunOutcomeFromSupervisorOutcome supervisor_outcome
runExplorer ::
( Serialize shared_configuration
, Serialize (ProgressFor exploration_mode)
, Serialize (WorkerFinishedProgressFor exploration_mode)
, Network
) ⇒
(shared_configuration → ExplorationMode exploration_mode)
→
Purity m n →
IO (NetworkConfiguration shared_configuration supervisor_configuration)
→
(shared_configuration → IO ())
→
(shared_configuration → TreeT m (ResultFor exploration_mode))
→
(shared_configuration → supervisor_configuration → IO (ProgressFor exploration_mode))
→
(shared_configuration → supervisor_configuration → NetworkControllerMonad exploration_mode ())
→
IO (Maybe ((shared_configuration,supervisor_configuration),RunOutcomeFor exploration_mode))
runExplorer
constructExplorationMode
purity
getConfiguration
initializeGlobalState
constructTree
getStartingProgress
constructController
= do
configuration ← liftIO $ getConfiguration
case configuration of
SupervisorConfiguration{..} → do
liftIO $ initializeGlobalState shared_configuration
starting_progress ← liftIO $ getStartingProgress shared_configuration supervisor_configuration
termination_result ←
runSupervisor
(constructExplorationMode shared_configuration)
(flip send shared_configuration)
default_network_callbacks
(unwrapPortID supervisor_port)
starting_progress
(constructController shared_configuration supervisor_configuration)
return $ Just ((shared_configuration,supervisor_configuration),termination_result)
WorkerConfiguration{..} → liftIO $ do
handle ← connectTo supervisor_host_name (unwrapPortID supervisor_port)
shared_configuration ← receive handle
Process.runWorkerUsingHandles
(constructExplorationMode shared_configuration)
purity
(constructTree shared_configuration)
handle
handle
return Nothing
runWorker ::
( Serialize (ProgressFor exploration_mode)
, Serialize (WorkerFinishedProgressFor exploration_mode)
, Network
) ⇒
ExplorationMode exploration_mode →
Purity m n →
TreeT m (ResultFor exploration_mode) →
HostName →
PortID →
IO ()
runWorker exploration_mode purity tree host_name port_id = liftIO $ do
handle ← connectTo host_name port_id
Process.runWorkerUsingHandles exploration_mode purity tree handle handle
getConfiguration ::
Term shared_configuration →
Term supervisor_configuration →
TermInfo →
IO (NetworkConfiguration shared_configuration supervisor_configuration)
getConfiguration shared_configuration_term supervisor_configuration_term term_info =
execChoice
(no_configuration_term,term_info)
[(supervisorConfigurationTermFor shared_configuration_term supervisor_configuration_term,defTI
{ termName = "supervisor"
, termDoc = "Run the program in supervisor mode, waiting for network connections from workers on the specified port."
, man = mainMan
}
)
,(worker_configuration_term,defTI
{ termName = "worker"
, termDoc = "Run the program in worker mode, connecting to the specified supervisor to receive workloads."
}
)
]
showPortID :: PortID → String
showPortID (Service service_name) = "Service " ++ service_name
showPortID (PortNumber port_number) = "Port Number " ++ show port_number
#ifndef mingw32_HOST_OS
showPortID (UnixSocket unix_socket_name) = "Unix Socket " ++ unix_socket_name
#endif
driver ::
∀ shared_configuration supervisor_configuration m n exploration_mode.
( Serialize shared_configuration
, Serialize (ProgressFor exploration_mode)
, Serialize (WorkerFinishedProgressFor exploration_mode)
) ⇒
Driver IO shared_configuration supervisor_configuration m n exploration_mode
driver =
let ?network_secret = NetworkSecret
in case (driverNetwork :: Driver IO shared_configuration supervisor_configuration m n exploration_mode) of
Driver runDriver → Driver runDriver
driverNetwork ::
∀ shared_configuration supervisor_configuration m n exploration_mode.
( Serialize shared_configuration
, Serialize (ProgressFor exploration_mode)
, Serialize (WorkerFinishedProgressFor exploration_mode)
, Network
) ⇒
Driver IO shared_configuration supervisor_configuration m n exploration_mode
driverNetwork = Driver $ \DriverParameters{..} → do
runExplorer
constructExplorationMode
purity
(getConfiguration shared_configuration_term supervisor_configuration_term program_info)
initializeGlobalState
constructTree
getStartingProgress
constructController
>>=
maybe (return ()) (liftIO . (notifyTerminated <$> fst . fst <*> snd . fst <*> snd))
fromJustOrBust message = fromMaybe (error message)
no_configuration_term = ret (pure $ helpFail Plain Nothing)
supervisorConfigurationTermFor shared_configuration_term supervisor_configuration_term =
SupervisorConfiguration
<$> shared_configuration_term
<*> supervisor_configuration_term
<*> (required
$
opt Nothing
((optInfo ["p","port"])
{ optName = "PORT"
, optDoc = "port on which to listen for workers"
}
)
)
worker_configuration_term =
WorkerConfiguration
<$> (required
$
pos 0
Nothing
posInfo
{ posName = "HOST_NAME"
, posDoc = "supervisor host name"
}
)
<*> (required
$
pos 1
Nothing
posInfo
{ posName = "HOST_PORT"
, posDoc = "supervisor host port"
}
)