{-# LANGUAGE ConstraintKinds #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE TemplateHaskell #-} {-# LANGUAGE TupleSections #-} {-| Module : Network.Haskoin.Node.Manager Copyright : No rights reserved License : UNLICENSE Maintainer : xenog@protonmail.com Stability : experimental Portability : POSIX Peer manager process. -} module Network.Haskoin.Node.Manager ( manager ) where import Conduit import Control.Monad import Control.Monad.Except import Control.Monad.Logger import Control.Monad.Reader import Control.Monad.Trans.Maybe import Data.Bits import qualified Data.ByteString as B import Data.Default import Data.List import Data.Maybe import Data.Serialize (Get, Put, Serialize) import Data.Serialize as S import Data.Set (Set) import qualified Data.Set as Set import Data.String.Conversions import Data.Text (Text) import Data.Time.Clock import Data.Time.Clock.POSIX import Data.Word import Database.RocksDB (DB) import qualified Database.RocksDB as R import Database.RocksDB.Query as R import Haskoin import Network.Haskoin.Node.Common import Network.Haskoin.Node.Peer import Network.Socket (SockAddr (..)) import NQE import System.Random import UnliftIO import UnliftIO.Concurrent import UnliftIO.Resource as U -- | Monad used by most functions in this module. type MonadManager m = (MonadLoggerIO m, MonadReader ManagerReader m) -- | Reader for peer configuration and state. data ManagerReader = ManagerReader { myConfig :: !ManagerConfig , mySupervisor :: !Supervisor , myMailbox :: !Manager , myBestBlock :: !(TVar BlockHeight) , knownPeers :: !(TVar (Set SockAddr)) , onlinePeers :: !(TVar [OnlinePeer]) } -- | Peer Manager process. In order to fully start it needs to receive a -- 'ManageBestBlock' event. manager :: (MonadUnliftIO m, MonadLoggerIO m) => ManagerConfig -> Inbox ManagerMessage -> m () manager cfg inbox = withSupervisor (Notify f) $ \sup -> do bb <- newTVarIO 0 kp <- newTVarIO Set.empty ob <- newTVarIO [] let rd = ManagerReader { myConfig = cfg , mySupervisor = sup , myMailbox = mgr , myBestBlock = bb , knownPeers = kp , onlinePeers = ob } (go `finally` storePeersInDB) `runReaderT` rd where mgr = inboxToMailbox inbox discover = mgrConfDiscover cfg go = do db <- getManagerDB $(logDebugS) "Manager" "Initializing..." initPeerDB db discover putBestBlock <=< receiveMatch inbox $ \case ManagerBestBlock b -> Just b _ -> Nothing $(logDebugS) "Manager" "Initialization complete" withConnectLoop mgr $ forever $ do $(logDebugS) "Manager" "Awaiting message..." m <- receive inbox managerMessage m f (a, mex) = ManagerPeerDied a mex `sendSTM` mgr putBestBlock :: MonadManager m => BlockHeight -> m () putBestBlock bb = do $(logDebugS) "Manager" $ "Best block at height " <> cs (show bb) asks myBestBlock >>= \b -> atomically $ writeTVar b bb getBestBlock :: MonadManager m => m BlockHeight getBestBlock = asks myBestBlock >>= readTVarIO getNetwork :: MonadManager m => m Network getNetwork = mgrConfNetwork <$> asks myConfig loadPeers :: (MonadUnliftIO m, MonadManager m) => m () loadPeers = do os <- readTVarIO =<< asks onlinePeers ks <- readTVarIO =<< asks knownPeers if null os && Set.null ks then do loadStaticPeers d <- mgrConfDiscover <$> asks myConfig when d $ loadPeersFromDB >> loadNetSeeds else $(logDebugS) "Manager" "Peers already available, not initialising" loadStaticPeers :: (MonadUnliftIO m, MonadManager m) => m () loadStaticPeers = do $(logDebugS) "Manager" "Loading static peers" xs <- mgrConfPeers <$> asks myConfig mapM_ newPeer =<< concat <$> mapM toSockAddr xs loadPeersFromDB :: (MonadUnliftIO m, MonadManager m) => m () loadPeersFromDB = do $(logDebugS) "Manager" "Loading peers from database" db <- getManagerDB xs <- matchingAsList db R.defaultReadOptions PeerAddressBase let f (PeerAddress x, ()) = x f _ = undefined ys = map f xs $(logInfoS) "Manager" $ "Loaded " <> cs (show (length ys)) <> " peers from database" mapM_ newPeer ys storePeersInDB :: (MonadUnliftIO m, MonadManager m) => m () storePeersInDB = do db <- getManagerDB os <- readTVarIO =<< asks onlinePeers ks <- readTVarIO =<< asks knownPeers let ps = map onlinePeerAddress os <> Set.toList ks xs = map ((`insertOp` ()) . PeerAddress) ps unless (null xs) $ do $(logInfoS) "Manager" $ "Storing " <> cs (show (length xs)) <> " peers in database" writeBatch db =<< (++ xs) <$> purgePeerDB db loadNetSeeds :: (MonadUnliftIO m, MonadManager m) => m () loadNetSeeds = do net <- getNetwork $(logDebugS) "Manager" "Loading network seeds" ss <- concat <$> mapM toSockAddr (networkSeeds net) $(logDebugS) "Manager" $ "Adding " <> cs (show (length ss)) <> " seed peers" mapM_ newPeer ss logConnectedPeers :: MonadManager m => m () logConnectedPeers = do m <- mgrConfMaxPeers <$> asks myConfig l <- length <$> getConnectedPeers $(logInfoS) "Manager" $ "Peers connected: " <> cs (show l) <> "/" <> cs (show m) getManagerDB :: MonadManager m => m DB getManagerDB = mgrConfDB <$> asks myConfig getOnlinePeers :: MonadManager m => m [OnlinePeer] getOnlinePeers = asks onlinePeers >>= readTVarIO getConnectedPeers :: MonadManager m => m [OnlinePeer] getConnectedPeers = filter onlinePeerConnected <$> getOnlinePeers purgePeers :: (MonadUnliftIO m, MonadManager m) => m () purgePeers = do db <- getManagerDB ops <- getOnlinePeers forM_ ops $ \OnlinePeer {onlinePeerMailbox = p} -> killPeer PurgingPeer p writeBatch db =<< purgePeerDB db forwardMessage :: MonadManager m => Peer -> Message -> m () forwardMessage p = managerEvent . PeerMessage p managerEvent :: MonadManager m => PeerEvent -> m () managerEvent e = mgrConfEvents <$> asks myConfig >>= \l -> atomically $ l e managerMessage :: (MonadUnliftIO m, MonadManager m) => ManagerMessage -> m () managerMessage (ManagerPeerMessage p (MVersion v)) = do b <- asks onlinePeers s <- atomically $ peerString b p e <- runExceptT $ do let ua = getVarString $ userAgent v $(logDebugS) "Manager" $ "Got version from peer " <> s <> ": " <> cs ua o <- ExceptT . atomically $ setPeerVersion b p v when (onlinePeerConnected o) $ announcePeer p case e of Right () -> do $(logDebugS) "Manager" $ "Version accepted for peer " <> s MVerAck `sendMessage` p Left x -> do $(logErrorS) "Manager" $ "Version rejected for peer " <> s <> ": " <> cs (show x) killPeer x p managerMessage (ManagerPeerMessage p MVerAck) = do b <- asks onlinePeers s <- atomically $ peerString b p atomically (setPeerVerAck b p) >>= \case Just o -> do $(logDebugS) "Manager" $ "Received verack from peer: " <> s when (onlinePeerConnected o) $ announcePeer p Nothing -> do $(logErrorS) "Manager" $ "Received verack from unknown peer: " <> s killPeer UnknownPeer p managerMessage (ManagerPeerMessage p (MAddr (Addr nas))) = do b <- asks onlinePeers s <- atomically $ peerString b p let n = length nas $(logDebugS) "Manager" $ "Received " <> cs (show n) <> " addresses from peer " <> s mgrConfDiscover <$> asks myConfig >>= \case True -> do let sas = map (naAddress . snd) nas forM_ sas newPeer False -> $(logDebugS) "Manager" "Ignoring received peers (discovery disabled)" managerMessage (ManagerPeerMessage p m@(MPong (Pong n))) = do now <- liftIO getCurrentTime b <- asks onlinePeers s <- atomically $ peerString b p atomically (gotPong b n now p) >>= \case Nothing -> do $(logDebugS) "Manager" $ "Forwarding pong " <> cs (show n) <> " from " <> s forwardMessage p m Just d -> do let ms = fromRational . toRational $ d * 1000 :: Double $(logDebugS) "Manager" $ "Ping roundtrip to " <> s <> ": " <> cs (show ms) <> " ms" managerMessage (ManagerPeerMessage p (MPing (Ping n))) = do b <- asks onlinePeers s <- atomically $ peerString b p $(logDebugS) "Manager" $ "Responding to ping " <> cs (show n) <> " from " <> s MPong (Pong n) `sendMessage` p managerMessage (ManagerPeerMessage p m) = do b <- asks onlinePeers s <- atomically $ peerString b p let cmd = commandToString $ msgType m $(logDebugS) "Manager" $ "Forwarding message " <> cs cmd <> " from peer " <> s forwardMessage p m managerMessage (ManagerBestBlock h) = do $(logDebugS) "Manager" $ "Setting best block at height " <> cs (show h) putBestBlock h managerMessage ManagerConnect = do l <- length <$> getConnectedPeers x <- mgrConfMaxPeers <$> asks myConfig if l < x then getNewPeer >>= \case Nothing -> $(logDebugS) "Manager" "No peers available to connect" Just sa -> connectPeer sa else $(logDebugS) "Manager" "Enough peers connected." managerMessage (ManagerPeerDied a e) = processPeerOffline a e managerMessage ManagerPurgePeers = do $(logWarnS) "Manager" "Purging connected peers and peer database" purgePeers managerMessage (ManagerGetPeers reply) = do $(logDebugS) "Manager" "Responding to request for connected peers" ps <- getConnectedPeers $(logDebugS) "Manager" $ "There are " <> cs (show (length ps)) <> " connected peers" atomically $ reply ps managerMessage (ManagerGetOnlinePeer p reply) = do $(logDebugS) "Manager" "Responding to request for particular peer" b <- asks onlinePeers m <- atomically $ findPeer b p >>= \o -> reply o >> return o case m of Nothing -> $(logDebugS) "Manager" "Requested peer not found" Just o -> $(logDebugS) "Manager" $ "Peer found at address: " <> cs (show (onlinePeerAddress o)) managerMessage (ManagerCheckPeer p) = checkPeer p checkPeer :: MonadManager m => Peer -> m () checkPeer p = do ManagerConfig {mgrConfTimeout = to} <- asks myConfig b <- asks onlinePeers s <- atomically $ peerString b p $(logDebugS) "Manager" $ "Checking on peer " <> s atomically (lastPing b p) >>= \case Nothing -> pingPeer p Just t -> do now <- liftIO getCurrentTime if diffUTCTime now t > fromIntegral to then do $(logErrorS) "Manager" $ "Peer " <> s <> " did not respond ping on time" killPeer PeerTimeout p else $(logDebugS) "Manager" $ "peer " <> s <> " awaiting pong" pingPeer :: MonadManager m => Peer -> m () pingPeer p = do b <- asks onlinePeers s <- atomically $ peerString b p atomically (findPeer b p) >>= \case Nothing -> $(logErrorS) "Manager" $ "Will not ping unknown peer " <> s Just o | onlinePeerConnected o -> do n <- liftIO randomIO now <- liftIO getCurrentTime atomically (setPeerPing b n now p) $(logDebugS) "Manager" $ "Sending ping " <> cs (show n) <> " to peer " <> s MPing (Ping n) `sendMessage` p | otherwise -> $(logWarnS) "Manager" $ "Will not ping peer " <> s <> " until handshake complete" processPeerOffline :: MonadManager m => Child -> Maybe SomeException -> m () processPeerOffline a e = do b <- asks onlinePeers atomically (findPeerAsync b a) >>= \case Nothing -> log_unknown e Just o -> do let p = onlinePeerMailbox o d = onlinePeerAddress o s <- atomically $ peerString b p if onlinePeerConnected o then do log_disconnected s e managerEvent $ PeerDisconnected p d else log_not_connect s e atomically $ removePeer b p logConnectedPeers where log_unknown Nothing = $(logErrorS) "Manager" "Disconnected unknown peer" log_unknown (Just x) = $(logErrorS) "Manager" $ "Unknown peer died: " <> cs (show x) log_disconnected s Nothing = $(logWarnS) "Manager" $ "Disconnected peer: " <> s log_disconnected s (Just x) = $(logErrorS) "Manager" $ "Peer " <> s <> " died: " <> cs (show x) log_not_connect s Nothing = $(logWarnS) "Manager" $ "Could not connect to peer " <> s log_not_connect s (Just x) = $(logErrorS) "Manager" $ "Could not connect to peer " <> s <> ": " <> cs (show x) announcePeer :: MonadManager m => Peer -> m () announcePeer p = do b <- asks onlinePeers s <- atomically $ peerString b p mgr <- asks myMailbox atomically (findPeer b p) >>= \case Just OnlinePeer {onlinePeerAddress = a, onlinePeerConnected = True} -> do $(logInfoS) "Manager" $ "Handshake completed for peer " <> s managerEvent $ PeerConnected p a logConnectedPeers managerCheck p mgr Just OnlinePeer {onlinePeerConnected = False} -> $(logErrorS) "Manager" $ "Not announcing because not handshaken: " <> s Nothing -> $(logErrorS) "Manager" "Will not announce unknown peer" getNewPeer :: (MonadUnliftIO m, MonadManager m) => m (Maybe SockAddr) getNewPeer = runMaybeT $ lift loadPeers >> go where go = do b <- asks knownPeers ks <- readTVarIO b guard . not $ Set.null ks let xs = Set.toList ks a <- liftIO $ randomRIO (0, length xs - 1) let p = xs !! a atomically . modifyTVar b $ Set.delete p o <- asks onlinePeers atomically (findPeerAddress o p) >>= \case Nothing -> return p Just _ -> go connectPeer :: (MonadUnliftIO m, MonadManager m) => SockAddr -> m () connectPeer sa = do os <- asks onlinePeers atomically (findPeerAddress os sa) >>= \case Just _ -> $(logErrorS) "Manager" $ "Attempted to connect to peer twice: " <> cs (show sa) Nothing -> do $(logInfoS) "Manager" $ "Connecting to " <> cs (show sa) ManagerConfig {mgrConfNetAddr = ad, mgrConfNetwork = net} <- asks myConfig mgr <- asks myMailbox sup <- asks mySupervisor nonce <- liftIO randomIO bb <- getBestBlock now <- round <$> liftIO getPOSIXTime let rmt = NetworkAddress (srv net) sa ver = buildVersion net nonce bb ad rmt now (inbox, p) <- newMailbox let pc pub = PeerConfig { peerConfListen = pub , peerConfNetwork = net , peerConfAddress = sa } a <- withRunInIO $ \io -> sup `addChild` io (launch mgr pc inbox p) MVersion ver `sendMessage` p b <- asks onlinePeers _ <- atomically $ newOnlinePeer b sa nonce p a return () where l mgr p m = ManagerPeerMessage p m `sendSTM` mgr srv net | getSegWit net = 8 | otherwise = 0 launch mgr pc inbox p = withPublisher $ \pub -> bracket (subscribe pub (l mgr p)) (unsubscribe pub) $ \_ -> withPeerLoop sa p mgr $ \a -> do link a peer (pc pub) inbox withPeerLoop :: (MonadUnliftIO m, MonadLogger m) => SockAddr -> Peer -> Manager -> (Async a -> m a) -> m a withPeerLoop sa p mgr = withAsync go where go = forever $ do threadDelay =<< liftIO (randomRIO (30 * 1000 * 1000, 90 * 1000 * 1000)) $(logDebugS) "ManagerPeerLoop" $ "Ping manager about peer: " <> cs (show sa) ManagerCheckPeer p `send` mgr withConnectLoop :: (MonadLogger m, MonadUnliftIO m) => Manager -> m a -> m a withConnectLoop mgr act = withAsync go (\a -> link a >> act) where go = forever $ do $(logDebugS) "ManagerConnectLoop" "Ping manager for housekeeping" ManagerConnect `send` mgr threadDelay =<< liftIO (randomRIO (2 * 1000 * 1000, 10 * 1000 * 1000)) -- | Database version. versionPeerDB :: Word32 versionPeerDB = 5 -- | Peer address key in database. data PeerAddress = PeerAddress { getPeerAddress :: !SockAddr -- ^ peer network address and port } | PeerAddressBase deriving (Eq, Ord, Show) -- | Database version key. data PeerDataVersionKey = PeerDataVersionKey deriving (Eq, Ord, Show) instance Serialize PeerDataVersionKey where get = do guard . (== 0x82) =<< S.getWord8 return PeerDataVersionKey put PeerDataVersionKey = S.putWord8 0x82 instance R.Key PeerDataVersionKey instance KeyValue PeerDataVersionKey Word32 instance Serialize PeerAddress where get = do guard . (== 0x81) =<< S.getWord8 PeerAddress <$> decodeSockAddr put PeerAddress {getPeerAddress = a} = do S.putWord8 0x81 encodeSockAddr a put PeerAddressBase = S.putWord8 0x81 instance R.Key PeerAddress instance KeyValue PeerAddress () -- | Add a peer. newPeer :: (MonadIO m, MonadManager m) => SockAddr -> m () newPeer sa = do b <- asks knownPeers o <- asks onlinePeers i <- atomically $ findPeerAddress o sa when (isNothing i) $ atomically . modifyTVar b $ Set.insert sa -- | Initialize peer database, purging it of conflicting records if version -- doesn't match current one, or if peer discovery is disabled. initPeerDB :: MonadUnliftIO m => DB -> Bool -> m () initPeerDB db discover = do ver <- retrieve db def PeerDataVersionKey when (ver /= Just versionPeerDB || not discover) $ writeBatch db =<< purgePeerDB db R.insert db PeerDataVersionKey versionPeerDB -- | Purge peer records from database. purgePeerDB :: MonadUnliftIO m => DB -> m [R.BatchOp] purgePeerDB db = (++) <$> purge_byte 0x81 <*> purge_byte 0x83 where purge_byte byte = U.runResourceT . R.withIterator db def $ \it -> do R.iterSeek it $ B.singleton byte recurse_delete it byte recurse_delete it byte = R.iterKey it >>= \case Just k | B.head k == byte -> do R.iterNext it (R.Del k :) <$> recurse_delete it byte _ -> return [] -- | Get static network seeds. networkSeeds :: Network -> [HostPort] networkSeeds net = map (, getDefaultPort net) (getSeeds net) -- | Serialize a network address/port. encodeSockAddr :: SockAddr -> Put encodeSockAddr (SockAddrInet6 p _ (a, b, c, d) _) = do S.putWord32be a S.putWord32be b S.putWord32be c S.putWord32be d S.putWord16be (fromIntegral p) encodeSockAddr (SockAddrInet p a) = do S.putWord32be 0x00000000 S.putWord32be 0x00000000 S.putWord32be 0x0000ffff S.putWord32host a S.putWord16be (fromIntegral p) encodeSockAddr x = error $ "Could not encode address: " <> show x -- | Deserialize a network address/port. decodeSockAddr :: Get SockAddr decodeSockAddr = do a <- S.getWord32be b <- S.getWord32be c <- S.getWord32be if a == 0x00000000 && b == 0x00000000 && c == 0x0000ffff then do d <- S.getWord32host p <- S.getWord16be return $ SockAddrInet (fromIntegral p) d else do d <- S.getWord32be p <- S.getWord16be return $ SockAddrInet6 (fromIntegral p) 0 (a, b, c, d) 0 -- | Report receiving a pong from a connected peer. Will store ping roundtrip -- time in a window of latest eleven. Peers are returned by the manager in order -- of median roundtrip time. gotPong :: TVar [OnlinePeer] -> Word64 -> UTCTime -> Peer -> STM (Maybe NominalDiffTime) gotPong b nonce now p = runMaybeT $ do o <- MaybeT $ findPeer b p (time, old_nonce) <- MaybeT . return $ onlinePeerPing o guard $ nonce == old_nonce let diff = now `diffUTCTime` time lift $ insertPeer b o { onlinePeerPing = Nothing , onlinePeerPings = take 11 $ diff : onlinePeerPings o } return diff -- | Return time of last ping sent to peer, if any. lastPing :: TVar [OnlinePeer] -> Peer -> STM (Maybe UTCTime) lastPing b p = findPeer b p >>= \case Just OnlinePeer {onlinePeerPing = Just (time, _)} -> return (Just time) _ -> return Nothing -- | Set nonce and time of last ping sent to peer. setPeerPing :: TVar [OnlinePeer] -> Word64 -> UTCTime -> Peer -> STM () setPeerPing b nonce now p = modifyPeer b p $ \o -> o {onlinePeerPing = Just (now, nonce)} -- | Set version for online peer. Will set the peer connected status to 'True' -- if a verack message has already been registered for that peer. setPeerVersion :: TVar [OnlinePeer] -> Peer -> Version -> STM (Either PeerException OnlinePeer) setPeerVersion b p v = runExceptT $ do when (services v .&. nodeNetwork == 0) $ throwError NotNetworkPeer ops <- lift $ readTVar b when (any ((verNonce v ==) . onlinePeerNonce) ops) $ throwError PeerIsMyself lift (findPeer b p) >>= \case Nothing -> throwError UnknownPeer Just o -> do let n = o { onlinePeerVersion = Just v , onlinePeerConnected = onlinePeerVerAck o } lift $ insertPeer b n return n -- | Register that a verack message was received from a peer. setPeerVerAck :: TVar [OnlinePeer] -> Peer -> STM (Maybe OnlinePeer) setPeerVerAck b p = runMaybeT $ do o <- MaybeT $ findPeer b p let n = o { onlinePeerVerAck = True , onlinePeerConnected = isJust (onlinePeerVersion o) } lift $ insertPeer b n return n -- | Create 'OnlinePeer' data structure. newOnlinePeer :: TVar [OnlinePeer] -> SockAddr -- ^ peer address -> Word64 -- ^ nonce sent to peer -> Peer -- ^ peer mailbox -> Async () -- ^ peer asynchronous handle -> STM OnlinePeer newOnlinePeer b sa n p a = do let op = OnlinePeer { onlinePeerAddress = sa , onlinePeerVerAck = False , onlinePeerConnected = False , onlinePeerVersion = Nothing , onlinePeerAsync = a , onlinePeerMailbox = p , onlinePeerNonce = n , onlinePeerPings = [] , onlinePeerPing = Nothing } insertPeer b op return op -- | Get a human-readable string for the peer address. peerString :: TVar [OnlinePeer] -> Peer -> STM Text peerString b p = maybe "[unknown]" (cs . show . onlinePeerAddress) <$> findPeer b p -- | Find a connected peer. findPeer :: TVar [OnlinePeer] -> Peer -> STM (Maybe OnlinePeer) findPeer b p = find ((== p) . onlinePeerMailbox) <$> readTVar b -- | Insert or replace a connected peer. insertPeer :: TVar [OnlinePeer] -> OnlinePeer -> STM () insertPeer b o = modifyTVar b $ \x -> sort . nub $ o : x -- | Modify an online peer. modifyPeer :: TVar [OnlinePeer] -> Peer -> (OnlinePeer -> OnlinePeer) -> STM () modifyPeer b p f = findPeer b p >>= \case Nothing -> return () Just o -> insertPeer b $ f o -- | Remove an online peer. removePeer :: TVar [OnlinePeer] -> Peer -> STM () removePeer b p = modifyTVar b $ \x -> filter ((/= p) . onlinePeerMailbox) x -- | Find online peer by asynchronous handle. findPeerAsync :: TVar [OnlinePeer] -> Async () -> STM (Maybe OnlinePeer) findPeerAsync b a = find ((== a) . onlinePeerAsync) <$> readTVar b findPeerAddress :: TVar [OnlinePeer] -> SockAddr -> STM (Maybe OnlinePeer) findPeerAddress b a = find ((== a) . onlinePeerAddress) <$> readTVar b