{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE RecordWildCards, ScopedTypeVariables, PackageImports, GADTs,
             DeriveDataTypeable, NamedFieldPuns, ViewPatterns, TupleSections, DoRec #-}
module Network.Hermes.Gossip(
  GossipContext, TTL,
  writeFactoid,
  readFactoid,
  readFactoids,
  addCallback,
  newGossiper,
  setPeriod,
  snapshotGossiper,
  restoreGossiper
  ) where

import Control.Applicative
import Control.Monad
import Control.Monad.Trans.Maybe
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MVar
 
import Codec.Crypto.RSA hiding(sign,verify)
import qualified Codec.Crypto.RSA as RSA
import Data.ByteString(ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Time
import Data.Map(Map)
import qualified Data.Map as M
import Data.Typeable
import Data.Serialize
import Data.Maybe
import System.Log.Logger
import System.Random

import Network.Hermes.Core
import Network.Hermes.Misc
import Network.Hermes.Protocol
import Network.Hermes.Types(CoreContext(..),PeerKey(peerKey))

-- RSA adaption

sign :: PrivateKey -> ByteString -> ByteString
sign key bs = BS.concat $ BSL.toChunks $ RSA.sign key (BSL.fromChunks [bs])

verify :: PublicKey -> ByteString -> ByteString -> Bool
verify key bs sig = RSA.verify key (BSL.fromChunks [bs]) (BSL.fromChunks [sig])


-- * State

data GossipContext = GossipContext {
  core :: CoreContext
  ,factoids :: TVar (Map FactoidKey (Map HermesID Factoid))
  ,factoidTTLs :: TVar (Map TTL FactoidKey)
  ,gossipInterval :: TVar Int -- ^ Microseconds, oddly enough
  ,messageLoop :: ThreadId
  ,callbacks :: TVar (Map (Type,Type) [Callback])
  }

data Callback where
  Callback :: (Serialize tag, Serialize msg) => (HermesID -> tag -> msg -> IO ()) -> Callback

-- | Seconds
type Age = Double
-- | Seconds
type TTL = Double

-- * External protocol

data FactoidKey = FactoidKey {
  factoidType :: Type,
  factoidTagType :: Type,
  factoidTag :: ByteString
  } deriving(Typeable,Show,Eq,Ord)

instance Serialize FactoidKey where
  put (FactoidKey a b c) = put a >> put b >> put c
  get = liftM3 FactoidKey get get get

data Factoid = Factoid {
  factoidDate :: UTCTime,
  factoidInsertTime :: UTCTime, -- ^ The time at which it was inserted locally
  factoidTTL :: Maybe TTL,
  factoidData :: ByteString,
  factoidSignature :: ByteString,
  factoidSource :: HermesID
  } deriving(Typeable,Show)

makeFactoid :: CoreContext -> Maybe TTL -> ByteString -> IO Factoid
makeFactoid CoreContext{myPrivateKey,myHermesID} ttl bs = do
  now <- getCurrentTime
  return $ Factoid now now ttl bs (sign myPrivateKey bs) myHermesID

verifyFactoid :: CoreContext -> Factoid -> STM Bool
verifyFactoid (peerKeys -> keys) Factoid{factoidData,factoidSignature,factoidSource} = do
  key <- fmap peerKey . M.lookup factoidSource <$> readTVar keys
  return $ maybe False (\k -> verify k factoidData factoidSignature) key

instance Serialize Factoid where
  put (Factoid a b c d e f) = put a >> put b >> put c >> put d >> put e >> put f
  get = Factoid <$> get <*> get <*> get <*> get <*> get <*> get

instance Serialize UTCTime where
  put (UTCTime a b) = put a >> put b
  get = liftM2 UTCTime get get

instance Serialize Day where
  put = put . toModifiedJulianDay
  get = ModifiedJulianDay <$> get

instance Serialize DiffTime where
  put = put . toRational
  get = fromRational <$> get

data FactoidMessage =
  PushFactoid FactoidKey Factoid
  | PushAbort -- ^ Tagged by FactoidKey
  | PullFactoids
  | PulledFactoids [(FactoidKey,Factoid)]
    deriving(Typeable,Show)

instance Serialize FactoidMessage where
  put (PushFactoid a b) = putWord8 0 >> put a >> put b
  put PushAbort = putWord8 1
  put PullFactoids = putWord8 2
  put (PulledFactoids a) = putWord8 3 >> put a
  get = do
    tag <- getWord8
    case tag of
      0 -> liftM2 PushFactoid get get
      1 -> return PushAbort
      2 -> return PullFactoids
      3 -> liftM PulledFactoids get
      _ -> fail "Serialize FactoidMessage: Broken message"

-- | Insert a factoid in the gossip network. This will immediately
-- trigger a limited gossip exchange, hopefully spreading it to a
-- large fraction of the network.
--
-- Factoids are keyed by their type, source, and the type and
-- serialized value of an arbitrary tag. They can be replaced by
-- re-inserting later, and optionally expire after a timeout.
-- 
-- Don't rely on the timeout, though. It's for garbage collection, and
-- is not required to be exact.
writeFactoid :: forall factoid tag. (Typeable factoid, Serialize factoid, Typeable tag, Serialize tag)
                => GossipContext -> factoid -> tag
                -> Maybe TTL -- ^ The timeout, in seconds
                -> IO ()
writeFactoid ctx@GossipContext{..} fact tag ttl = do
  factoid <- makeFactoid core ttl (encode fact)
  let key = FactoidKey (showType fact) (showType tag) (encode tag)
  -- Insert the factoid in gossip state
  -- print =<< (atomically $ readTVar factoids)
  insertFactoidUnchecked ctx key factoid
  -- print =<< (atomically $ readTVar factoids)
  -- Perform limited gossip
  forkIO $ limitedGossip core factoid key
  return ()


-- | Returns true if the fact was valid and insertable, otherwise false
insertFactoid :: GossipContext -> (FactoidKey,Factoid) -> IO Bool
insertFactoid GossipContext{core} (_,factoid) | myHermesID core == factoidSource factoid = return False
insertFactoid ctx@GossipContext{..} (key,factoid) = do
  -- Make sure the factoid hasn't been altered
  -- print =<< (atomically $ readTVar factoids)
  ok <- atomically $ do
    key <- M.lookup (factoidSource factoid) <$> readTVar (peerKeys core)
    -- FIXME: Send the key along as well, and.. something.
    return $ maybe True (\k -> verify (peerKey k) (factoidData factoid) (factoidSignature factoid)) key
  -- And insert.
  if ok
    then do
    inserted <- insertFactoidUnchecked ctx key factoid
    -- Call callbacks
    when inserted $ do
      let mapKey = (factoidTagType key, factoidType key)
      -- print ("Calling callbacks for " ++ show mapKey)
      callbacks <- atomically $ M.findWithDefault [] mapKey <$> readTVar callbacks
      forM_ callbacks $ \(Callback callback) -> do
        -- print "Calling callback"
        let tag = decode $ factoidTag key
            fact = decode $ factoidData factoid
        case (tag,fact) of
          (Right t, Right f) -> callback (factoidSource factoid) t f
          otherwise -> alertM "hermes.gossip.callback" "Unable to decode factoid"
    return inserted
    else do
    alertM "hermes.gossip.push" $ "Incorrect factoid signature for " ++ show factoid
    return False

-- | Returns true if the fact was insertable (not older than one we already have), otherwise false
insertFactoidUnchecked :: GossipContext -> FactoidKey -> Factoid -> IO Bool
insertFactoidUnchecked GossipContext{..} key factoid = atomically $ do
  oldFact <- join . fmap (M.lookup (factoidSource factoid)) . M.lookup key <$> readTVar factoids
  let update = case (factoidDate <$> oldFact, factoidDate factoid) of
        (Just oldDate, newDate) | oldDate >= newDate -> False
        otherwise -> True
  when update $ do
    modifyTVar factoids (adjustWithDefault M.empty (M.insert (factoidSource factoid) factoid) key)
    when (isJust $ factoidTTL factoid) $ modifyTVar factoidTTLs (M.insert (fromJust $ factoidTTL factoid) key)
  return update

-- | Gossips about a fact until someone tells us they already know it
-- (or we run out of peers).
limitedGossip :: CoreContext -> Factoid -> FactoidKey -> IO ()
limitedGossip core factoid key = do
  return () -- FIXME: THIS ENTIRE FUNCTION
  
fooasdhu core factoid key = do
  -- HACK: Let the system bootstrap before we start looking for peers
  threadDelay 100000
  -- Get a list of every peer we know the address of
  -- FIXME: SHUFFLE THE LIST, FILTER OUT MYSELF
  peers <- (fmap . fmap) fst $ atomically $ M.toList <$> readTVar (peerAddress core)
  -- Then contact them, one by one. Instead of doing it serially, 
  mvar <- newEmptyMVar
  acceptType core (undefined :: FactoidMessage) key
  timerThread <- forkIO $ forever $ do putMVar mvar Nothing; threadDelay 500000
  recvThread <- forkIO $ forever $ recv' core key >>= putMVar mvar . Just . snd
  let gossiper list@(peer:peers) = do
        input <- takeMVar mvar
        case input of
          Just PushAbort -> return () -- We're done.
          Nothing -> do
            -- Timer triggered, contact new peer
            infoM "hermes.gossip.push" $ "Pushing to " ++ show peer
            forkIO $ send core peer (PushFactoid key factoid)
            gossiper peers
          Just e -> do
            -- What?
            warningM "hermes.gossip.push" $ "Unexpected reply: " ++ show e
            gossiper list
      gossiper [] = return ()
  gossiper peers
  -- Finally, kill the threads.
  killThread timerThread
  killThread recvThread
  -- And reset core.
  refuseType core (undefined :: FactoidMessage) key

-- | Factoid message loop
factoidLoop :: GossipContext -> IO ()
factoidLoop ctx@GossipContext{..} = do
  -- Setup
  mbox <- newEmptyMVar
  delayThread <- forkIO $ do
    threadDelay 100000 -- FIXME: Ugly hack to let clients call setPeriod before we delay.
    forever $ do
      threadDelay =<< atomically (readTVar gossipInterval)
      putMVar mbox Nothing
      infoM "hermes.gossip.pull" "Initiating gossip"
  recvThread <- forkIO $ forever $ recv core >>= putMVar mbox . Just
  -- Aand.. go!
  forever $ do
    input <- takeMVar mbox
    forkIO $ case input of
      Nothing -> do
        -- Time for our periodic gossiping. Pick some random peer.
        peers <- atomically $ readTVar (peerAddress core)
        unless (M.null peers) $ do
          peer <- fst . flip M.elemAt peers <$> randomRIO (0, M.size peers - 1)
          send core peer PullFactoids
      Just (source,PushFactoid key factoid) -> do
        infoM "hermes.gossip.push" "Caught gossip"
        inserted <- insertFactoid ctx (key,factoid)
        if inserted
          then limitedGossip core factoid key
          else send' core source PushAbort key
      Just (source,PullFactoids) -> do
        -- Someone wants to know the truth, the full truth, and nothing but the factoids. Let's oblige.
        facts <- concatMap (\(key,m) -> map ((key,) . snd) $ M.toList m) . M.toList <$>
                 atomically (readTVar factoids)
        infoM "hermes.gossip.pull" $ "Sending gossip: " ++ show (length facts)
        -- print facts
        send core source $ PulledFactoids facts
      Just (_,PulledFactoids factoids) -> do
        infoM "hermes.gossip.pull" $ "Received gossip: " ++ show (length factoids)
        -- print factoids
        mapM_ (insertFactoid ctx) factoids
      Just (source,e) -> do
        warningM "hermes.gossip.pull" $ "Unexpected reply: " ++ show (source,e)

-- | Read a factoid, assuming it exists.
readFactoid :: forall factoid tag. (Typeable factoid, Serialize factoid, Typeable tag, Serialize tag)
               => GossipContext -> tag -> HermesID -> IO (Maybe factoid)
readFactoid GossipContext{factoids} tag source = do
  let key = FactoidKey (showType (undefined :: factoid)) (showType tag) (encode tag)
  factoid <- atomically $ fmap (decode . factoidData) . M.lookup source . M.findWithDefault M.empty key <$> readTVar factoids
  case factoid of
    Nothing -> return Nothing
    Just (Left err) -> alertM "hermes.gossip.readFactoid" ("Factoid decode error: " ++ err) >> return Nothing
    Just (Right f) -> return f

-- | Read all factoids with an appropriate type and tag. Useful if you
-- don't know what source to expect.
readFactoids :: forall factoid tag. (Typeable factoid, Serialize factoid, Typeable tag, Serialize tag)
                => GossipContext -> tag -> IO [(HermesID,factoid)]
readFactoids GossipContext{factoids} tag = do
  let key = FactoidKey (showType (undefined :: factoid)) (showType tag) (encode tag)
  facts <- M.toList . M.findWithDefault M.empty key <$> atomically (readTVar factoids)
  fmap catMaybes $ forM facts $ \(source, decode . factoidData -> factoid) -> do
    case factoid of
      Left err -> alertM "hermes.gossip.readFactoids" ("Factoid decode error: " ++ err) >> return Nothing
      Right f -> return $ Just f
  

-- | Creates a gossiper from scratch
newGossiper :: CoreContext 
               -> Double -- ^ The gossip period, in seconds
               -> IO GossipContext
newGossiper core period = do
  rec
    factoids <- newTVarIO M.empty
    factoidTTLs <- newTVarIO M.empty
    gossipInterval <- newTVarIO $ round $ period * 1000000
    messageLoop <- forkIO $ factoidLoop ctx
    callbacks <- newTVarIO M.empty
    let ctx = GossipContext{..}
  return ctx

-- | Set the period for the periodic gossiper. It will take effect
-- after the next periodic gossip.
setPeriod :: GossipContext
             -> Double -- ^ The period, in seconds
             -> IO ()
setPeriod GossipContext{gossipInterval} period =
  atomically $ writeTVar gossipInterval $ round $ period * 1000000

-- | Snapshots a gossip context for storage. All state is saved,
-- except callbacks.
snapshotGossiper :: GossipContext -> STM ByteString
snapshotGossiper GossipContext{..} = do
  facts <- readTVar factoids
  ttls <- readTVar factoidTTLs
  interval <- readTVar gossipInterval
  return $ encode (facts,ttls,interval)

-- | Restore a gossip context and start gossiping
restoreGossiper :: CoreContext -> ByteString -> IO GossipContext
restoreGossiper core (decode -> Right (facts,ttls,interval)) = do
  rec
    factoids <- newTVarIO facts
    factoidTTLs <- newTVarIO ttls
    gossipInterval <- newTVarIO interval
    messageLoop <- forkIO $ factoidLoop ctx
    callbacks <- newTVarIO M.empty
    let ctx = GossipContext{..}
  return ctx

-- | Add a callback to be called every time a type-matching factoid is
-- inserted or updated. It will not be called for writeFactoid calls.
addCallback :: forall msg tag. (Serialize tag, Typeable tag, Serialize msg, Typeable msg)
               => GossipContext -> (HermesID -> tag -> msg -> IO ()) -> IO ()
addCallback GossipContext{callbacks} function = do
  let callback = Callback function
      key = (showType (undefined :: tag), showType (undefined :: msg))
  -- print ("INserting callback for " ++ show key)
  atomically $ modifyTVar callbacks (adjustWithDefault [] (callback:) key)