module Network.Kademlia.Instance
( KademliaInstance(..)
, KademliaState(..)
, start
, newInstance
, insertNode
, lookupNode
, dumpPeers
) where
import Control.Concurrent
import Control.Concurrent.Chan
import Control.Concurrent.STM
import Control.Monad (void, forever, when, join, forM_, forever)
import Control.Monad.Trans
import Control.Monad.Trans.State hiding (state)
import Control.Monad.Trans.Reader
import Control.Monad.IO.Class (liftIO)
import Control.Applicative ((<$>), (<*>))
import System.IO.Error (catchIOError)
import qualified Data.Map as M
import Data.Maybe (catMaybes, isJust, fromJust)
import Data.Function (on)
import Data.Map (toList)
import Network.Kademlia.Networking
import qualified Network.Kademlia.Tree as T
import Network.Kademlia.Types
import Network.Kademlia.ReplyQueue
-- | A running Kademlia node: the networking handle, the shared mutable state
--   and the bookkeeping for the threads that expire stored values.
data KademliaInstance i a
    = KI
    { handle            :: KademliaHandle i a
      -- ^ Handle handed to the networking functions ('send', 'expect',
      --   'startRecvProcess').
    , state             :: KademliaState i a
      -- ^ The node tree and the value store.
    , expirationThreads :: TVar (M.Map i ThreadId)
      -- ^ For every key, the thread currently waiting to expire its value.
    }
-- | The mutable state shared by all worker threads of an instance.
data KademliaState i a
    = KS
    { sTree  :: TVar (T.NodeTree i)
      -- ^ Tree of the nodes this instance knows about.
    , values :: TVar (M.Map i a)
      -- ^ Values stored on this instance, indexed by their key.
    }
-- | Build a fresh 'KademliaInstance' around the given handle.
--
--   The node tree is obtained by passing the supplied id to 'T.create'; the
--   value store and the table of expiration threads both start out empty.
newInstance :: (Serialize i) =>
    i -> KademliaHandle i a -> IO (KademliaInstance i a)
newInstance ownId h = do
    kState <- KS <$> newTVarIO (T.create ownId) <*> newTVarIO M.empty
    expiry <- newTVarIO M.empty
    return (KI h kState expiry)
-- | Atomically replace the instance's node tree with the result of
--   'T.insert'-ing the given node into it.
insertNode :: (Serialize i, Ord i) => KademliaInstance i a -> Node i -> IO ()
insertNode inst node =
    atomically $ modifyTVar (sTree (state inst)) (`T.insert` node)
-- | Record a timeout for the node with the given id.
--
--   Runs 'T.handleTimeout' on the current tree in a single transaction, stores
--   the tree it returns and hands back the accompanying 'Bool', which the
--   caller uses to decide whether the node should be pinged again.
timeoutNode :: (Serialize i, Ord i) => KademliaInstance i a -> i -> IO Bool
timeoutNode inst nid = atomically $ do
    let treeVar = sTree (state inst)
    -- The pair is taken apart with fst/snd so that it stays unevaluated
    -- until somebody actually looks at the tree or the flag.
    outcome <- (`T.handleTimeout` nid) <$> readTVar treeVar
    writeTVar treeVar (fst outcome)
    return (snd outcome)
-- | Look up a node by id in the instance's current node tree.
--   The result is whatever 'T.lookup' yields for that id.
lookupNode :: (Serialize i, Ord i) => KademliaInstance i a -> i -> IO (Maybe (Node i))
lookupNode inst nid = (`T.lookup` nid) <$> readTVarIO (sTree (state inst))
-- | List every node held in the instance's current node tree, as produced by
--   'T.toList'.
dumpPeers :: KademliaInstance i a -> IO [Node i]
dumpPeers inst = T.toList <$> readTVarIO (sTree (state inst))
-- | Atomically store a value under the given key, replacing any value that
--   was stored under that key before.
insertValue :: (Ord i) => i -> a -> KademliaInstance i a -> IO ()
insertValue key value inst =
    atomically $ modifyTVar (values (state inst)) (M.insert key value)
-- | Atomically remove the value stored under the given key, if there is one.
deleteValue :: (Ord i) => i -> KademliaInstance i a -> IO ()
deleteValue key inst =
    atomically $ modifyTVar (values (state inst)) (M.delete key)
-- | Fetch the value currently stored under the given key, or 'Nothing' when
--   the key is absent from the value store.
lookupValue :: (Ord i) => i -> KademliaInstance i a -> IO (Maybe a)
lookupValue key inst = M.lookup key <$> readTVarIO (values (state inst))
-- | Bring an instance to life.
--
--   Starts the handle's receive process and then forks, in this order, the
--   receiving, ping and value-spreading workers. Finally it forks the
--   background process, which is given the ids of those three threads so that
--   it can kill them when it shuts down.
start :: (Serialize i, Ord i, Serialize a, Eq i, Eq a) =>
    KademliaInstance i a -> ReplyQueue i a -> IO ()
start inst rq = do
    startRecvProcess (handle inst)
    receiver <- forkIO (receivingProcess inst rq (timeoutChan rq) defaults)
    pinger   <- forkIO (pingProcess inst defaults)
    spreader <- forkIO (spreadValueProcess inst)
    void (forkIO (backgroundProcess inst defaults [pinger, spreader, receiver]))
  where
    -- Channel shared by the receiving, ping and background workers.
    defaults = defaultChan rq
-- | Worker loop that inspects every reply read from @replyChan@ before
--   passing it on to the 'ReplyQueue' via 'dispatch'.
--
--   * On a 'Timeout' the originating node is marked as timed out; if
--     'timeoutNode' asks for it and the node is still in the tree, the node is
--     pinged again and a @R_PONG@ registration is placed on @registerChan@.
--
--   * On an 'Answer' from a node that is not yet in the tree, all locally
--     stored values are sent to that node, provided this instance is at least
--     as close to it as the closest node found in the tree. Independently of
--     that, every node listed in a @RETURN_NODES@ command that is not yet in
--     the tree is pinged.
--
--   Every reply, whatever its kind, is dispatched afterwards.
receivingProcess :: (Serialize i, Serialize a, Eq i, Ord i) =>
       KademliaInstance i a -> ReplyQueue i a -> Chan (Reply i a)
    -> Chan (Reply i a)-> IO ()
receivingProcess inst rq replyChan registerChan = forever $ do
    reply <- readChan replyChan

    case reply of
        Timeout registration     -> onTimeout registration
        Answer (Signal node cmd) -> do
            greetIfUnknown node
            pingUnknown cmd
        _                        -> return ()

    dispatch reply rq
  where
    h = handle inst

    -- Read one of the TVars of the instance's state.
    retrieve f = atomically . readTVar . f . state $ inst

    -- Mark the origin of a timed-out registration and re-ping it if asked to.
    onTimeout registration = do
        let origin = replyOrigin registration
        pingAgain <- timeoutNode inst origin
        when pingAgain $ do
            result <- lookupNode inst origin
            case result of
                Nothing   -> return ()
                Just node -> do
                    send h (peer node) PING
                    -- Re-use the registration, now waiting for a PONG only.
                    expect h (registration { replyTypes = [R_PONG] }) registerChan

    -- Hand all stored values to a newly encountered node when this instance
    -- ranks first among the candidates sorted by distance to that node.
    greetIfUnknown node = do
        tree <- retrieve sTree
        let originId   = nodeId node
            ownId      = T.extractId tree
            -- Stand-in for this instance, used only for the distance ranking.
            self       = node { nodeId = ownId }
            candidates = self : T.findClosest tree originId 1
        case T.lookup tree originId of
            Just _  -> return ()
            Nothing -> case sortByDistanceTo candidates originId of
                (closest:_) | nodeId closest == ownId -> do
                    storedValues <- toList <$> retrieve values
                    forM_ storedValues (send h (peer node) . uncurry STORE)
                _ -> return ()

    -- Ping the nodes of a RETURN_NODES command that are not in the tree yet.
    pingUnknown (RETURN_NODES _ nodes) = forM_ nodes $ \candidate -> do
        known <- lookupNode inst (nodeId candidate)
        case known of
            Nothing -> send h (peer candidate) PING
            Just _  -> return ()
    pingUnknown _ = return ()
-- | Main loop of an instance, fed by the given channel.
--
--   * 'Answer': the contained command is handled, the sending node is inserted
--     into the node tree, and the loop goes on.
--
--   * 'Closed': the worker threads passed in @threadIds@ and every thread
--     registered in 'expirationThreads' are killed, and the loop ends.
--
--   * Anything else is skipped and the loop goes on. Shutting down is reserved
--     for 'Closed', the only branch that also cleans up the worker threads.
backgroundProcess :: (Serialize i, Ord i, Serialize a, Eq i, Eq a) =>
    KademliaInstance i a -> Chan (Reply i a) -> [ThreadId] -> IO ()
backgroundProcess inst chan threadIds = do
    reply <- readChan chan

    case reply of
        Answer sig -> do
            let node = source sig
            handleCommand (command sig) (peer node) inst
            insertNode inst node
            continue

        Closed -> do
            mapM_ killThread threadIds
            eThreads <- atomically . readTVar . expirationThreads $ inst
            mapM_ killThread (M.elems eThreads)

        _ -> continue
  where
    continue = backgroundProcess inst chan threadIds
-- | Worker loop that, after every pause, sends a @PING@ to each node in the
--   node tree and registers the expected @R_PONG@ reply on the given channel.
pingProcess :: (Serialize i, Serialize a, Eq i) => KademliaInstance i a
    -> Chan (Reply i a) -> IO ()
pingProcess inst chan = forever $ do
    threadDelay pingInterval
    peers <- fmap T.toList . atomically . readTVar . sTree . state $ inst
    mapM_ pingPeer peers
  where
    h = handle inst

    -- Five minutes, in the microseconds 'threadDelay' expects.
    pingInterval = 300000000

    pingPeer node = do
        send h (peer node) PING
        expect h (RR [R_PONG] (nodeId node)) chan
-- | Worker loop that republishes the stored values once per pause.
--
--   After each pause it takes one snapshot of the node tree and the value
--   store and sends, for every stored key in ascending key order, a @STORE@ of
--   that key and value to the (up to) 7 nodes that 'T.findClosest' returns for
--   the key.
spreadValueProcess :: (Serialize i, Serialize a, Eq i) => KademliaInstance i a
    -> IO ()
spreadValueProcess (KI h (KS sTree sValues) _) = forever $ do
    threadDelay hour

    -- Read both TVars in one transaction so tree and values belong together.
    (tree, stored) <- atomically $ do
        t  <- readTVar sTree
        vs <- readTVar sValues
        return (t, vs)

    forM_ (M.toList stored) $ \(key, val) ->
        forM_ (T.findClosest tree key 7) $ \node ->
            send h (peer node) (STORE key val)
  where
    -- One hour in microseconds.
    -- NOTE(review): 3,600,000,000 does not fit a 32-bit Int.
    hour = 60 * 60 * 1000000
-- | Expire the value stored under a key after one hour.
--
--   Meant to run in its own thread. The thread registers itself in
--   'expirationThreads' under the key and kills the thread that was registered
--   there before, so only the most recent store of a key has a live timer.
--
--   When the hour is over, the value and this thread's registration are
--   removed in a single transaction, but only if this thread is still the one
--   registered for the key. The registration therefore never outlives the
--   value, and a thread that has been superseded leaves both maps alone.
expirationProcess :: (Ord i) => KademliaInstance i a -> i -> IO ()
expirationProcess (KI _ (KS _ sValues) valueTs) key = do
    myTId <- myThreadId

    -- Register this thread and learn which thread it replaces.
    oldTId <- atomically $ do
        threadIds <- readTVar valueTs
        writeTVar valueTs $ M.insert key myTId threadIds
        return $ M.lookup key threadIds

    maybe (return ()) killThread oldTId

    threadDelay hour

    atomically $ do
        threadIds <- readTVar valueTs
        when (M.lookup key threadIds == Just myTId) $ do
            stored <- readTVar sValues
            writeTVar sValues $ M.delete key stored
            writeTVar valueTs $ M.delete key threadIds
  where
    -- One hour in microseconds.
    -- NOTE(review): 3,600,000,000 does not fit a 32-bit Int.
    hour = 60 * 60 * 1000000
-- | React to a command received from the given peer.
--
--   * @PING@ is answered with @PONG@.
--   * @FIND_NODE@ is answered through 'returnNodes'.
--   * @STORE@ stores the value and forks an 'expirationProcess' for its key.
--   * @FIND_VALUE@ is answered with @RETURN_VALUE@ when the key is stored
--     locally, and through 'returnNodes' otherwise.
--   * Every other command is ignored.
handleCommand :: (Serialize i, Eq i, Ord i, Serialize a) =>
    Command i a -> Peer -> KademliaInstance i a -> IO ()
handleCommand cmd sender inst = case cmd of
    PING            -> send h sender PONG
    FIND_NODE nid   -> returnNodes sender nid inst
    STORE key value -> do
        insertValue key value inst
        void (forkIO (expirationProcess inst key))
    FIND_VALUE key  -> do
        found <- lookupValue key inst
        maybe (returnNodes sender key inst)
              (send h sender . RETURN_VALUE key)
              found
    _               -> return ()
  where
    h = handle inst
-- | Answer a peer with a @RETURN_NODES@ command carrying the requested id and
--   the (up to) 7 nodes that 'T.findClosest' returns for it from the current
--   node tree.
returnNodes :: (Serialize i, Eq i, Ord i, Serialize a) =>
    Peer -> i -> KademliaInstance i a -> IO ()
returnNodes sender target inst = do
    tree <- atomically (readTVar (sTree (state inst)))
    send (handle inst) sender (RETURN_NODES target (T.findClosest tree target 7))