module Control.Distributed.Process.Backend.SimpleLocalnet
(
Backend(..)
, initializeBackend
, startSlave
, terminateSlave
, findSlaves
, terminateAllSlaves
, startMaster
) where
import System.IO (fixIO)
import Data.Maybe (catMaybes)
import Data.Binary (Binary(get, put), getWord8, putWord8)
import Data.Accessor (Accessor, accessor, (^:), (^.))
import Data.Set (Set)
import qualified Data.Set as Set (insert, empty, toList)
import Data.Foldable (forM_)
import Data.Typeable (Typeable)
import Control.Applicative ((<$>))
import Control.Exception (throw)
import Control.Monad (forever, forM)
import Control.Monad.IO.Class (liftIO)
import Control.Concurrent (forkIO, threadDelay, ThreadId)
import Control.Concurrent.MVar (MVar, newMVar, readMVar, modifyMVar_)
import Control.Distributed.Process
( RemoteTable
, NodeId
, Process
, WhereIsReply(..)
, whereis
, whereisRemoteAsync
, registerRemote
, getSelfPid
, register
, expect
, nsendRemote
, receiveWait
, matchIf
, processNodeId
)
import qualified Control.Distributed.Process.Node as Node
( LocalNode
, newLocalNode
, localNodeId
, runProcess
)
import qualified Network.Transport.TCP as NT
( createTransport
, defaultTCPParameters
)
import qualified Network.Transport as NT (Transport)
import qualified Network.Socket as N (HostName, ServiceName, SockAddr)
import Control.Distributed.Process.Backend.SimpleLocalnet.Internal.Multicast (initMulticast)
data Backend = Backend {
newLocalNode :: IO Node.LocalNode
, findPeers :: Int -> IO [NodeId]
, redirectLogsHere :: Process ()
}
data BackendState = BackendState {
_localNodes :: [Node.LocalNode]
, _peers :: Set NodeId
, discoveryDaemon :: ThreadId
}
initializeBackend :: N.HostName -> N.ServiceName -> RemoteTable -> IO Backend
initializeBackend host port rtable = do
mTransport <- NT.createTransport host port NT.defaultTCPParameters
(recv, send) <- initMulticast "224.0.0.99" 9999 1024
(_, backendState) <- fixIO $ \ ~(tid, _) -> do
backendState <- newMVar BackendState
{ _localNodes = []
, _peers = Set.empty
, discoveryDaemon = tid
}
tid' <- forkIO $ peerDiscoveryDaemon backendState recv send
return (tid', backendState)
case mTransport of
Left err -> throw err
Right transport ->
let backend = Backend {
newLocalNode = apiNewLocalNode transport rtable backendState
, findPeers = apiFindPeers send backendState
, redirectLogsHere = apiRedirectLogsHere backend
}
in return backend
apiNewLocalNode :: NT.Transport
-> RemoteTable
-> MVar BackendState
-> IO Node.LocalNode
apiNewLocalNode transport rtable backendState = do
localNode <- Node.newLocalNode transport rtable
modifyMVar_ backendState $ return . (localNodes ^: (localNode :))
return localNode
apiFindPeers :: (PeerDiscoveryMsg -> IO ())
-> MVar BackendState
-> Int
-> IO [NodeId]
apiFindPeers send backendState delay = do
send PeerDiscoveryRequest
threadDelay delay
Set.toList . (^. peers) <$> readMVar backendState
data PeerDiscoveryMsg =
PeerDiscoveryRequest
| PeerDiscoveryReply NodeId
instance Binary PeerDiscoveryMsg where
put PeerDiscoveryRequest = putWord8 0
put (PeerDiscoveryReply nid) = putWord8 1 >> put nid
get = do
header <- getWord8
case header of
0 -> return PeerDiscoveryRequest
1 -> PeerDiscoveryReply <$> get
_ -> fail "PeerDiscoveryMsg.get: invalid"
peerDiscoveryDaemon :: MVar BackendState
-> IO (PeerDiscoveryMsg, N.SockAddr)
-> (PeerDiscoveryMsg -> IO ())
-> IO ()
peerDiscoveryDaemon backendState recv send = forever go
where
go = do
(msg, _) <- recv
case msg of
PeerDiscoveryRequest -> do
nodes <- (^. localNodes) <$> readMVar backendState
forM_ nodes $ send . PeerDiscoveryReply . Node.localNodeId
PeerDiscoveryReply nid ->
modifyMVar_ backendState $ return . (peers ^: Set.insert nid)
apiRedirectLogsHere :: Backend -> Process ()
apiRedirectLogsHere backend = do
mLogger <- whereis "logger"
forM_ mLogger $ \logger -> do
nids <- liftIO $ findPeers backend 1000000
forM_ nids $ \nid -> registerRemote nid "logger" logger
data SlaveControllerMsg =
SlaveTerminate
deriving (Typeable, Show)
instance Binary SlaveControllerMsg where
put SlaveTerminate = putWord8 0
get = do
header <- getWord8
case header of
0 -> return SlaveTerminate
_ -> fail "SlaveControllerMsg.get: invalid"
startSlave :: Backend -> IO ()
startSlave backend = do
node <- newLocalNode backend
Node.runProcess node slaveController
slaveController :: Process ()
slaveController = do
pid <- getSelfPid
register "slaveController" pid
go
where
go = do
msg <- expect
case msg of
SlaveTerminate -> return ()
terminateSlave :: NodeId -> Process ()
terminateSlave nid = nsendRemote nid "slaveController" SlaveTerminate
findSlaves :: Backend -> Process [NodeId]
findSlaves backend = do
nodes <- liftIO $ findPeers backend 1000000
forM_ nodes $ \nid -> whereisRemoteAsync nid "slaveController"
catMaybes <$> forM nodes (\_ ->
receiveWait
[ matchIf (\(WhereIsReply label _) -> label == "slaveController")
(\(WhereIsReply _ mPid) -> return (processNodeId <$> mPid))
])
terminateAllSlaves :: Backend -> Process ()
terminateAllSlaves backend = do
slaves <- findSlaves backend
forM_ slaves terminateSlave
liftIO $ threadDelay 1000000
startMaster :: Backend -> ([NodeId] -> Process ()) -> IO ()
startMaster backend proc = do
node <- newLocalNode backend
Node.runProcess node $ do
slaves <- findSlaves backend
redirectLogsHere backend
proc slaves
localNodes :: Accessor BackendState [Node.LocalNode]
localNodes = accessor _localNodes (\ns st -> st { _localNodes = ns })
peers :: Accessor BackendState (Set NodeId)
peers = accessor _peers (\ps st -> st { _peers = ps })