-- | Simple backend based on the TCP transport which offers node discovery
-- based on UDP multicast. This is a zero-configuration backend designed to
-- get you going with Cloud Haskell quickly without imposing any structure
-- on your application.
--
-- To simplify getting started we provide special support for /master/ and
-- /slave/ nodes (see 'startSlave' and 'startMaster'). Use of these functions
-- is completely optional; you can use the local backend without making use
-- of the predefined master and slave nodes.
--
-- [Minimal example]
--
-- > import System.Environment (getArgs)
-- > import Control.Distributed.Process
-- > import Control.Distributed.Process.Node (initRemoteTable)
-- > import Control.Distributed.Process.Backend.SimpleLocalnet
-- >
-- > master :: Backend -> [NodeId] -> Process ()
-- > master backend slaves = do
-- >   -- Do something interesting with the slaves
-- >   liftIO . putStrLn $ "Slaves: " ++ show slaves
-- >   -- Terminate the slaves when the master terminates (this is optional)
-- >   terminateAllSlaves backend
-- >
-- > main :: IO ()
-- > main = do
-- >   args <- getArgs
-- >
-- >   case args of
-- >     ["master", host, port] -> do
-- >       backend <- initializeBackend host port initRemoteTable
-- >       startMaster backend (master backend)
-- >     ["slave", host, port] -> do
-- >       backend <- initializeBackend host port initRemoteTable
-- >       startSlave backend
--
-- [Compiling and Running]
--
-- Save to @example.hs@ and compile using
--
-- > ghc -threaded example.hs
--
-- Fire up some slave nodes (for the example, we run them on a single machine):
--
-- > ./example slave localhost 8080 &
-- > ./example slave localhost 8081 &
-- > ./example slave localhost 8082 &
-- > ./example slave localhost 8083 &
--
-- And start the master node:
--
-- > ./example master localhost 8084
--
-- which should then output:
--
-- > Slaves: [nid://localhost:8083:0,nid://localhost:8082:0,nid://localhost:8081:0,nid://localhost:8080:0]
--
-- at which point the slaves should exit.
--
-- To run the example on multiple machines, you could run
--
-- > ./example slave 198.51.100.1 8080 &
-- > ./example slave 198.51.100.2 8080 &
-- > ./example slave 198.51.100.3 8080 &
-- > ./example slave 198.51.100.4 8080 &
--
-- on four different machines (with IP addresses 198.51.100.1..4), and run the
-- master on a fifth node (or on any of the four machines that run the slave
-- nodes).
--
-- It is important that every node has a unique (hostname, port number) pair,
-- and that the hostname you use to initialize the node can be resolved by
-- peer nodes. In other words, if you start a node and pass hostname @localhost@
-- then peer nodes won't be able to reach it because @localhost@ will resolve
-- to a different IP address for them.
--
-- [Troubleshooting]
--
-- If you try the above example and the master process cannot find any slaves,
-- then it might be that your firewall settings do not allow for UDP multicast
-- (in particular, the default iptables on some Linux distributions might not
-- allow it).
{-# OPTIONS_GHC -fno-warn-orphans #-}
module Control.Distributed.Process.Backend.SimpleLocalnet
  ( -- * Initialization
    Backend(..)
  , initializeBackend
    -- * Slave nodes
  , startSlave
  , terminateSlave
  , findSlaves
  , terminateAllSlaves
    -- * Master nodes
  , startMaster
  ) where

import System.IO (fixIO)
import Data.Maybe (catMaybes)
import Data.Binary (Binary(get, put), getWord8, putWord8)
import Data.Accessor (Accessor, accessor, (^:), (^.))
import Data.Set (Set)
import qualified Data.Set as Set (insert, empty, toList)
import Data.Foldable (forM_)
import Data.Typeable (Typeable)
import Control.Exception (throw)
import Control.Monad (forever, replicateM, replicateM_)
import Control.Monad.Catch (bracket, try, finally)
import Control.Monad.IO.Class (liftIO)
import Control.Concurrent (forkIO, threadDelay, ThreadId)
import Control.Concurrent.MVar (MVar, newMVar, readMVar, modifyMVar_)
import Control.Distributed.Process
  ( RemoteTable
  , NodeId
  , Process
  , ProcessId
  , WhereIsReply(..)
  , whereis
  , whereisRemoteAsync
  , getSelfPid
  , register
  , reregister
  , expect
  , nsendRemote
  , receiveWait
  , match
  , processNodeId
  , monitorNode
  , monitor
  , unmonitor
  , NodeMonitorNotification(..)
  , ProcessMonitorNotification(..)
  , ProcessRegistrationException
  , newChan
  , receiveChan
  , nsend
  , SendPort
  , send
  )
import qualified Control.Distributed.Process.Node as Node
  ( LocalNode
  , newLocalNode
  , localNodeId
  , runProcess
  )
import qualified Network.Transport.TCP as NT
  ( createTransport
  , defaultTCPParameters
  , TCPAddr(Addressable)
  , TCPAddrInfo(TCPAddrInfo)
  )
import qualified Network.Transport as NT (Transport)
import qualified Network.Socket as N (HostName, ServiceName, SockAddr)
import Control.Distributed.Process.Backend.SimpleLocalnet.Internal.Multicast (initMulticast)

-- | Local backend
--
-- A record of the operations this backend offers; values are created by
-- 'initializeBackend'.
data Backend = Backend {
    -- | Create a new local node
    newLocalNode :: IO Node.LocalNode
    -- | @findPeers t@ broadcasts a /who's there?/ message on the local
    -- network, waits @t@ microseconds, and then returns the peers known to
    -- this backend. Peers are accumulated for the lifetime of the backend, so
    -- the result also contains nodes that answered earlier requests.
    -- You can use this to dynamically discover peer nodes.
  , findPeers :: Int -> IO [NodeId]
    -- | Make sure that all log messages are printed by the logger on the
    -- current node
  , redirectLogsHere :: [ProcessId] -> Process ()
  }

-- | Mutable state shared between the backend operations and the peer
-- discovery daemon.
data BackendState = BackendState {
    -- | Local nodes created through this backend, most recent first
    _localNodes     :: [Node.LocalNode]
    -- | Every peer that has answered a discovery request so far
  , _peers          :: Set NodeId
    -- | Thread running the peer discovery daemon. This field must stay lazy:
    -- 'initializeBackend' fills it in through 'fixIO' before the thread id
    -- is actually available.
  , discoveryDaemon :: ThreadId
  }

-- | Initialize the backend
--
-- Creates a TCP transport for the given host name and port, sets up the UDP
-- multicast channel used for peer discovery (group @224.0.0.99@, port
-- @9999@) and forks the peer discovery daemon.
--
-- If the transport cannot be created, the 'IOException' reported by
-- 'NT.createTransport' is thrown before any discovery resources are set up.
initializeBackend :: N.HostName -> N.ServiceName -> RemoteTable -> IO Backend
initializeBackend host port rtable = do
  mTransport <- NT.createTransport
                  (NT.Addressable $ NT.TCPAddrInfo host port (\sn -> (host, sn)))
                  NT.defaultTCPParameters
  -- Bail out first: failing only after the multicast socket has been opened
  -- and the daemon forked would leave both behind with no way to stop them.
  transport <- either throw return mTransport
  (recv, sendp) <- initMulticast "224.0.0.99" 9999 1024
  -- The daemon's thread id is stored inside the very state that is handed to
  -- the daemon, so the knot is tied with 'fixIO'. The pattern has to be lazy,
  -- otherwise the result would be demanded before it exists.
  (_, backendState) <- fixIO $ \ ~(tid, _) -> do
    st <- newMVar BackendState
            { _localNodes     = []
            , _peers          = Set.empty
            , discoveryDaemon = tid
            }
    tid' <- forkIO $ peerDiscoveryDaemon st recv sendp
    return (tid', st)
  -- 'backend' refers to itself so that 'redirectLogsHere' receives the
  -- finished record.
  let backend = Backend
        { newLocalNode     = apiNewLocalNode transport rtable backendState
        , findPeers        = apiFindPeers sendp backendState
        , redirectLogsHere = apiRedirectLogsHere backend
        }
  return backend

-- | Create a new local node
--
-- The node is also prepended to the list of local nodes in the backend
-- state, which is the list the peer discovery daemon announces in response
-- to discovery requests.
apiNewLocalNode :: NT.Transport
                -> RemoteTable
                -> MVar BackendState
                -> IO Node.LocalNode
apiNewLocalNode transport rtable backendState = do
  node <- Node.newLocalNode transport rtable
  modifyMVar_ backendState $ \st -> return (localNodes ^: (node :) $ st)
  return node

-- | Peer discovery
--
-- Sends a 'PeerDiscoveryRequest' with the supplied send function, sleeps for
-- the given number of microseconds so that the discovery daemon can record
-- the replies, and then returns the contents of the peer set.
apiFindPeers :: (PeerDiscoveryMsg -> IO ())
             -> MVar BackendState
             -> Int
             -> IO [NodeId]
apiFindPeers sendfn backendState delay = do
  sendfn PeerDiscoveryRequest
  threadDelay delay
  st <- readMVar backendState
  return (Set.toList (st ^. peers))

-- | Messages exchanged over the multicast channel for peer discovery
data PeerDiscoveryMsg
  = PeerDiscoveryRequest
    -- ^ Ask the nodes on the network to announce themselves
  | PeerDiscoveryReply NodeId
    -- ^ Announcement of a single node

-- | Wire format: a one-byte tag (@0@ for a request, @1@ for a reply); a
-- reply is followed by the encoded 'NodeId'. Decoding fails on any other tag.
instance Binary PeerDiscoveryMsg where
  put PeerDiscoveryRequest     = putWord8 0
  put (PeerDiscoveryReply nid) = putWord8 1 >> put nid
  get = do
    header <- getWord8
    case header of
      0 -> return PeerDiscoveryRequest
      1 -> PeerDiscoveryReply <$> get
      _ -> fail "PeerDiscoveryMsg.get: invalid"

-- | Respond to peer discovery requests sent by other nodes
--
-- Runs forever. A request is answered with one 'PeerDiscoveryReply' per
-- local node currently recorded in the backend state; a reply (from any
-- node) adds the announced 'NodeId' to the peer set. The sender address of
-- incoming messages is ignored.
peerDiscoveryDaemon :: MVar BackendState
                    -> IO (PeerDiscoveryMsg, N.SockAddr)
                    -> (PeerDiscoveryMsg -> IO ())
                    -> IO ()
peerDiscoveryDaemon backendState recv sendfn = forever go
  where
    go :: IO ()
    go = do
      (msg, _) <- recv
      case msg of
        PeerDiscoveryRequest -> do
          st <- readMVar backendState
          forM_ (st ^. localNodes) $
            sendfn . PeerDiscoveryReply . Node.localNodeId
        PeerDiscoveryReply nid ->
          modifyMVar_ backendState $ return . (peers ^: Set.insert nid)

--------------------------------------------------------------------------------
-- Back-end specific primitives                                               --
--------------------------------------------------------------------------------

-- | Make sure that all log messages are printed by the logger on this node
--
-- Looks up the local @"logger"@ process; if there is none, nothing happens.
-- Otherwise every given slave controller is asked to redirect its logs to
-- that logger, and the call blocks until one answer per slave controller has
-- arrived. The slave controllers are monitored for the duration of the call,
-- and a monitor notification counts as an answer, so that a slave controller
-- that has died does not block the caller forever.
apiRedirectLogsHere :: Backend -> [ProcessId] -> Process ()
apiRedirectLogsHere _backend slavecontrollers = do
  mLogger <- whereis "logger"
  myPid   <- getSelfPid

  forM_ mLogger $ \logger ->
    bracket
      (mapM monitor slavecontrollers)
      (mapM unmonitor)
      $ \_ -> do

        -- fire off redirect requests
        forM_ slavecontrollers $ \pid -> send pid (RedirectLogsTo logger myPid)

        -- Wait for the replies
        replicateM_ (length slavecontrollers) $
          receiveWait
            [ match (\(RedirectLogsReply {}) -> return ())
              -- 'monitor' sets up /process/ monitors, whose notifications
              -- arrive as 'ProcessMonitorNotification'.
            , match (\(ProcessMonitorNotification {}) -> return ())
              -- Kept for backwards compatibility with the previous behaviour.
            , match (\(NodeMonitorNotification {}) -> return ())
            ]

--------------------------------------------------------------------------------
-- Slaves                                                                     --
--------------------------------------------------------------------------------

-- | Messages to slave nodes
--
-- This datatype is not exposed; instead, we expose primitives for dealing
-- with slaves.
data SlaveControllerMsg
   = SlaveTerminate
     -- ^ Make the slave controller stop
   | RedirectLogsTo ProcessId ProcessId
     -- ^ @RedirectLogsTo logger replyTo@: register @logger@ as the slave's
     -- @"logger"@ process and send a 'RedirectLogsReply' to @replyTo@
  deriving (Typeable, Show)

-- | Wire format: a one-byte tag (@0@ for 'SlaveTerminate', @1@ for
-- 'RedirectLogsTo'); the latter is followed by the two process ids encoded
-- as a pair. Decoding fails on any other tag.
instance Binary SlaveControllerMsg where
  put SlaveTerminate       = putWord8 0
  put (RedirectLogsTo a b) = putWord8 1 >> put (a, b)
  get = do
    header <- getWord8
    case header of
      0 -> return SlaveTerminate
      1 -> uncurry RedirectLogsTo <$> get
      _ -> fail "SlaveControllerMsg.get: invalid"

-- | Answer of a slave controller to 'RedirectLogsTo': the process id of the
-- slave controller and whether registering the new logger succeeded.
data RedirectLogsReply
  = RedirectLogsReply ProcessId Bool
  deriving (Typeable, Show)

-- | Encoded as the pair of its two fields.
instance Binary RedirectLogsReply where
  put (RedirectLogsReply from ok) = put (from, ok)
  get = uncurry RedirectLogsReply <$> get

-- | Calling 'startSlave' sets up a new local node and then waits. You start
-- processes on the slave by calling 'spawn' from other nodes.
--
-- This function blocks for as long as the slave controller is running. The
-- only way to exit the slave is to CTRL-C the process or call
-- 'terminateSlave' from another node.
startSlave :: Backend -> IO ()
startSlave backend =
  newLocalNode backend >>= \node -> Node.runProcess node slaveController

-- | The slave controller interprets 'SlaveControllerMsg's
--
-- Registers itself under the name @"slaveController"@ and then handles
-- messages until it receives 'SlaveTerminate'.
slaveController :: Process ()
slaveController = do
    pid <- getSelfPid
    register "slaveController" pid
    go
  where
    go :: Process ()
    go = do
      msg <- expect
      case msg of
        SlaveTerminate -> return ()
        RedirectLogsTo loggerPid from -> do
          ok <- setLogger loggerPid
          pid <- getSelfPid
          send from (RedirectLogsReply pid ok)
          go

    -- Make the given process the "logger" of this node. The name is first
    -- re-registered; if that raises a registration exception, a plain
    -- registration is attempted instead. The result says whether either
    -- attempt succeeded.
    setLogger :: ProcessId -> Process Bool
    setLogger loggerPid = do
      r <- try (reregister "logger" loggerPid)
      case (r :: Either ProcessRegistrationException ()) of
        Right _ -> return True
        Left _  -> do
          s <- try (register "logger" loggerPid)
          case (s :: Either ProcessRegistrationException ()) of
            Right _ -> return True
            Left _  -> return False

-- | Terminate the slave at the given node ID
--
-- Sends 'SlaveTerminate' to the process registered as @"slaveController"@ on
-- that node. The message is sent asynchronously; this function does not wait
-- for the slave to exit.
terminateSlave :: NodeId -> Process ()
terminateSlave nid = nsendRemote nid "slaveController" SlaveTerminate

-- | Find slave nodes
--
-- Runs peer discovery for one second (@1000000@ microseconds) and then asks
-- every discovered node for its @"slaveController"@ process. Returns the
-- process ids of the slave controllers that were found; nodes without a
-- slave controller, and nodes for which a monitor notification arrives
-- instead of an answer, contribute nothing to the result.
findSlaves :: Backend -> Process [ProcessId]
findSlaves backend = do
  nodes <- liftIO $ findPeers backend 1000000

  -- The nodes are monitored while we wait, so that a node that goes away
  -- still produces a message and does not stall the loop below.
  bracket
    (mapM monitorNode nodes)
    (mapM unmonitor)
    $ \_ -> do

      -- fire off whereis requests
      forM_ nodes $ \nid -> whereisRemoteAsync nid "slaveController"

      -- Wait for the replies: exactly one message is consumed per node
      answers <- replicateM (length nodes) $
        receiveWait
          [ match handleWhereIsReply
          , match (\(NodeMonitorNotification {}) -> return Nothing)
          ]
      return (catMaybes answers)
  where
    -- Replies to lookups of any other name are discarded.
    handleWhereIsReply :: WhereIsReply -> Process (Maybe ProcessId)
    handleWhereIsReply (WhereIsReply name mPid) =
      return $ if name == "slaveController" then mPid else Nothing

-- | Terminate all slaves
--
-- Discovers the slaves with 'findSlaves', sends 'SlaveTerminate' to each
-- slave controller and then sleeps for one second.
terminateAllSlaves :: Backend -> Process ()
terminateAllSlaves backend = do
  slaves <- findSlaves backend
  forM_ slaves $ \pid -> send pid SlaveTerminate
  -- NOTE(review): presumably this gives the asynchronous sends time to go
  -- out before the caller exits -- confirm.
  liftIO $ threadDelay 1000000

--------------------------------------------------------------------------------
-- Master nodes
--------------------------------------------------------------------------------

-- | 'startMaster' finds all slaves /currently/ available on the local network,
-- redirects all log messages to itself, and then calls the specified process,
-- passing the list of slave nodes.
--
-- Terminates when the specified process terminates. If you want to terminate
-- the slaves when the master terminates, you should manually call
-- 'terminateAllSlaves'.
--
-- If you start more slave nodes after having started the master node, you can
-- discover them with later calls to 'findSlaves', but be aware that you will
-- need to call 'redirectLogsHere' to redirect their logs to the master node.
--
-- Note that you can use functionality of "SimpleLocalnet" directly (through
-- 'Backend'), instead of using 'startMaster'/'startSlave', if the master/slave
-- distinction does not suit your application.
startMaster :: Backend -> ([NodeId] -> Process ()) -> IO ()
startMaster backend proc = do
  node <- newLocalNode backend
  Node.runProcess node $ do
    slaves <- findSlaves backend
    redirectLogsHere backend slaves
    -- The logger is shut down even if the user's process throws.
    proc (map processNodeId slaves) `finally` shutdownLogger

--
-- | shut down the logger process. This ensures that any pending
-- messages are flushed before the process exits.
--
-- A fresh channel is created, its send port is sent to the process
-- registered as @"logger"@, and the call blocks until something arrives on
-- the receive port. If no process is registered as @"logger"@ the function
-- returns immediately, since nobody could ever answer.
--
shutdownLogger :: Process ()
shutdownLogger = do
  mLogger <- whereis "logger"
  forM_ mLogger $ \_ -> do
    (sport, rport) <- newChan
    nsend "logger" (sport :: SendPort ())
    receiveChan rport
  -- TODO: we should monitor the logger process so we don't deadlock if
  -- it dies after the 'whereis' check but before answering.

--------------------------------------------------------------------------------
-- Accessors                                                                  --
--------------------------------------------------------------------------------

-- | Accessor for the list of local nodes stored in the backend state
localNodes :: Accessor BackendState [Node.LocalNode]
localNodes = accessor _localNodes setNodes
  where
    setNodes ns st = st { _localNodes = ns }

-- | Accessor for the set of discovered peers stored in the backend state
peers :: Accessor BackendState (Set NodeId)
peers = accessor _peers setPeers
  where
    setPeers ps st = st { _peers = ps }