{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
module Network.Haskoin.Node.Manager
( manager
) where
import Control.Applicative
import Control.Concurrent.NQE
import Control.Concurrent.Unique
import Control.Monad
import Control.Monad.Except
import Control.Monad.Logger
import Control.Monad.Reader
import Data.Bits
import qualified Data.ByteString as BS
import Data.Conduit
import qualified Data.Conduit.Combinators as CC
import Data.Function
import Data.List
import Data.Maybe
import Data.Serialize (Get, Put, Serialize, get, put)
import qualified Data.Serialize as S
import Data.String
import Data.String.Conversions
import Data.Word
import Database.RocksDB (DB)
import Database.RocksDB.Query
import Network.Haskoin.Block
import Network.Haskoin.Constants
import Network.Haskoin.Network
import Network.Haskoin.Node.Common
import Network.Haskoin.Node.Peer
import Network.Socket (SockAddr (..))
import System.Random
import UnliftIO
import UnliftIO.Concurrent
import UnliftIO.Resource
type MonadManager n m
= ( MonadUnliftIO n
, MonadLoggerIO n
, MonadUnliftIO m
, MonadLoggerIO m
, MonadReader (ManagerReader n) m)
data ManagerReader n = ManagerReader
{ mySelf :: !Manager
, myChain :: !Chain
, myConfig :: !(ManagerConfig n)
, myPeerDB :: !DB
, myPeerSupervisor :: !(PeerSupervisor n)
, onlinePeers :: !(TVar [OnlinePeer])
, myBloomFilter :: !(TVar (Maybe BloomFilter))
, myBestBlock :: !(TVar BlockNode)
}
data Priority
= PriorityNetwork
| PrioritySeed
| PriorityManual
deriving (Eq, Show, Ord)
data PeerAddress
= PeerAddress { getPeerAddress :: SockAddr }
| PeerAddressBase
deriving (Eq, Show)
instance Serialize Priority where
get =
S.getWord8 >>= \case
0x00 -> return PriorityManual
0x01 -> return PrioritySeed
0x02 -> return PriorityNetwork
_ -> mzero
put PriorityManual = S.putWord8 0x00
put PrioritySeed = S.putWord8 0x01
put PriorityNetwork = S.putWord8 0x02
instance Serialize PeerAddress where
get = do
guard . (== 0x81) =<< S.getWord8
record <|> return PeerAddressBase
where
record = do
getPeerAddress <- decodeSockAddr
return PeerAddress {..}
put PeerAddress {..} = do
S.putWord8 0x81
encodeSockAddr getPeerAddress
put PeerAddressBase = S.putWord8 0x81
data PeerTimeAddress
= PeerTimeAddress { getPeerPrio :: !Priority
, getPeerBanned :: !Word32
, getPeerLastConnect :: !Word32
, getPeerNextConnect :: !Word32
, getPeerTimeAddress :: !PeerAddress }
| PeerTimeAddressBase
deriving (Eq, Show)
instance Key PeerTimeAddress
instance KeyValue PeerAddress PeerTimeAddress
instance KeyValue PeerTimeAddress PeerAddress
instance Serialize PeerTimeAddress where
get = do
guard . (== 0x80) =<< S.getWord8
record <|> return PeerTimeAddressBase
where
record = do
getPeerPrio <- S.get
getPeerBanned <- S.get
getPeerLastConnect <- (maxBound -) <$> S.get
getPeerNextConnect <- S.get
getPeerTimeAddress <- S.get
return PeerTimeAddress {..}
put PeerTimeAddress {..} = do
S.putWord8 0x80
S.put getPeerPrio
S.put getPeerBanned
S.put (maxBound - getPeerLastConnect)
S.put getPeerNextConnect
S.put getPeerTimeAddress
put PeerTimeAddressBase = S.putWord8 0x80
manager :: (MonadUnliftIO m, MonadLoggerIO m) => ManagerConfig m -> m ()
manager cfg = do
bb <- chainGetBest $ mgrConfChain cfg
opb <- newTVarIO []
bfb <- newTVarIO Nothing
bbb <- newTVarIO bb
withConnectLoop (mgrConfManager cfg) $ do
let rd =
ManagerReader
{ mySelf = mgrConfManager cfg
, myChain = mgrConfChain cfg
, myConfig = cfg
, myPeerDB = mgrConfDB cfg
, myPeerSupervisor = mgrConfPeerSupervisor cfg
, onlinePeers = opb
, myBloomFilter = bfb
, myBestBlock = bbb
}
run `runReaderT` rd
where
run = do
connectNewPeers
managerLoop
resolvePeers :: (MonadUnliftIO m, MonadManager n m) => m [(SockAddr, Priority)]
resolvePeers = do
cfg <- asks myConfig
let net = mgrConfNetwork cfg
confPeers <-
fmap
(map (, PriorityManual) . concat)
(mapM toSockAddr (mgrConfPeers cfg))
if mgrConfDiscover cfg
then do
seedPeers <-
fmap
(map (, PrioritySeed) . concat)
(mapM (toSockAddr . (, getDefaultPort net)) (getSeeds net))
return (confPeers ++ seedPeers)
else return confPeers
encodeSockAddr :: SockAddr -> Put
encodeSockAddr (SockAddrInet6 p _ (a, b, c, d) _) = do
S.putWord32be a
S.putWord32be b
S.putWord32be c
S.putWord32be d
S.putWord16be (fromIntegral p)
encodeSockAddr (SockAddrInet p a) = do
S.putWord32be 0x00000000
S.putWord32be 0x00000000
S.putWord32be 0x0000ffff
S.putWord32host a
S.putWord16be (fromIntegral p)
encodeSockAddr x = error $ "Colud not encode address: " <> show x
decodeSockAddr :: Get SockAddr
decodeSockAddr = do
a <- S.getWord32be
b <- S.getWord32be
c <- S.getWord32be
if a == 0x00000000 && b == 0x00000000 && c == 0x0000ffff
then do
d <- S.getWord32host
p <- S.getWord16be
return $ SockAddrInet (fromIntegral p) d
else do
d <- S.getWord32be
p <- S.getWord16be
return $ SockAddrInet6 (fromIntegral p) 0 (a, b, c, d) 0
connectPeer :: MonadManager n m => SockAddr -> m ()
connectPeer sa = do
db <- asks myPeerDB
let k = PeerAddress sa
retrieve db Nothing k >>= \case
Nothing -> error "Could not find peer to mark connected"
Just v -> do
now <- computeTime
let v' = v {getPeerLastConnect = now}
writeBatch db [deleteOp v, insertOp v' k, insertOp k v']
logPeersConnected
logPeersConnected :: MonadManager n m => m ()
logPeersConnected = do
mo <- mgrConfMaxPeers <$> asks myConfig
ps <- getOnlinePeers
$(logInfoS) "Manager" $
"Peers connected: " <> cs (show (length ps)) <> "/" <> cs (show mo)
storePeer :: MonadManager n m => SockAddr -> Priority -> m ()
storePeer sa prio = do
db <- asks myPeerDB
let k = PeerAddress sa
retrieve db Nothing k >>= \case
Nothing -> do
let v =
PeerTimeAddress
{ getPeerPrio = prio
, getPeerBanned = 0
, getPeerLastConnect = 0
, getPeerNextConnect = 0
, getPeerTimeAddress = k
}
writeBatch
db
[insertOp v k, insertOp k v]
Just v@PeerTimeAddress {..} ->
when (getPeerPrio < prio) $ do
let v' = v {getPeerPrio = prio}
writeBatch
db
[deleteOp v, insertOp v' k, insertOp k v']
Just PeerTimeAddressBase ->
error "Key for peer is corrupted"
banPeer :: MonadManager n m => SockAddr -> m ()
banPeer sa = do
db <- asks myPeerDB
let k = PeerAddress sa
retrieve db Nothing k >>= \case
Nothing -> error "Cannot find peer to be banned"
Just v -> do
now <- computeTime
let v' =
v
{ getPeerBanned = now
, getPeerNextConnect = now + 6 * 60 * 60
}
when (getPeerPrio v == PriorityNetwork) $ do
$(logWarnS) "Manager" $ "Banning peer " <> cs (show sa)
writeBatch db [deleteOp v, insertOp k v', insertOp v' k]
backoffPeer :: MonadManager n m => SockAddr -> m ()
backoffPeer sa = do
db <- asks myPeerDB
onlinePeers <- map onlinePeerAddress <$> getOnlinePeers
let k = PeerAddress sa
retrieve db Nothing k >>= \case
Nothing -> error "Cannot find peer to backoff in database"
Just v -> do
now <- computeTime
r <-
liftIO . randomRIO $
if null onlinePeers
then (90, 300)
else (900, 1800)
let t = max (now + r) (getPeerNextConnect v)
v' = v {getPeerNextConnect = t}
when (getPeerPrio v == PriorityNetwork) $ do
$(logWarnS) "Manager" $
"Backing off peer " <> cs (show sa) <> " for " <>
cs (show r) <>
" seconds"
writeBatch db [deleteOp v, insertOp k v', insertOp v' k]
getNewPeer :: (MonadUnliftIO m, MonadManager n m) => m (Maybe SockAddr)
getNewPeer = do
ManagerConfig {..} <- asks myConfig
online_peers <- map onlinePeerAddress <$> getOnlinePeers
config_peers <- concat <$> mapM toSockAddr mgrConfPeers
if mgrConfDiscover
then do
db <- asks myPeerDB
now <- computeTime
runResourceT . runConduit $
matching db Nothing PeerTimeAddressBase .|
CC.filter ((<= now) . getPeerNextConnect . fst) .|
CC.map (getPeerAddress . snd) .|
CC.find (not . (`elem` online_peers))
else return $ find (not . (`elem` online_peers)) config_peers
getConnectedPeers :: MonadManager n m => m [OnlinePeer]
getConnectedPeers = filter onlinePeerConnected <$> getOnlinePeers
withConnectLoop :: (MonadUnliftIO m, MonadLoggerIO m) => Manager -> m a -> m a
withConnectLoop mgr f = withAsync go $ const f
where
go =
forever $ do
ManagerPing `send` mgr
i <- liftIO (randomRIO (30, 90))
threadDelay (i * 1000 * 1000)
managerLoop :: (MonadUnliftIO m, MonadManager n m) => m ()
managerLoop =
forever $ do
mgr <- asks mySelf
msg <- receive mgr
processManagerMessage msg
processManagerMessage ::
(MonadUnliftIO m, MonadManager n m) => ManagerMessage -> m ()
processManagerMessage (ManagerSetFilter bf) = setFilter bf
processManagerMessage (ManagerSetBest bb) =
asks myBestBlock >>= atomically . (`writeTVar` bb)
processManagerMessage ManagerPing = connectNewPeers
processManagerMessage (ManagerGetAddr p) = do
pn <- peerString p
$(logWarnS) "Manager" $ "Ignoring address request from peer " <> fromString pn
processManagerMessage (ManagerNewPeers p as) =
asks myConfig >>= \case
ManagerConfig {..}
| not mgrConfDiscover -> return ()
| otherwise -> do
pn <- peerString p
$(logInfoS) "Manager" $
"Received " <> cs (show (length as)) <>
" peers from " <>
fromString pn
forM_ as $ \(_, na) ->
let sa = naAddress na
in storePeer sa PriorityNetwork
processManagerMessage (ManagerKill e p) =
findPeer p >>= \case
Nothing -> return ()
Just op -> do
$(logErrorS) "Manager" $
"Killing peer " <> cs (show (onlinePeerAddress op))
banPeer $ onlinePeerAddress op
onlinePeerAsync op `cancelWith` e
processManagerMessage (ManagerSetPeerBest p bn) = modifyPeer f p
where
f op = op {onlinePeerBestBlock = bn}
processManagerMessage (ManagerGetPeerBest p reply) =
findPeer p >>= \op -> atomically (reply (fmap onlinePeerBestBlock op))
processManagerMessage (ManagerSetPeerVersion p v) =
modifyPeer f p >> findPeer p >>= \case
Nothing -> return ()
Just op ->
runExceptT testVersion >>= \case
Left ex -> do
banPeer (onlinePeerAddress op)
onlinePeerAsync op `cancelWith` ex
Right () -> do
loadFilter
askForPeers
connectPeer (onlinePeerAddress op)
announcePeer
where
f op =
op
{ onlinePeerVersion = version v
, onlinePeerServices = services v
, onlinePeerRemoteNonce = verNonce v
, onlinePeerUserAgent = getVarString (userAgent v)
, onlinePeerRelay = relay v
}
testVersion = do
when (services v .&. nodeNetwork == 0) $ throwError NotNetworkPeer
bfb <- asks myBloomFilter
bf <- readTVarIO bfb
when (isJust bf && services v .&. nodeBloom == 0) $
throwError BloomFiltersNotSupported
myself <-
any ((verNonce v ==) . onlinePeerNonce) <$> lift getOnlinePeers
when myself $ throwError PeerIsMyself
loadFilter = do
bfb <- asks myBloomFilter
bf <- readTVarIO bfb
case bf of
Nothing -> return ()
Just b -> b `peerSetFilter` p
askForPeers =
mgrConfDiscover <$> asks myConfig >>= \discover ->
when discover (MGetAddr `sendMessage` p)
announcePeer =
findPeer p >>= \case
Nothing -> return ()
Just op
| onlinePeerConnected op -> return ()
| otherwise -> do
$(logInfoS) "Manager" $
"Connected to " <> cs (show (onlinePeerAddress op))
l <- mgrConfMgrListener <$> asks myConfig
atomically (l (ManagerConnect p))
ch <- asks myChain
chainNewPeer p ch
setPeerAnnounced p
processManagerMessage (ManagerGetPeerVersion p reply) =
fmap onlinePeerVersion <$> findPeer p >>= atomically . reply
processManagerMessage (ManagerGetPeers reply) =
getPeers >>= atomically . reply
processManagerMessage (ManagerGetOnlinePeer p reply) =
getOnlinePeer p >>= atomically . reply
processManagerMessage (ManagerPeerPing p i) =
modifyPeer (\x -> x {onlinePeerPings = take 11 $ i : onlinePeerPings x}) p
processManagerMessage (PeerStopped (p, _ex)) = do
opb <- asks onlinePeers
m <- atomically $ do
m <- findPeerAsync p opb
when (isJust m) $ removePeer p opb
return m
case m of
Just op -> do
backoffPeer (onlinePeerAddress op)
processPeerOffline op
Nothing -> return ()
processPeerOffline :: MonadManager n m => OnlinePeer -> m ()
processPeerOffline op
| onlinePeerConnected op = do
let p = onlinePeerMailbox op
$(logWarnS) "Manager" $
"Disconnected peer " <> cs (show (onlinePeerAddress op))
asks myChain >>= chainRemovePeer p
l <- mgrConfMgrListener <$> asks myConfig
atomically (l (ManagerDisconnect p))
logPeersConnected
| otherwise =
$(logWarnS) "Manager" $
"Could not connect to peer " <> cs (show (onlinePeerAddress op))
getPeers :: MonadManager n m => m [OnlinePeer]
getPeers = sortBy (compare `on` median . onlinePeerPings) <$> getConnectedPeers
getOnlinePeer :: MonadManager n m => Peer -> m (Maybe OnlinePeer)
getOnlinePeer p = find ((== p) . onlinePeerMailbox) <$> getConnectedPeers
connectNewPeers :: MonadManager n m => m ()
connectNewPeers = do
mo <- mgrConfMaxPeers <$> asks myConfig
ps <- getOnlinePeers
let n = mo - length ps
when (null ps) $ do
ps' <- resolvePeers
mapM_ (uncurry storePeer) ps'
go n
where
go 0 = return ()
go n =
getNewPeer >>= \case
Nothing -> return ()
Just sa -> conn sa >> go (n - 1)
conn sa = do
ad <- mgrConfNetAddr <$> asks myConfig
mgr <- asks mySelf
ch <- asks myChain
pl <- mgrConfPeerListener <$> asks myConfig
net <- mgrConfNetwork <$> asks myConfig
$(logInfoS) "Manager" $ "Connecting to peer " <> cs (show sa)
bbb <- asks myBestBlock
bb <- readTVarIO bbb
nonce <- liftIO randomIO
let pc =
PeerConfig
{ peerConfConnect = NetworkAddress (srv net) sa
, peerConfInitBest = bb
, peerConfLocal = ad
, peerConfManager = mgr
, peerConfChain = ch
, peerConfListener = pl
, peerConfNonce = nonce
, peerConfNetwork = net
}
psup <- asks myPeerSupervisor
pmbox <- newTBQueueIO 100
uid <- liftIO newUnique
let p = UniqueInbox {uniqueInbox = Inbox pmbox, uniqueId = uid}
a <- psup `addChild` peer pc p
newPeerConnection net sa nonce p a
srv net
| getSegWit net = 8
| otherwise = 0
newPeerConnection ::
MonadManager n m
=> Network
-> SockAddr
-> Word64
-> Peer
-> Async ()
-> m ()
newPeerConnection net sa nonce p a =
addPeer
OnlinePeer
{ onlinePeerAddress = sa
, onlinePeerConnected = False
, onlinePeerVersion = 0
, onlinePeerServices = 0
, onlinePeerRemoteNonce = 0
, onlinePeerUserAgent = BS.empty
, onlinePeerRelay = False
, onlinePeerBestBlock = genesisNode net
, onlinePeerAsync = a
, onlinePeerMailbox = p
, onlinePeerNonce = nonce
, onlinePeerPings = []
}
peerString :: MonadManager n m => Peer -> m String
peerString p = maybe "[unknown]" (show . onlinePeerAddress) <$> findPeer p
setPeerAnnounced :: MonadManager n m => Peer -> m ()
setPeerAnnounced = modifyPeer (\x -> x {onlinePeerConnected = True})
setFilter :: MonadManager n m => BloomFilter -> m ()
setFilter bl = do
bfb <- asks myBloomFilter
atomically . writeTVar bfb $ Just bl
ops <- getOnlinePeers
forM_ ops $ \op ->
when (onlinePeerConnected op) $
if acceptsFilters $ onlinePeerServices op
then bl `peerSetFilter` onlinePeerMailbox op
else do
$(logErrorS) "Manager" $
"Peer " <> cs (show (onlinePeerAddress op)) <>
"does not support bloom filters"
banPeer (onlinePeerAddress op)
onlinePeerAsync op `cancelWith` BloomFiltersNotSupported
findPeer :: MonadManager n m => Peer -> m (Maybe OnlinePeer)
findPeer p = find ((== p) . onlinePeerMailbox) <$> getOnlinePeers
findPeerAsync :: Async () -> TVar [OnlinePeer] -> STM (Maybe OnlinePeer)
findPeerAsync a t = find ((== a) . onlinePeerAsync) <$> readTVar t
modifyPeer :: MonadManager n m => (OnlinePeer -> OnlinePeer) -> Peer -> m ()
modifyPeer f p = modifyOnlinePeers $ map upd
where
upd op =
if onlinePeerMailbox op == p
then f op
else op
addPeer :: MonadManager n m => OnlinePeer -> m ()
addPeer op = modifyOnlinePeers $ nubBy f . (op :)
where
f = (==) `on` onlinePeerMailbox
removePeer :: Async () -> TVar [OnlinePeer] -> STM ()
removePeer a t = modifyTVar t $ filter ((/= a) . onlinePeerAsync)
getOnlinePeers :: MonadManager n m => m [OnlinePeer]
getOnlinePeers = asks onlinePeers >>= readTVarIO
modifyOnlinePeers :: MonadManager n m => ([OnlinePeer] -> [OnlinePeer]) -> m ()
modifyOnlinePeers f = asks onlinePeers >>= atomically . (`modifyTVar` f)
median :: Fractional a => [a] -> Maybe a
median ls
| null ls = Nothing
| length ls `mod` 2 == 0 =
Just . (/ 2) . sum . take 2 $ drop (length ls `div` 2 - 1) ls
| otherwise = Just . head $ drop (length ls `div` 2) ls