{-# LANGUAGE OverloadedStrings, RecordWildCards, DeriveDataTypeable #-}

-- | Peer-to-peer node discovery backend for Cloud Haskell based on the TCP
-- transport. Provided with a known node address it discovers and maintains
-- the knowledge of it's peers.
--
-- > import qualified Control.Distributed.Backend.P2P as P2P
-- > import           Control.Monad.Trans (liftIO)
-- > import           Control.Concurrent (threadDelay)
-- >
-- > main = P2P.bootstrap "myhostname" "9001" [P2P.makeNodeId "seedhost:9000"] $ do
-- >     liftIO $ threadDelay 1000000 -- give dispatcher a second to discover other nodes
-- >     P2P.nsendPeers "myService" ("some", "message")

module Control.Distributed.Backend.P2P (
    bootstrap,
    makeNodeId,
    getPeers,
    getCapable,
    nsendPeers,
    nsendCapable
) where

import Control.Distributed.Process                as DP
import Control.Distributed.Process.Node           as DPN
import Control.Distributed.Process.Internal.Types as DPT
import Control.Distributed.Process.Serializable (Serializable)
import Network.Transport (EndPointAddress(..))
import Network.Transport.TCP (createTransport, defaultTCPParameters)

import Control.Monad
import Control.Applicative
import Control.Monad.Trans
import Control.Concurrent.MVar

import qualified Data.ByteString.Char8 as BS
import qualified Data.ByteString.Lazy.Char8 as BSL
import qualified Data.Set as S
import Data.Typeable
import Data.Binary
import Data.Maybe (isJust)

-- * Peer-to-peer API

peerControllerService = "P2P:Controller"

type Peers = S.Set ProcessId

data PeerState = PeerState { p2pPeers :: MVar Peers }

initPeerState :: Process PeerState
initPeerState = do
    self <- getSelfPid
    peers <- liftIO $ newMVar (S.singleton self)
    register peerControllerService self
    return $! PeerState peers

-- ** Initialization

-- | Make a NodeId from "host:port" string.
makeNodeId :: String -> NodeId
makeNodeId addr = NodeId . EndPointAddress . BS.concat $ [BS.pack addr, ":0"]

-- | Start a controller service process and aquire connections to a swarm.
bootstrap :: String -> String -> [NodeId] -> Process () -> IO ()
bootstrap host port seeds proc = do
    transport <- either (error . show) id `fmap` createTransport host port defaultTCPParameters
    node <- newLocalNode transport initRemoteTable

    pcPid <- forkProcess node $ do
        state <- initPeerState
        mapM_ doDiscover seeds
        say "P2P controller started."
        forever $ receiveWait [ matchIf isPeerDiscover $ onDiscover state
                              , match $ onMonitor state
                              , match $ onPeerRequest state
                              , match $ onPeerQuery state
                              , match $ onPeerCapable
                              ]

    runProcess node proc

-- ** Discovery

doDiscover :: NodeId -> Process ()
doDiscover node = do
    say $ "Examining node: " ++ show node
    whereisRemoteAsync node peerControllerService

doRegister :: PeerState -> ProcessId -> Process ()
doRegister state@PeerState{..} pid = do
    pids <- liftIO $ takeMVar p2pPeers
    if S.member pid pids
        then liftIO $ putMVar p2pPeers pids
        else do
            say $ "Registering peer:" ++ show pid
            monitor pid

            liftIO $ putMVar p2pPeers (S.insert pid pids)
            say $ "New node: " ++ show pid
            doDiscover $ processNodeId pid

doUnregister :: PeerState -> Maybe MonitorRef -> ProcessId -> Process ()
doUnregister PeerState{..} mref pid = do
    say $ "Unregistering peer: " ++ show pid
    maybe (return ()) unmonitor mref
    peers <- liftIO $ takeMVar p2pPeers
    liftIO $ putMVar p2pPeers (S.delete pid peers)

isPeerDiscover :: WhereIsReply -> Bool
isPeerDiscover (WhereIsReply service pid) =
    service == peerControllerService && isJust pid

onDiscover :: PeerState -> WhereIsReply -> Process ()
onDiscover state (WhereIsReply _ (Just seedPid)) = do
    say $ "Peer discovered: " ++ show seedPid

    (sp, rp) <- newChan
    self <- getSelfPid
    send seedPid (self, sp :: SendPort Peers)
    say $ "Waiting for peers..."
    peers <- receiveChan rp

    known <- liftIO $ readMVar (p2pPeers state)
    mapM_ (doRegister state) (S.toList $ S.difference known peers)

onPeerRequest :: PeerState -> (ProcessId, SendPort Peers) -> Process ()
onPeerRequest PeerState{..} (peer, replyTo) = do
    say $ "Peer exchange with " ++ show peer
    peers <- liftIO $ takeMVar p2pPeers
    if S.member peer peers
        then liftIO $ putMVar p2pPeers peers
        else do
            monitor peer
            liftIO $ putMVar p2pPeers (S.insert peer peers)

    sendChan replyTo peers

onPeerQuery :: PeerState -> SendPort Peers -> Process ()
onPeerQuery PeerState{..} replyTo = do
    say $ "Local peer query."
    liftIO (readMVar p2pPeers) >>= sendChan replyTo

onPeerCapable :: (String, SendPort ProcessId) -> Process ()
onPeerCapable (service, replyTo) = do
    say $ "Capability request: " ++ service
    res <- whereis service
    case res of
        Nothing -> say "I can't."
        Just pid -> say "I can!" >> sendChan replyTo pid

onMonitor :: PeerState -> ProcessMonitorNotification -> Process ()
onMonitor state (ProcessMonitorNotification mref pid reason) = do
    say $ "Monitor event: " ++ show (pid, reason)
    doUnregister state (Just mref) pid

-- ** Discovery

-- | Get a list of currently available peer nodes.
getPeers :: Process [NodeId]
getPeers = do
    say $ "Requesting peer list from local controller..."
    (sp, rp) <- newChan
    nsend peerControllerService (sp :: SendPort Peers)
    receiveChan rp >>= return . map processNodeId . S.toList

-- | Poll a network for a list of specific service providers.
getCapable :: String -> Process [ProcessId]
getCapable service = do
    (sp, rp) <- newChan
    nsendPeers peerControllerService (service, sp)
    say "Waiting for capable nodes..."
    go rp []

    where go rp acc = do res <- receiveChanTimeout 100000 rp
                         case res of Just pid -> say "cap hit" >> go rp (pid:acc)
                                     Nothing -> say "cap done" >> return acc

-- ** Messaging

-- | Broadcast a message to a specific service on all peers.
nsendPeers :: Serializable a => String -> a -> Process ()
nsendPeers service msg = getPeers >>= mapM_ (\peer -> nsendRemote peer service msg)

-- | Broadcast a message to a service of on nodes currently running it.
nsendCapable :: Serializable a => String -> a -> Process ()
nsendCapable service msg = getCapable service >>= mapM_ (\pid -> send pid msg)