module Network.Hermes.Gossip(
GossipContext, TTL,
writeFactoid,
readFactoid,
readFactoids,
addCallback,
newGossiper,
setPeriod,
snapshotGossiper,
restoreGossiper
) where
import Control.Applicative
import Control.Monad
import Control.Monad.Trans.Maybe
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.MVar
import Codec.Crypto.RSA hiding(sign,verify)
import qualified Codec.Crypto.RSA as RSA
import Data.ByteString(ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BSL
import Data.Time
import Data.Map(Map)
import qualified Data.Map as M
import Data.Typeable
import Data.Serialize
import Data.Maybe
import System.Log.Logger
import System.Random
import Network.Hermes.Core
import Network.Hermes.Misc
import Network.Hermes.Protocol
import Network.Hermes.Types(CoreContext(..),PeerKey(peerKey))
-- | Sign a strict 'ByteString' with an RSA private key and return the
-- signature as a strict 'ByteString'.
sign :: PrivateKey -> ByteString -> ByteString
sign key payload = BS.concat (BSL.toChunks signature)
  where
    -- 'RSA.sign' works on lazy bytestrings, so the payload is wrapped as a
    -- single chunk and the result is flattened again above.
    signature = RSA.sign key (BSL.fromChunks [payload])
-- | Check an RSA signature over a strict 'ByteString'.
--
-- Arguments are the public key, the signed payload and the signature, in
-- that order.
verify :: PublicKey -> ByteString -> ByteString -> Bool
verify key payload signature = RSA.verify key (lazy payload) (lazy signature)
  where
    -- 'RSA.verify' works on lazy bytestrings; wrap each input as one chunk.
    lazy chunk = BSL.fromChunks [chunk]
-- | Mutable state shared by all operations of one gossiper.
data GossipContext = GossipContext
  { core :: CoreContext
    -- ^ Messaging context used to send and receive gossip and to look up
    -- peer keys and addresses.
  , factoids :: TVar (Map FactoidKey (Map HermesID Factoid))
    -- ^ Stored factoids, indexed first by key and then by the 'HermesID'
    -- of the source that signed them.
  , factoidTTLs :: TVar (Map TTL FactoidKey)
    -- ^ Map from a TTL value to the key of a factoid inserted with that
    -- TTL. Keyed by the TTL itself, so a later insert with an equal TTL
    -- replaces the earlier entry.
  , gossipInterval :: TVar Int
    -- ^ Delay between pull rounds, in microseconds (fed to 'threadDelay').
  , messageLoop :: ThreadId
    -- ^ Thread running 'factoidLoop' for this context.
  , callbacks :: TVar (Map (Type,Type) [Callback])
    -- ^ Registered callbacks, keyed by (tag type, factoid type).
  }
-- | A registered callback whose tag and message types are hidden.
--
-- Both types only need to be 'Serialize' so that the stored bytes can be
-- decoded before the function is invoked with the factoid's source.
data Callback where
  Callback
    :: (Serialize tag, Serialize msg)
    => (HermesID -> tag -> msg -> IO ())
    -> Callback
-- | Age of a factoid. Not referenced elsewhere in the visible part of
-- this module. NOTE(review): unit is presumably seconds -- confirm.
type Age = Double

-- | Time-to-live attached to a factoid when it is written.
-- NOTE(review): unit is presumably seconds -- confirm against the code
-- that expires factoids.
type TTL = Double
-- | Identifies a family of factoids: the type of the payload, the type of
-- the tag, and the serialized tag value.
data FactoidKey = FactoidKey
  { factoidType :: Type
    -- ^ Type of the factoid payload (from 'showType').
  , factoidTagType :: Type
    -- ^ Type of the tag (from 'showType').
  , factoidTag :: ByteString
    -- ^ The tag value, serialized with 'encode'.
  } deriving (Typeable, Show, Eq, Ord)
-- | Wire format: the three fields in declaration order.
instance Serialize FactoidKey where
  put key = do
    put (factoidType key)
    put (factoidTagType key)
    put (factoidTag key)
  get = FactoidKey <$> get <*> get <*> get
-- | A signed piece of gossip together with its bookkeeping data.
data Factoid = Factoid
  { factoidDate :: UTCTime
    -- ^ Creation time; a stored factoid is only replaced by one with a
    -- strictly later date.
  , factoidInsertTime :: UTCTime
    -- ^ Set to the creation time by 'makeFactoid'.
  , factoidTTL :: Maybe TTL
    -- ^ Optional time-to-live.
  , factoidData :: ByteString
    -- ^ Serialized payload; this is what the signature covers.
  , factoidSignature :: ByteString
    -- ^ Signature over 'factoidData'.
  , factoidSource :: HermesID
    -- ^ Identity of the node that created and signed the factoid.
  } deriving (Typeable, Show)
-- | Build a factoid for the local node: stamp it with the current time
-- (used for both the date and the insert time), sign the payload with the
-- local private key and record the local 'HermesID' as its source.
makeFactoid :: CoreContext -> Maybe TTL -> ByteString -> IO Factoid
makeFactoid CoreContext{myPrivateKey,myHermesID} ttl bs = stamp <$> getCurrentTime
  where
    stamp now = Factoid
      { factoidDate = now
      , factoidInsertTime = now
      , factoidTTL = ttl
      , factoidData = bs
      , factoidSignature = sign myPrivateKey bs
      , factoidSource = myHermesID
      }
-- | Check a factoid's signature against the known key of its source.
--
-- Returns 'False' when the source has no entry in the peer key table.
verifyFactoid :: CoreContext -> Factoid -> STM Bool
verifyFactoid ctx fact = do
  known <- readTVar (peerKeys ctx)
  return $ case M.lookup (factoidSource fact) known of
    Nothing -> False
    Just entry -> verify (peerKey entry) (factoidData fact) (factoidSignature fact)
-- | Wire format: the six fields in declaration order.
instance Serialize Factoid where
  put fact = do
    put (factoidDate fact)
    put (factoidInsertTime fact)
    put (factoidTTL fact)
    put (factoidData fact)
    put (factoidSignature fact)
    put (factoidSource fact)
  get = do
    date <- get
    inserted <- get
    ttl <- get
    payload <- get
    signature <- get
    source <- get
    return (Factoid date inserted ttl payload signature source)
-- | Serialized as the day followed by the time within the day.
instance Serialize UTCTime where
  put time = put (utctDay time) >> put (utctDayTime time)
  get = UTCTime <$> get <*> get
-- | Serialized as the modified Julian day number.
instance Serialize Day where
  put day = put (toModifiedJulianDay day)
  get = fmap ModifiedJulianDay get
-- | Serialized via its exact 'Rational' value.
instance Serialize DiffTime where
  put diff = put (toRational diff)
  get = fmap fromRational get
-- | Messages exchanged between gossipers.
data FactoidMessage
  = PushFactoid FactoidKey Factoid
    -- ^ Offer a single factoid to a peer.
  | PushAbort
    -- ^ Sent back to the pusher when the pushed factoid was not inserted.
  | PullFactoids
    -- ^ Ask a peer for all factoids it has stored.
  | PulledFactoids [(FactoidKey,Factoid)]
    -- ^ Reply to 'PullFactoids'.
  deriving (Typeable, Show)
-- | Wire format: a one-byte constructor tag (0..3) followed by the
-- constructor's fields. Any other tag makes decoding fail.
instance Serialize FactoidMessage where
  put message = case message of
    PushFactoid key fact -> putWord8 0 >> put key >> put fact
    PushAbort -> putWord8 1
    PullFactoids -> putWord8 2
    PulledFactoids facts -> putWord8 3 >> put facts
  get = getWord8 >>= body
    where
      body 0 = PushFactoid <$> get <*> get
      body 1 = return PushAbort
      body 2 = return PullFactoids
      body 3 = PulledFactoids <$> get
      body _ = fail "Serialize FactoidMessage: Broken message"
-- | Publish a factoid from the local node.
--
-- The value and the tag are serialized, the value is signed via
-- 'makeFactoid', the result is stored locally without a signature check,
-- and 'limitedGossip' is started on a separate thread.
writeFactoid :: forall factoid tag. (Typeable factoid, Serialize factoid, Typeable tag, Serialize tag)
             => GossipContext -> factoid -> tag
             -> Maybe TTL
             -> IO ()
writeFactoid ctx@GossipContext{core} fact tag ttl = do
  let key = FactoidKey (showType fact) (showType tag) (encode tag)
  factoid <- makeFactoid core ttl (encode fact)
  -- Whether the local store actually changed is not interesting here.
  _ <- insertFactoidUnchecked ctx key factoid
  _ <- forkIO (limitedGossip core factoid key)
  return ()
-- | Store a factoid received from the network and run matching callbacks.
--
-- Returns 'True' only when the factoid was actually stored. Factoids whose
-- source is the local node are ignored. A factoid is rejected (and an
-- alert is logged) when its source has a known key and the signature does
-- not verify.
--
-- NOTE(review): a factoid from a source with no known key is accepted
-- without any signature check, unlike 'verifyFactoid'. Confirm that this
-- is intended.
insertFactoid :: GossipContext -> (FactoidKey,Factoid) -> IO Bool
insertFactoid GossipContext{core} (_,factoid)
  | myHermesID core == factoidSource factoid = return False
insertFactoid ctx@GossipContext{..} (key,factoid) = do
  signatureOk <- atomically $ do
    known <- readTVar (peerKeys core)
    return $ case M.lookup (factoidSource factoid) known of
      Nothing -> True
      Just entry -> verify (peerKey entry) (factoidData factoid) (factoidSignature factoid)
  if not signatureOk
    then do
      alertM "hermes.gossip.push" $ "Incorrect factoid signature for " ++ show factoid
      return False
    else do
      inserted <- insertFactoidUnchecked ctx key factoid
      when inserted $ do
        -- Callbacks are registered under (tag type, factoid type).
        let mapKey = (factoidTagType key, factoidType key)
        listeners <- atomically $ M.findWithDefault [] mapKey <$> readTVar callbacks
        forM_ listeners $ \(Callback run) ->
          -- The decode target types are fixed by the callback's own types.
          case (decode (factoidTag key), decode (factoidData factoid)) of
            (Right t, Right f) -> run (factoidSource factoid) t f
            _ -> alertM "hermes.gossip.callback" "Unable to decode factoid"
      return inserted
-- | Store a factoid without verifying its signature.
--
-- The factoid replaces what is stored for the same key and source only if
-- nothing is stored yet or its date is strictly later. When it is stored
-- and carries a TTL, the TTL table is updated as well. Returns whether the
-- factoid was stored. Everything happens in one STM transaction.
insertFactoidUnchecked :: GossipContext -> FactoidKey -> Factoid -> IO Bool
insertFactoidUnchecked GossipContext{..} key factoid = atomically $ do
  bySource <- M.lookup key <$> readTVar factoids
  let source = factoidSource factoid
      previous = bySource >>= M.lookup source
      isNewer = case previous of
        Just old -> factoidDate old < factoidDate factoid
        Nothing -> True
  when isNewer $ do
    modifyTVar factoids (adjustWithDefault M.empty (M.insert source factoid) key)
    case factoidTTL factoid of
      Just ttl -> modifyTVar factoidTTLs (M.insert ttl key)
      Nothing -> return ()
  return isNewer
-- | Hook for pushing a freshly inserted factoid on to other peers.
--
-- Currently a no-op: all three arguments are ignored.
limitedGossip :: CoreContext -> Factoid -> FactoidKey -> IO ()
limitedGossip _ _ _ = return ()
-- | Push one factoid to the known peers, one peer per timer tick, until a
-- 'PushAbort' arrives or the peer list is exhausted.
--
-- Not referenced anywhere in the visible part of this module.
fooasdhu core factoid key = do
  threadDelay 100000
  targets <- M.keys <$> atomically (readTVar (peerAddress core))
  inbox <- newEmptyMVar
  acceptType core (undefined :: FactoidMessage) key
  -- The timer posts 'Nothing' every half second; replies are posted as 'Just'.
  timerThread <- forkIO $ forever $ putMVar inbox Nothing >> threadDelay 500000
  recvThread <- forkIO $ forever $ do
    reply <- recv' core key
    putMVar inbox (Just (snd reply))
  let pushTo [] = return ()
      pushTo remaining@(peer:rest) = do
        event <- takeMVar inbox
        case event of
          Just PushAbort -> return ()
          Nothing -> do
            infoM "hermes.gossip.push" $ "Pushing to " ++ show peer
            _ <- forkIO $ send core peer (PushFactoid key factoid)
            pushTo rest
          Just e -> do
            warningM "hermes.gossip.push" $ "Unexpected reply: " ++ show e
            -- An unexpected reply does not consume a peer.
            pushTo remaining
  pushTo targets
  killThread timerThread
  killThread recvThread
  refuseType core (undefined :: FactoidMessage) key
-- | Main message loop of a gossiper; never returns.
--
-- Two helper threads feed a shared mailbox: a timer that posts 'Nothing'
-- once per gossip interval, and a receiver that posts every incoming
-- message as 'Just'. Each mailbox entry is handled on its own thread:
--
-- * timer tick: send 'PullFactoids' to one randomly chosen known peer;
--
-- * 'PushFactoid': try to insert it; on success call 'limitedGossip',
--   otherwise answer the sender with 'PushAbort';
--
-- * 'PullFactoids': reply with every stored factoid;
--
-- * 'PulledFactoids': insert each received factoid;
--
-- * anything else: log a warning.
factoidLoop :: GossipContext -> IO ()
factoidLoop ctx@GossipContext{..} = do
  mbox <- newEmptyMVar
  _ <- forkIO $ do
    threadDelay 100000
    forever $ do
      -- The interval is re-read every round so 'setPeriod' takes effect.
      threadDelay =<< atomically (readTVar gossipInterval)
      putMVar mbox Nothing
      infoM "hermes.gossip.pull" "Initiating gossip"
  _ <- forkIO $ forever $ recv core >>= putMVar mbox . Just
  forever $ do
    input <- takeMVar mbox
    forkIO $ case input of
      Nothing -> do
        peers <- atomically $ readTVar (peerAddress core)
        unless (M.null peers) $ do
          -- 'M.elemAt' takes a zero-based index, so the inclusive upper
          -- bound of the random range is size - 1.
          index <- randomRIO (0, M.size peers - 1)
          let peer = fst (M.elemAt index peers)
          send core peer PullFactoids
      Just (source,PushFactoid key factoid) -> do
        infoM "hermes.gossip.push" "Caught gossip"
        inserted <- insertFactoid ctx (key,factoid)
        if inserted
          then limitedGossip core factoid key
          else send' core source PushAbort key
      Just (source,PullFactoids) -> do
        facts <- concatMap (\(key,m) -> map ((key,) . snd) $ M.toList m) . M.toList <$>
                 atomically (readTVar factoids)
        infoM "hermes.gossip.pull" $ "Sending gossip: " ++ show (length facts)
        send core source $ PulledFactoids facts
      Just (_,PulledFactoids received) -> do
        infoM "hermes.gossip.pull" $ "Received gossip: " ++ show (length received)
        mapM_ (insertFactoid ctx) received
      Just (source,e) -> do
        warningM "hermes.gossip.pull" $ "Unexpected reply: " ++ show (source,e)
-- | Look up the factoid stored for the given tag and source.
--
-- The payload type is chosen by the caller through the result type.
-- Returns 'Nothing' when nothing is stored under that key and source, or
-- when the stored bytes cannot be decoded as the requested type (an alert
-- is logged in that case).
readFactoid :: forall factoid tag. (Typeable factoid, Serialize factoid, Typeable tag, Serialize tag)
            => GossipContext -> tag -> HermesID -> IO (Maybe factoid)
readFactoid GossipContext{factoids} tag source = do
  let key = FactoidKey (showType (undefined :: factoid)) (showType tag) (encode tag)
  stored <- atomically $ M.lookup source . M.findWithDefault M.empty key <$> readTVar factoids
  case decode . factoidData <$> stored of
    Nothing -> return Nothing
    Just (Left err) -> alertM "hermes.gossip.readFactoid" ("Factoid decode error: " ++ err) >> return Nothing
    -- Wrap explicitly: returning the decoded value unwrapped would make the
    -- payload be decoded as @Maybe factoid@ rather than @factoid@.
    Just (Right f) -> return (Just f)
-- | Return the factoids stored for the given tag from every source,
-- each paired with the 'HermesID' of its source.
--
-- The payload type is chosen by the caller through the result type.
-- Entries whose bytes cannot be decoded as that type are skipped, and an
-- alert is logged for each of them.
readFactoids :: forall factoid tag. (Typeable factoid, Serialize factoid, Typeable tag, Serialize tag)
             => GossipContext -> tag -> IO [(HermesID,factoid)]
readFactoids GossipContext{factoids} tag = do
  let key = FactoidKey (showType (undefined :: factoid)) (showType tag) (encode tag)
  facts <- M.toList . M.findWithDefault M.empty key <$> atomically (readTVar factoids)
  fmap catMaybes $ forM facts $ \(source, stored) ->
    case decode (factoidData stored) of
      Left err -> alertM "hermes.gossip.readFactoids" ("Factoid decode error: " ++ err) >> return Nothing
      -- Pair with the source here: returning the bare decoded value would
      -- make the payload be decoded as a @(HermesID, factoid)@ tuple.
      Right f -> return $ Just (source, f)
-- | Create a gossiper with empty state and start its message loop.
--
-- The second argument is the gossip period in seconds; it is stored in
-- microseconds.
newGossiper :: CoreContext
            -> Double
            -> IO GossipContext
newGossiper core period = do
  factoids <- newTVarIO M.empty
  factoidTTLs <- newTVarIO M.empty
  gossipInterval <- newTVarIO $ round $ period * 1000000
  callbacks <- newTVarIO M.empty
  -- The context holds the loop's ThreadId and the loop needs the context.
  -- Hand the finished context over through an MVar so the loop thread can
  -- never observe a partially built context.
  handoff <- newEmptyMVar
  messageLoop <- forkIO $ takeMVar handoff >>= factoidLoop
  let ctx = GossipContext{..}
  putMVar handoff ctx
  return ctx
-- | Change the gossip period. The period is given in seconds and stored
-- in microseconds.
setPeriod :: GossipContext
          -> Double
          -> IO ()
setPeriod GossipContext{gossipInterval} period =
  atomically (writeTVar gossipInterval micros)
  where
    micros = round (period * 1000000)
-- | Serialize the stored factoids, the TTL table and the gossip interval
-- (in that order, as one tuple). Callbacks are not part of the snapshot.
snapshotGossiper :: GossipContext -> STM ByteString
snapshotGossiper ctx =
  liftM3 pack (readTVar (factoids ctx)) (readTVar (factoidTTLs ctx)) (readTVar (gossipInterval ctx))
  where
    pack facts ttls interval = encode (facts,ttls,interval)
-- | Recreate a gossiper from a snapshot made by 'snapshotGossiper' and
-- start its message loop. The callback table starts out empty.
--
-- Fails with an IO error that carries the decoder's message when the
-- snapshot cannot be decoded.
restoreGossiper :: CoreContext -> ByteString -> IO GossipContext
restoreGossiper core snapshot = case decode snapshot of
  Left err -> fail ("restoreGossiper: cannot decode snapshot: " ++ err)
  Right (facts,ttls,interval) -> do
    factoids <- newTVarIO facts
    factoidTTLs <- newTVarIO ttls
    gossipInterval <- newTVarIO interval
    callbacks <- newTVarIO M.empty
    -- The context holds the loop's ThreadId and the loop needs the
    -- context. Hand the finished context over through an MVar so the loop
    -- thread can never observe a partially built context.
    handoff <- newEmptyMVar
    messageLoop <- forkIO $ takeMVar handoff >>= factoidLoop
    let ctx = GossipContext{..}
    putMVar handoff ctx
    return ctx
-- | Register a callback for factoids with the given tag and message types.
--
-- 'insertFactoid' invokes it with the source, the decoded tag and the
-- decoded message whenever a matching factoid is newly stored.
-- NOTE(review): 'adjustWithDefault' is presumed to start from the empty
-- list when no callback exists for the key yet -- confirm in
-- "Network.Hermes.Misc".
addCallback :: forall msg tag. (Serialize tag, Typeable tag, Serialize msg, Typeable msg)
            => GossipContext -> (HermesID -> tag -> msg -> IO ()) -> IO ()
addCallback GossipContext{callbacks} function =
  atomically $ modifyTVar callbacks (adjustWithDefault [] (Callback function :) key)
  where
    -- Same (tag type, message type) ordering that 'insertFactoid' looks up.
    key = (showType (undefined :: tag), showType (undefined :: msg))