module Control.Distributed.Backend.P2P (
bootstrap,
makeNodeId,
getPeers,
getCapable,
nsendPeers,
nsendCapable
) where
import Control.Distributed.Process as DP
import Control.Distributed.Process.Node as DPN
import Control.Distributed.Process.Internal.Types as DPT
import Control.Distributed.Process.Serializable (Serializable)
import Network.Transport (EndPointAddress(..))
import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Control.Monad
import Control.Applicative
import Control.Monad.Trans
import Control.Concurrent.MVar
import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy.Char8 as BSL
import qualified Data.Set as S
import Data.Typeable
import Data.Binary
import Data.Maybe (isJust)
peerControllerService = "P2P:Controller"
type Peers = S.Set ProcessId
data PeerState = PeerState { p2pPeers :: MVar Peers }
initPeerState :: Process PeerState
initPeerState = do
self <- getSelfPid
peers <- liftIO $ newMVar (S.singleton self)
register peerControllerService self
return $! PeerState peers
makeNodeId :: String -> NodeId
makeNodeId addr = NodeId . EndPointAddress . BS.concat $ [BS.pack addr, ":0"]
bootstrap :: String -> String -> [NodeId] -> Process () -> IO ()
bootstrap host port seeds proc = do
transport <- either (error . show) id `fmap` createTransport host port defaultTCPParameters
node <- newLocalNode transport initRemoteTable
pcPid <- forkProcess node $ do
state <- initPeerState
mapM_ doDiscover seeds
say "P2P controller started."
forever $ receiveWait [ matchIf isPeerDiscover $ onDiscover state
, match $ onMonitor state
, match $ onPeerRequest state
, match $ onPeerQuery state
, match $ onPeerCapable
]
runProcess node proc
doDiscover :: NodeId -> Process ()
doDiscover node = do
say $ "Examining node: " ++ show node
whereisRemoteAsync node peerControllerService
doRegister :: PeerState -> ProcessId -> Process ()
doRegister state@PeerState{..} pid = do
pids <- liftIO $ takeMVar p2pPeers
if S.member pid pids
then liftIO $ putMVar p2pPeers pids
else do
say $ "Registering peer:" ++ show pid
monitor pid
liftIO $ putMVar p2pPeers (S.insert pid pids)
say $ "New node: " ++ show pid
doDiscover $ processNodeId pid
doUnregister :: PeerState -> Maybe MonitorRef -> ProcessId -> Process ()
doUnregister PeerState{..} mref pid = do
say $ "Unregistering peer: " ++ show pid
maybe (return ()) unmonitor mref
peers <- liftIO $ takeMVar p2pPeers
liftIO $ putMVar p2pPeers (S.delete pid peers)
isPeerDiscover :: WhereIsReply -> Bool
isPeerDiscover (WhereIsReply service pid) =
service == peerControllerService && isJust pid
onDiscover :: PeerState -> WhereIsReply -> Process ()
onDiscover state (WhereIsReply _ (Just seedPid)) = do
say $ "Peer discovered: " ++ show seedPid
(sp, rp) <- newChan
self <- getSelfPid
send seedPid (self, sp :: SendPort Peers)
say $ "Waiting for peers..."
peers <- receiveChan rp
known <- liftIO $ readMVar (p2pPeers state)
mapM_ (doRegister state) (S.toList $ S.difference known peers)
onPeerRequest :: PeerState -> (ProcessId, SendPort Peers) -> Process ()
onPeerRequest PeerState{..} (peer, replyTo) = do
say $ "Peer exchange with " ++ show peer
peers <- liftIO $ takeMVar p2pPeers
if S.member peer peers
then liftIO $ putMVar p2pPeers peers
else do
monitor peer
liftIO $ putMVar p2pPeers (S.insert peer peers)
sendChan replyTo peers
onPeerQuery :: PeerState -> SendPort Peers -> Process ()
onPeerQuery PeerState{..} replyTo = do
say $ "Local peer query."
liftIO (readMVar p2pPeers) >>= sendChan replyTo
onPeerCapable :: (String, SendPort ProcessId) -> Process ()
onPeerCapable (service, replyTo) = do
say $ "Capability request: " ++ service
res <- whereis service
case res of
Nothing -> say "I can't."
Just pid -> say "I can!" >> sendChan replyTo pid
onMonitor :: PeerState -> ProcessMonitorNotification -> Process ()
onMonitor state (ProcessMonitorNotification mref pid reason) = do
say $ "Monitor event: " ++ show (pid, reason)
doUnregister state (Just mref) pid
getPeers :: Process [NodeId]
getPeers = do
say $ "Requesting peer list from local controller..."
(sp, rp) <- newChan
nsend peerControllerService (sp :: SendPort Peers)
receiveChan rp >>= return . map processNodeId . S.toList
getCapable :: String -> Process [ProcessId]
getCapable service = do
(sp, rp) <- newChan
nsendPeers peerControllerService (service, sp)
say "Waiting for capable nodes..."
go rp []
where go rp acc = do res <- receiveChanTimeout 100000 rp
case res of Just pid -> say "cap hit" >> go rp (pid:acc)
Nothing -> say "cap done" >> return acc
nsendPeers :: Serializable a => String -> a -> Process ()
nsendPeers service msg = getPeers >>= mapM_ (\peer -> nsendRemote peer service msg)
nsendCapable :: Serializable a => String -> a -> Process ()
nsendCapable service msg = getCapable service >>= mapM_ (\pid -> send pid msg)