{-|
Module      : Network.Kademlia.Instance
Description : Implementation of the KademliaInstance type

"Network.Kademlia.Instance" implements the KademliaInstance type, as well
as all the things that need to happen in the background to get a working
Kademlia instance.
-}

module Network.Kademlia.Instance
    ( KademliaInstance(..)
    , KademliaState(..)
    , start
    , newInstance
    , insertNode
    , lookupNode
    , dumpPeers
    ) where

import Control.Concurrent
import Control.Concurrent.Chan
import Control.Concurrent.STM
import Control.Monad (void, forever, when, join, forM_, forever)
import Control.Monad.Trans
import Control.Monad.Trans.State hiding (state)
import Control.Monad.Trans.Reader
import Control.Monad.IO.Class (liftIO)
import Control.Applicative ((<$>), (<*>))
import System.IO.Error (catchIOError)
import qualified Data.Map as M
import Data.Maybe (catMaybes, isJust, fromJust)
import Data.Function (on)
import Data.Map (toList)

import Network.Kademlia.Networking
import qualified Network.Kademlia.Tree as T
import Network.Kademlia.Types
import Network.Kademlia.ReplyQueue

-- | The handle of a running Kademlia Node
data KademliaInstance i a = KI {
      handle :: KademliaHandle i a
    , state :: KademliaState i a
    , expirationThreads :: TVar (M.Map i ThreadId)
    }

-- | Representation of the data the KademliaProcess carries
data KademliaState i a = KS {
      sTree   :: TVar (T.NodeTree i)
    , values :: TVar (M.Map i a)
    }

-- | Create a new KademliaInstance from an Id and a KademliaHandle
newInstance :: (Serialize i) =>
               i -> KademliaHandle i a -> IO (KademliaInstance i a)
newInstance id handle = do
    tree <- atomically . newTVar . T.create $ id
    values <- atomically . newTVar $ M.empty
    threads <- atomically . newTVar $ M.empty
    return . KI handle (KS tree values) $ threads

-- | Insert a Node into the NodeTree
insertNode :: (Serialize i, Ord i) => KademliaInstance i a -> Node i -> IO ()
insertNode (KI _ (KS sTree _) _) node = atomically $ do
    tree <- readTVar sTree
    writeTVar sTree . T.insert tree $ node

-- | Signal a Node's timeout and retur wether it should be repinged
timeoutNode :: (Serialize i, Ord i) => KademliaInstance i a -> i -> IO Bool
timeoutNode (KI _ (KS sTree _) _) id = atomically $ do
    tree <- readTVar sTree
    let (newTree, pingAgain) = T.handleTimeout tree id
    writeTVar sTree newTree
    return pingAgain

-- | Lookup a Node in the NodeTree
lookupNode :: (Serialize i, Ord i) => KademliaInstance i a -> i -> IO (Maybe (Node i))
lookupNode (KI _ (KS sTree _) _) id = atomically $ do
    tree <- readTVar sTree
    return . T.lookup tree $ id

-- | Return all the Nodes an Instance has encountered so far
dumpPeers :: KademliaInstance i a -> IO [Node i]
dumpPeers (KI _ (KS sTree _) _) = atomically $ do
    tree <- readTVar sTree
    return . T.toList $ tree

-- | Insert a value into the store
insertValue :: (Ord i) => i -> a -> KademliaInstance i a -> IO ()
insertValue key value (KI _ (KS _ values) _) = atomically $ do
    vals <- readTVar values
    writeTVar values $ M.insert key value vals

-- | Delete a value from the store
deleteValue :: (Ord i) => i -> KademliaInstance i a -> IO ()
deleteValue key (KI _ (KS _ values) _) = atomically $ do
    vals <- readTVar values
    writeTVar values $ M.delete key vals

-- | Lookup a value in the store
lookupValue :: (Ord i) => i -> KademliaInstance i a -> IO (Maybe a)
lookupValue key (KI _ (KS _ values) _) = atomically $ do
    vals <- readTVar values
    return . M.lookup key $ vals

-- | Start the background process for a KademliaInstance
start :: (Serialize i, Ord i, Serialize a, Eq i, Eq a) =>
         KademliaInstance i a -> ReplyQueue i a -> IO ()
start inst rq = do
        startRecvProcess . handle $ inst
        let rChan = timeoutChan rq
            dChan = defaultChan rq
        receivingId <- forkIO . receivingProcess inst rq rChan $ dChan
        pingId <- forkIO . pingProcess inst $ dChan
        spreadId <- forkIO . spreadValueProcess $ inst
        void . forkIO $ backgroundProcess inst dChan [pingId, spreadId, receivingId]

-- | The central process all Replys go trough
receivingProcess :: (Serialize i, Serialize a, Eq i, Ord i) =>
       KademliaInstance i a -> ReplyQueue i a -> Chan (Reply i a)
    -> Chan (Reply i a)-> IO ()
receivingProcess inst rq replyChan registerChan = forever $ do
    reply <- readChan replyChan

    case reply of
        -- Handle a timed out node
        Timeout registration -> do
            let origin = replyOrigin registration
                h = handle inst
                newRegistration = registration { replyTypes = [R_PONG] }

            -- Mark the node as timed out
            pingAgain <- timeoutNode inst origin
            -- If the node should be repinged
            when pingAgain $ do
                result <- lookupNode inst origin
                case result of
                    Nothing -> return ()
                    Just node -> do
                        -- Ping the node
                        send h (peer node) PING
                        expect h newRegistration registerChan

        -- Store values in newly encountered nodes that you are the closest to
        Answer (Signal node cmd) -> do
            let originId = nodeId node
            tree <- retrieve sTree

            -- This node is not yet known
            when (not . isJust . T.lookup tree $ originId) $ do
                let closestKnown = T.findClosest tree originId 1
                    ownId = T.extractId tree
                    self = node { nodeId = ownId }
                    bucket = self:closestKnown
                    -- Find out closest known node
                    closestId = nodeId . head . sortByDistanceTo bucket $ originId

                -- This node can be assumed to be closest to the new node
                when (ownId == closestId) $ do
                    storedValues <- toList <$> retrieve values
                    let h = handle inst
                        p = peer node
                    -- Store all stored values in the new node
                    forM_ storedValues (send h p . uncurry STORE)

            case cmd of
                -- Ping unknown Nodes that were returned by RETURN_NODES.
                -- Pinging them first is neccessary to prevent disconnected
                -- nodes from spreading through the networks NodeTrees.
                (RETURN_NODES _ nodes) -> forM_ nodes $ \node -> do
                    result <- lookupNode inst . nodeId $ node
                    case result of
                        Nothing -> send (handle inst) (peer node) PING
                        _ -> return ()
                _ -> return ()

        _ -> return ()

    dispatch reply rq
    where retrieve f = atomically . readTVar . f . state $ inst


-- | The actual process running in the background
backgroundProcess :: (Serialize i, Ord i, Serialize a, Eq i, Eq a) =>
    KademliaInstance i a -> Chan (Reply i a) -> [ThreadId] -> IO ()
backgroundProcess inst chan threadIds = do
    reply <- liftIO . readChan $ chan

    case reply of
        Answer sig -> do
            let node = source sig

            -- Handle the signal
            handleCommand (command sig) (peer node) inst

            -- Insert the node into the tree, if it's allready known, it will
            -- be refreshed
            insertNode inst node

            backgroundProcess inst chan threadIds

        -- Kill all other processes and stop on Closed
        Closed -> do
            mapM_ killThread threadIds

            eThreads <- atomically . readTVar . expirationThreads $ inst
            mapM_ killThread $ map snd (M.toList eThreads)

        _ -> return ()

-- | Ping all known nodes every five minutes to make sure they are still present
pingProcess :: (Serialize i, Serialize a, Eq i) => KademliaInstance i a
            -> Chan (Reply i a) -> IO ()
pingProcess (KI h (KS sTree _) _) chan = forever $ do
    threadDelay fiveMinutes

    tree <- atomically . readTVar $ sTree
    forM_ (T.toList tree) $ \node -> do
        -- Send PING and expect a PONG
        send h (peer node) PING
        expect h (RR [R_PONG] (nodeId node)) $ chan

    where fiveMinutes = 300000000

-- | Store all values stored in the node in the 7 closest known nodes every hour
spreadValueProcess :: (Serialize i, Serialize a, Eq i) => KademliaInstance i a
                   -> IO ()
spreadValueProcess (KI h (KS sTree sValues) _) = forever $ do
    threadDelay hour

    values <- atomically . readTVar $ sValues
    tree <- atomically . readTVar $ sTree

    mapMWithKey (sendRequests tree) $ values

    where hour = 60 * 60 * 1000000
          sendRequests tree key val = do
            let closest = T.findClosest tree key 7
            forM_ closest $ \node -> send h (peer node) (STORE key val)

          mapMWithKey :: (k -> v -> IO a) -> M.Map k v -> IO [a]
          mapMWithKey f m = sequence . map snd . M.toList . M.mapWithKey f $ m

-- | Delete a value after a certain amount of time has passed
expirationProcess :: (Ord i) => KademliaInstance i a -> i -> IO ()
expirationProcess inst@(KI _ _ valueTs) key = do
    -- Map own ThreadId to the key
    myTId <- myThreadId
    oldTId <- atomically $ do
        threadIds <- readTVar valueTs
        writeTVar valueTs $ M.insert key myTId threadIds
        return . M.lookup key $ threadIds

    -- Kill the old timeout thread, if it exists
    when (isJust oldTId) (killThread . fromJust $ oldTId)

    threadDelay hour
    deleteValue key inst

    where hour = 60 * 60 * 1000000

-- | Handles the differendt Kademlia Commands appropriately
handleCommand :: (Serialize i, Eq i, Ord i, Serialize a) =>
    Command i a -> Peer -> KademliaInstance i a -> IO ()
-- Simply answer a PING with a PONG
handleCommand PING peer inst = send (handle inst) peer PONG
-- Return a KBucket with the closest Nodes
handleCommand (FIND_NODE id) peer inst = returnNodes peer id inst
-- Insert the value into the values store and start the expiration process
handleCommand (STORE key value) _ inst = do
    insertValue key value inst
    void . forkIO . expirationProcess inst $ key
-- Return the value, if known, or the closest other known Nodes
handleCommand (FIND_VALUE key) peer inst = do
    result <- lookupValue key inst
    case result of
        Just value -> liftIO $ send (handle inst) peer $ RETURN_VALUE key value
        Nothing    -> returnNodes peer key inst
handleCommand _ _ _ = return ()

-- | Return a KBucket with the closest Nodes to a supplied Id
returnNodes :: (Serialize i, Eq i, Ord i, Serialize a) =>
    Peer -> i -> KademliaInstance i a -> IO ()
returnNodes peer id (KI h (KS sTree _) _) = do
    tree <- atomically . readTVar $ sTree
    let nodes = T.findClosest tree id 7
    liftIO $ send h peer (RETURN_NODES id nodes)