{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE RecordWildCards, ScopedTypeVariables, PackageImports, GADTs, DeriveDataTypeable, NamedFieldPuns, ViewPatterns, TupleSections, DoRec #-}
module Network.Hermes.Gossip(
  GossipContext, TTL
  , writeFactoid
  , readFactoid
  , readFactoids
  , addCallback
  , newGossiper
  , setPeriod
  , snapshotGossiper
  , restoreGossiper
  ) where

import Control.Applicative
import Control.Monad
import Control.Monad.Trans.Maybe
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MVar
import Codec.Crypto.RSA hiding(sign,verify)
import qualified Codec.Crypto.RSA as RSA
import Data.ByteString(ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Time
import Data.Map(Map)
import qualified Data.Map as M
import Data.Typeable
import Data.Serialize
import Data.Maybe
import System.Log.Logger
import System.Random
import System.Random.Shuffle

import Network.Hermes.Core
import Network.Hermes.Misc
import Network.Hermes.Protocol
import Network.Hermes.Types(CoreContext(..),PeerKey(peerKey))

-- RSA adaption: Codec.Crypto.RSA works on lazy ByteStrings, while this
-- module stores strict ones.

-- | Sign a strict 'ByteString', returning the signature as a strict
-- 'ByteString'.
sign :: PrivateKey -> ByteString -> ByteString
sign key bs = BS.concat $ BSL.toChunks $ RSA.sign key (BSL.fromChunks [bs])

-- | Check a signature (third argument) over a strict 'ByteString'
-- (second argument).
verify :: PublicKey -> ByteString -> ByteString -> Bool
verify key bs sig = RSA.verify key (BSL.fromChunks [bs]) (BSL.fromChunks [sig])

-- * State

-- | All state owned by one gossiper.
data GossipContext = GossipContext {
   core :: CoreContext
  ,factoids :: TVar (Map FactoidKey (Map HermesID Factoid))
   -- ^ Known factoids, by key and then by originating peer
  ,factoidTTLs :: TVar (Map TTL FactoidKey)
   -- ^ Written on insert and saved in snapshots. Keyed by TTL, so two
   -- factoids with the same TTL share one slot. Nothing in this module
   -- reads it to expire factoids.
  ,gossipInterval :: TVar Int -- ^ Microseconds, oddly enough
  ,messageLoop :: ThreadId -- ^ The thread running 'factoidLoop'
  ,callbacks :: TVar (Map (Type,Type) [Callback])
   -- ^ Keyed by (tag type, factoid type)
  }

-- | A user callback with its tag and message types hidden; the
-- 'Serialize' constraints let 'insertFactoid' decode its arguments.
data Callback where
  Callback :: (Serialize tag, Serialize msg) => (HermesID -> tag -> msg -> IO ()) -> Callback

-- | Seconds
type Age = Double

-- | Seconds
type TTL = Double

-- * External protocol

-- | Identifies a factoid slot: the factoid's type, the tag's type and
-- the serialized tag value.
data FactoidKey = FactoidKey {
   factoidType :: Type
  ,factoidTagType :: Type
  ,factoidTag :: ByteString
  } deriving(Typeable,Show,Eq,Ord)

instance Serialize FactoidKey where
  put (FactoidKey a b c) = put a >> put b >> put c
  get = liftM3 FactoidKey get get get

-- | A signed, serialized fact together with its metadata.
data Factoid = Factoid {
   factoidDate :: UTCTime
  ,factoidInsertTime :: UTCTime -- ^ The time at which it was inserted locally
  ,factoidTTL :: Maybe TTL
  ,factoidData :: ByteString
  ,factoidSignature :: ByteString
  ,factoidSource :: HermesID
  } deriving(Typeable,Show)

-- | Wrap serialized data in a 'Factoid' signed with our private key.
-- Both timestamps are set to the current time.
makeFactoid :: CoreContext -> Maybe TTL -> ByteString -> IO Factoid
makeFactoid CoreContext{myPrivateKey,myHermesID} ttl bs = do
  now <- getCurrentTime
  return $ Factoid now now ttl bs (sign myPrivateKey bs) myHermesID

-- | Check a factoid's signature against the stored key of its source.
-- Returns 'False' when no key is known for the source; note that
-- 'insertFactoid' does its own check and accepts that case.
verifyFactoid :: CoreContext -> Factoid -> STM Bool
verifyFactoid (peerKeys -> keys) Factoid{factoidData,factoidSignature,factoidSource} = do
  key <- fmap peerKey . M.lookup factoidSource <$> readTVar keys
  return $ maybe False (\k -> verify k factoidData factoidSignature) key

instance Serialize Factoid where
  put (Factoid a b c d e f) = put a >> put b >> put c >> put d >> put e >> put f
  get = Factoid <$> get <*> get <*> get <*> get <*> get <*> get

instance Serialize UTCTime where
  put (UTCTime a b) = put a >> put b
  get = liftM2 UTCTime get get

instance Serialize Day where
  put = put . toModifiedJulianDay
  get = ModifiedJulianDay <$> get

instance Serialize DiffTime where
  put = put . toRational
  get = fromRational <$> get

-- | The messages gossipers exchange.
data FactoidMessage = PushFactoid FactoidKey Factoid
                    | PushAbort -- ^ Tagged by FactoidKey
                    | PullFactoids
                    | PulledFactoids [(FactoidKey,Factoid)]
                    deriving(Typeable,Show)

instance Serialize FactoidMessage where
  put (PushFactoid a b) = putWord8 0 >> put a >> put b
  put PushAbort = putWord8 1
  put PullFactoids = putWord8 2
  put (PulledFactoids a) = putWord8 3 >> put a
  get = do
    tag <- getWord8
    case tag of
      0 -> liftM2 PushFactoid get get
      1 -> return PushAbort
      2 -> return PullFactoids
      3 -> liftM PulledFactoids get
      _ -> fail "Serialize FactoidMessage: Broken message"

-- | Insert a factoid in the gossip network. This will immediately
-- trigger a limited gossip exchange, hopefully spreading it to a
-- large fraction of the network.
--
-- Factoids are keyed by their type, source, and the type and
-- serialized value of an arbitrary tag. They can be replaced by
-- re-inserting later, and optionally expire after a timeout.
--
-- Don't rely on the timeout, though. It's for garbage collection, and
-- is not required to be exact.
writeFactoid :: forall factoid tag. (Typeable factoid, Serialize factoid, Typeable tag, Serialize tag)
                => GossipContext
                -> factoid
                -> tag
                -> Maybe TTL -- ^ The timeout, in seconds
                -> IO ()
writeFactoid ctx@GossipContext{..} fact tag ttl = do
  factoid <- makeFactoid core ttl (encode fact)
  let key = FactoidKey (showType fact) (showType tag) (encode tag)
  -- Insert the factoid in gossip state. We signed it ourselves, so it
  -- skips signature checking (and callbacks).
  _ <- insertFactoidUnchecked ctx key factoid
  -- Perform limited gossip in the background
  _ <- forkIO $ limitedGossip core factoid key
  return ()

-- | Returns true if the fact was valid and insertable, otherwise false.
--
-- Factoids claiming to come from ourselves are refused. When a factoid
-- is actually inserted, every callback registered for its
-- (tag type, factoid type) pair is run.
insertFactoid :: GossipContext -> (FactoidKey,Factoid) -> IO Bool
insertFactoid GossipContext{core} (_,factoid)
  | myHermesID core == factoidSource factoid = return False
insertFactoid ctx@GossipContext{..} (key,factoid) = do
  -- Make sure the factoid hasn't been altered
  ok <- atomically $ do
    signer <- M.lookup (factoidSource factoid) <$> readTVar (peerKeys core)
    -- FIXME: Send the key along as well, and.. something.
    -- Until then, a source without a known key passes unverified.
    return $ maybe True (\k -> verify (peerKey k) (factoidData factoid) (factoidSignature factoid)) signer
  -- And insert.
  if ok
    then do
      inserted <- insertFactoidUnchecked ctx key factoid
      -- Call callbacks
      when inserted $ do
        let mapKey = (factoidTagType key, factoidType key)
        interested <- atomically $ M.findWithDefault [] mapKey <$> readTVar callbacks
        forM_ interested $ \(Callback callback) -> do
          -- The decoders' result types are fixed by the callback's own
          -- argument types.
          let tag = decode $ factoidTag key
              fact = decode $ factoidData factoid
          case (tag,fact) of
            (Right t, Right f) -> callback (factoidSource factoid) t f
            _ -> alertM "hermes.gossip.callback" "Unable to decode factoid"
      return inserted
    else do
      alertM "hermes.gossip.push" $ "Incorrect factoid signature for " ++ show factoid
      return False

-- | Returns true if the fact was insertable (not older than one we already have), otherwise false
insertFactoidUnchecked :: GossipContext -> FactoidKey -> Factoid -> IO Bool
insertFactoidUnchecked GossipContext{..} key factoid = atomically $ do
  oldFact <- (M.lookup key >=> M.lookup (factoidSource factoid)) <$> readTVar factoids
  -- Only a strictly newer factoid replaces one we already hold.
  let update = case (factoidDate <$> oldFact, factoidDate factoid) of
        (Just oldDate, newDate) | oldDate >= newDate -> False
        _ -> True
  when update $ do
    modifyTVar factoids (adjustWithDefault M.empty (M.insert (factoidSource factoid) factoid) key)
    case factoidTTL factoid of
      Just ttl -> modifyTVar factoidTTLs (M.insert ttl key)
      Nothing -> return ()
  return update

-- | Gossips about a fact until someone tells us they already know it
-- (or we run out of peers).
limitedGossip :: CoreContext -> Factoid -> FactoidKey -> IO ()
limitedGossip core factoid key = do
  -- HACK: Let the system bootstrap before we start looking for peers
  threadDelay 100000
  -- Get a list of every peer we know the address of, shuffled
  rndgen <- newStdGen
  peers' <- (fmap . fmap) fst $ atomically $ M.toList <$> readTVar (peerAddress core)
  -- Never hand shuffle' an empty list.
  -- NOTE(review): random-shuffle is believed not to handle it - confirm.
  let peers | null peers' = []
            | otherwise = shuffle' peers' (length peers') rndgen
  -- Then contact them, one by one. Instead of doing it serially, we start a new contact
  -- every half-second.
  mvar <- newEmptyMVar
  acceptType core (undefined :: FactoidMessage) key
  timerThread <- forkIO $ forever $ do putMVar mvar Nothing; threadDelay 500000
  recvThread <- forkIO $ forever $ recv' core key >>= putMVar mvar . Just . snd
  let gossiper list@(peer:rest) = do
        input <- takeMVar mvar
        case input of
          Just PushAbort -> return () -- We're done.
          Nothing -> do
            -- Timer triggered, contact new peer
            infoM "hermes.gossip.push" $ "Pushing to " ++ show peer
            _ <- forkIO $ send core peer (PushFactoid key factoid)
            gossiper rest
          Just e -> do
            -- What?
            warningM "hermes.gossip.push" $ "Unexpected reply: " ++ show e
            gossiper list
      gossiper [] = return ()
  gossiper peers
  -- Finally, kill the threads.
  killThread timerThread
  killThread recvThread
  -- And reset core.
  refuseType core (undefined :: FactoidMessage) key

-- | Factoid message loop. Never returns: a timer thread and a receiver
-- thread feed one mailbox, and each item is handled in its own thread.
factoidLoop :: GossipContext -> IO ()
factoidLoop ctx@GossipContext{..} = do
  -- Setup
  mbox <- newEmptyMVar
  _ <- forkIO $ do
    threadDelay 100000 -- FIXME: Ugly hack to let clients call setPeriod before we delay.
    forever $ do
      threadDelay =<< atomically (readTVar gossipInterval)
      putMVar mbox Nothing
      -- NOTE(review): assumed to be logged once per period - confirm.
      infoM "hermes.gossip.pull" "Initiating gossip"
  _ <- forkIO $ forever $ recv core >>= putMVar mbox . Just
  -- Aand.. go!
  forever $ do
    input <- takeMVar mbox
    forkIO $ case input of
      Nothing -> do
        -- Time for our periodic gossiping. Pick some random peer.
        peers <- atomically $ readTVar (peerAddress core)
        unless (M.null peers) $ do
          peer <- fst . flip M.elemAt peers <$> randomRIO (0, M.size peers - 1)
          send core peer PullFactoids
      Just (source,PushFactoid key factoid) -> do
        infoM "hermes.gossip.push" "Caught gossip"
        inserted <- insertFactoid ctx (key,factoid)
        -- News gets passed on; old news makes the sender stop pushing.
        if inserted
          then limitedGossip core factoid key
          else send' core source PushAbort key
      Just (source,PullFactoids) -> do
        -- Someone wants to know the truth, the full truth, and nothing but the factoids. Let's oblige.
        facts <- concatMap (\(key,m) -> map (key,) (M.elems m)) . M.toList <$> atomically (readTVar factoids)
        infoM "hermes.gossip.pull" $ "Sending gossip: " ++ show (length facts)
        send core source $ PulledFactoids facts
      Just (_,PulledFactoids received) -> do
        infoM "hermes.gossip.pull" $ "Received gossip: " ++ show (length received)
        mapM_ (insertFactoid ctx) received
      Just (source,e) -> do
        warningM "hermes.gossip.pull" $ "Unexpected reply: " ++ show (source,e)

-- | Read the factoid of the requested type that the given source
-- published under the given tag. Returns 'Nothing' if there is none,
-- or if the stored data does not decode as that type (which is also
-- logged).
readFactoid :: forall factoid tag. (Typeable factoid, Serialize factoid, Typeable tag, Serialize tag)
               => GossipContext
               -> tag
               -> HermesID
               -> IO (Maybe factoid)
readFactoid GossipContext{factoids} tag source = do
  let key = FactoidKey (showType (undefined :: factoid)) (showType tag) (encode tag)
  stored <- atomically $ M.lookup source . M.findWithDefault M.empty key <$> readTVar factoids
  case decode . factoidData <$> stored of
    Nothing -> return Nothing
    Just (Left err) -> alertM "hermes.gossip.readFactoid" ("Factoid decode error: " ++ err) >> return Nothing
    -- Wrap explicitly, so the data is decoded as @factoid@ (what
    -- 'writeFactoid' encoded) and not as @Maybe factoid@.
    Just (Right f) -> return (Just f)

-- | Read all factoids with an appropriate type and tag. Useful if you
-- don't know what source to expect. Entries that fail to decode are
-- logged and left out.
readFactoids :: forall factoid tag. (Typeable factoid, Serialize factoid, Typeable tag, Serialize tag)
                => GossipContext
                -> tag
                -> IO [(HermesID,factoid)]
readFactoids GossipContext{factoids} tag = do
  let key = FactoidKey (showType (undefined :: factoid)) (showType tag) (encode tag)
  facts <- M.toList . M.findWithDefault M.empty key <$> atomically (readTVar factoids)
  fmap catMaybes $ forM facts $ \(source, factoid) ->
    case decode . factoidData $ factoid of
      Left err -> alertM "hermes.gossip.readFactoids" ("Factoid decode error: " ++ err) >> return Nothing
      Right f -> return $ Just (source,f)

-- | Build a context around the given initial state and start its
-- message loop. The callback table starts out empty.
startGossiper :: CoreContext
              -> Map FactoidKey (Map HermesID Factoid)
              -> Map TTL FactoidKey
              -> Int -- ^ The gossip period, in microseconds
              -> IO GossipContext
startGossiper core facts ttls interval = do
  factoids <- newTVarIO facts
  factoidTTLs <- newTVarIO ttls
  gossipInterval <- newTVarIO interval
  callbacks <- newTVarIO M.empty
  -- The context holds the loop's ThreadId and the loop needs the
  -- context, so fork first and hand the finished context over.
  ctxVar <- newEmptyMVar
  messageLoop <- forkIO $ takeMVar ctxVar >>= factoidLoop
  let ctx = GossipContext{..}
  putMVar ctxVar ctx
  return ctx

-- | Creates a gossiper from scratch
newGossiper :: CoreContext
            -> Double -- ^ The gossip period, in seconds
            -> IO GossipContext
newGossiper core period = startGossiper core M.empty M.empty (round $ period * 1000000)

-- | Set the period for the periodic gossiper. It will take effect
-- after the next periodic gossip.
setPeriod :: GossipContext
          -> Double -- ^ The period, in seconds
          -> IO ()
setPeriod GossipContext{gossipInterval} period =
  atomically $ writeTVar gossipInterval $ round $ period * 1000000

-- | Snapshots a gossip context for storage. All state is saved,
-- except callbacks.
snapshotGossiper :: GossipContext -> STM ByteString
snapshotGossiper GossipContext{..} = do
  facts <- readTVar factoids
  ttls <- readTVar factoidTTLs
  interval <- readTVar gossipInterval
  return $ encode (facts,ttls,interval)

-- | Restore a gossip context and start gossiping.
--
-- Partial: there is no clause for input that fails to decode, so such
-- input ends in a pattern match failure.
restoreGossiper :: CoreContext -> ByteString -> IO GossipContext
restoreGossiper core (decode -> Right (facts,ttls,interval)) =
  startGossiper core facts ttls interval

-- | Add a callback to be called every time a type-matching factoid is
-- inserted or updated. It will not be called for writeFactoid calls.
addCallback :: forall msg tag. (Serialize tag, Typeable tag, Serialize msg, Typeable msg)
               => GossipContext
               -> (HermesID -> tag -> msg -> IO ())
               -> IO ()
addCallback GossipContext{callbacks} function = do
  let callback = Callback function
      -- Same (tag type, factoid type) order that insertFactoid looks up
      key = (showType (undefined :: tag), showType (undefined :: msg))
  atomically $ modifyTVar callbacks (adjustWithDefault [] (callback:) key)