{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
module Network.Haskoin.Store.Block
( blockStore
) where
import Control.Monad.Except
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Monad.Trans.Maybe
import qualified Data.HashMap.Strict as M
import Data.Maybe
import Data.String
import Data.Time.Clock.System
import Database.RocksDB
import Haskoin
import Haskoin.Node
import Network.Haskoin.Store.Data
import Network.Haskoin.Store.Data.HashMap
import Network.Haskoin.Store.Data.ImportDB
import Network.Haskoin.Store.Logic
import Network.Haskoin.Store.Messages
import NQE
import System.Random
import UnliftIO
import UnliftIO.Concurrent
-- | Exceptions thrown by the block store when its view of the database,
-- the header chain or the syncing peer is inconsistent.
data BlockException
    = BlockNotInChain !BlockHash
      -- ^ the best block stored in the database has no header in the chain
    | Uninitialized
      -- ^ the database has no best block recorded
    | UnexpectedGenesisNode
      -- ^ a genesis node appeared among the blocks about to be requested
    | SyncingPeerExpected
      -- ^ a block was being processed but no syncing peer was set
    | AncestorNotInChain !BlockHeight
                         !BlockHash
      -- ^ the chain has no ancestor at this height for the block with this hash
    deriving (Show, Eq, Ord, Exception)
-- | State kept while blocks are being downloaded from a peer.
data Syncing = Syncing
    { syncingPeer :: !Peer
      -- ^ peer the blocks were requested from
    , syncingTime :: !UnixTime
      -- ^ seconds timestamp of the request, refreshed whenever a block from
      -- the peer is processed; used to detect timeouts
    , syncingHead :: !BlockNode
      -- ^ last block of the batch requested from the peer
    }
-- | Reader environment shared by all block store handlers.
data BlockRead = BlockRead
    { mySelf :: !BlockStore
      -- ^ mailbox of this block store
    , myConfig :: !BlockConfig
      -- ^ configuration the block store was started with
    , myPeer :: !(TVar (Maybe Syncing))
      -- ^ syncing state; 'Nothing' when no peer is currently syncing
    , myUnspent :: !(TVar UnspentMap)
      -- ^ unspent output cache
    , myBalances :: !(TVar BalanceMap)
      -- ^ address balance cache
    }
-- | Run the block store actor.
--
-- Creates empty syncing and cache state, initializes the database (rethrowing
-- any initialization error after logging it) and then handles messages from
-- the inbox forever. A background thread periodically sends 'BlockPing' to
-- the same inbox for as long as the loop runs.
blockStore ::
       (MonadUnliftIO m, MonadLoggerIO m)
    => BlockConfig
    -> Inbox BlockMessage
    -> m ()
blockStore cfg inbox = do
    $(logInfoS) "Block" "Initializing block store..."
    peerVar <- newTVarIO Nothing
    unspentVar <- newTVarIO M.empty
    balanceVar <- newTVarIO (M.empty, [])
    let env =
            BlockRead
                { mySelf = inboxToMailbox inbox
                , myConfig = cfg
                , myPeer = peerVar
                , myUnspent = unspentVar
                , myBalances = balanceVar
                }
    runReaderT (initialize >> loop) env
  where
    -- Prepare the database before any message is handled.
    initialize = do
        conf <- asks myConfig
        unspentVar <- asks myUnspent
        balanceVar <- asks myBalances
        outcome <-
            runExceptT $
            initDB (blockConfNet conf) (blockConfDB conf) unspentVar balanceVar
        case outcome of
            Right () -> $(logInfoS) "Block" "Initialization complete"
            Left e -> do
                $(logErrorS) "Block" $
                    "Could not initialize block store: " <> fromString (show e)
                throwIO e
    -- Message loop; the ping thread lives only as long as the loop does.
    loop =
        withAsync (pingMe (inboxToMailbox inbox)) $ \_ ->
            forever (receive inbox >>= processBlockMessage)
-- | Tell whether the best block stored in the database is also the best
-- block of the header chain. Throws 'Uninitialized' if the database has no
-- best block.
isSynced :: (MonadReader BlockRead m, MonadUnliftIO m) => m Bool
isSynced = do
    conf <- asks myConfig
    stored <- getBestBlock (blockConfDB conf, defaultReadOptions)
    case stored of
        Nothing -> throwIO Uninitialized
        Just bestHash -> do
            chainBest <- chainGetBest (blockConfChain conf)
            return (headerHash (nodeHeader chainBest) == bestHash)
-- | Send a mempool message to the given peer.
mempool ::
       (MonadReader BlockRead m, MonadUnliftIO m, MonadLoggerIO m) => Peer -> m ()
mempool p = sendMessage MMempool p
-- | Prune the unspent output and balance caches, logging the number of
-- entries in each cache before and after pruning.
pruneCache :: (MonadReader BlockRead m, MonadUnliftIO m, MonadLoggerIO m) => m ()
pruneCache = do
    unspentVar <- asks myUnspent
    balanceVar <- asks myBalances
    unspentBefore <- M.size <$> readTVarIO unspentVar
    addressBefore <- M.size . fst <$> readTVarIO balanceVar
    $(logDebugS) "Block" $
        "Unspent output cache pre-prune tx count: " <>
        fromString (show unspentBefore)
    $(logDebugS) "Block" $
        "Address cache pre-prune count: " <> fromString (show addressBefore)
    pruneUnspent unspentVar
    pruneBalance balanceVar
    unspentAfter <- M.size <$> readTVarIO unspentVar
    addressAfter <- M.size . fst <$> readTVarIO balanceVar
    $(logDebugS) "Block" $
        "Unspent output cache post-prune tx count: " <>
        fromString (show unspentAfter)
    $(logDebugS) "Block" $
        "Address cache post-prune count: " <> fromString (show addressAfter)
-- | Handle a block received from a peer.
--
-- The block is imported only if it comes from the current syncing peer and
-- its header is known to the chain. On success the listener is notified, a
-- mempool message is sent to the peer once the store is synced, and caches
-- are pruned every 1000 blocks. On import failure the peer is killed.
-- Whatever the outcome, 'syncMe' runs afterwards.
processBlock ::
       (MonadReader BlockRead m, MonadUnliftIO m, MonadLoggerIO m)
    => Peer
    -> Block
    -> m ()
processBlock p b = do
    $(logDebugS) "Block" ("Processing incoming block " <> hex)
    _ <- runMaybeT importBlock
    syncMe
  where
    bh = headerHash (blockHeader b)
    hex = blockHashToHex bh
    -- Whole import pipeline; aborts early through 'mzero'.
    importBlock = do
        requireSyncingPeer
        conf <- asks myConfig
        node <- lookupNode
        touchPeer
        unspentVar <- asks myUnspent
        balanceVar <- asks myBalances
        outcome <-
            runExceptT $
            newBlock
                (blockConfNet conf)
                (blockConfDB conf)
                unspentVar
                balanceVar
                b
                node
        case outcome of
            Left e -> do
                $(logErrorS) "Block" $
                    "Error importing block " <> fromString (show bh) <> ": " <>
                    fromString (show e)
                killPeer (PeerMisbehaving (show e)) p
            Right () -> do
                atomically $ blockConfListener conf (StoreBestBlock bh)
                synced <- lift isSynced
                when synced (lift (mempool p))
                when (nodeHeight node `mod` 1000 == 0) (lift pruneCache)
    -- Abort unless the sender is the peer currently being synced from.
    requireSyncingPeer = do
        current <- asks myPeer >>= readTVarIO
        let fromSyncingPeer = maybe False ((p ==) . syncingPeer) current
        unless fromSyncingPeer $ do
            $(logErrorS)
                "Block"
                ("Cannot accept block " <> hex <> " from non-syncing peer")
            mzero
    -- Find the chain node for this block; kill the peer if it is unknown.
    lookupNode = do
        ch <- blockConfChain <$> asks myConfig
        chainGetBlock bh ch >>= \case
            Just node -> return node
            Nothing -> do
                killPeer (PeerMisbehaving "Sent unknown block") p
                $(logErrorS)
                    "Block"
                    ("Block " <> hex <> " rejected as header not found")
                mzero
    -- Clear the syncing state when the last requested block has arrived,
    -- otherwise refresh its timestamp.
    touchPeer = do
        box <- asks myPeer
        current <- readTVarIO box
        case current of
            Nothing -> throwIO SyncingPeerExpected
            Just s
                | nodeHeader (syncingHead s) == blockHeader b -> resetPeer
                | otherwise -> do
                    now <- systemSeconds <$> liftIO getSystemTime
                    atomically (writeTVar box (Just s {syncingTime = now}))
-- | Handle a peer reporting that it could not find requested blocks: the
-- peer is killed. The list of hashes is ignored.
processNoBlocks ::
       (MonadReader BlockRead m, MonadUnliftIO m, MonadLoggerIO m)
    => Peer
    -> [BlockHash]
    -> m ()
processNoBlocks p _bs =
    $(logErrorS) "Block" "Killing peer that could not find blocks" >>
    killPeer (PeerMisbehaving "Sent notfound message with block hashes") p
-- | Import a transaction received from a peer into the mempool. Nothing is
-- done unless the store is synced. Import errors are logged; on success the
-- listener receives a 'StoreMempoolNew' event.
processTx ::
       (MonadReader BlockRead m, MonadUnliftIO m, MonadLoggerIO m)
    => Peer
    -> Tx
    -> m ()
processTx _p tx = do
    synced <- isSynced
    when synced $ do
        $(logInfoS) "Block" $ "Incoming tx: " <> txid
        now <- preciseUnixTime <$> liftIO getSystemTime
        conf <- asks myConfig
        unspentVar <- asks myUnspent
        balanceVar <- asks myBalances
        outcome <-
            runExceptT $
            runImportDB (blockConfDB conf) unspentVar balanceVar $ \i ->
                newMempoolTx (blockConfNet conf) i tx now
        case outcome of
            Right () ->
                atomically $
                blockConfListener conf (StoreMempoolNew (txHash tx))
            Left e ->
                $(logErrorS) "Block" $
                "Error importing tx: " <> txid <> ": " <> fromString (show e)
  where
    txid = txHashToHex (txHash tx)
-- | Handle a transaction inventory announced by a peer.
--
-- Nothing is done unless the store is synced. Transactions already present
-- in the database are filtered out and the remaining ones are requested
-- from the peer, as witness transactions when the network has SegWit. No
-- request is sent when every announced transaction is already known.
processTxs ::
       (MonadReader BlockRead m, MonadUnliftIO m, MonadLoggerIO m)
    => Peer
    -> [TxHash]
    -> m ()
processTxs p hs = do
    synced <- isSynced
    when synced $ do
        conf <- asks myConfig
        let db = blockConfDB conf
        $(logDebugS) "Block" $
            "Received " <> fromString (show (length hs)) <>
            " transaction inventory"
        -- Keep only the hashes the database has no data for.
        unknown <-
            filterM
                (\h -> isNothing <$> getTxData (db, defaultReadOptions) h)
                hs
        let xs = map getTxHash unknown
        $(logDebugS) "Block" $
            "Requesting " <> fromString (show (length xs)) <>
            " new transactions"
        -- An empty getdata message would be pointless traffic.
        unless (null xs) $ do
            let inv
                    | getSegWit (blockConfNet conf) = InvWitnessTx
                    | otherwise = InvTx
            MGetData (GetData (map (InvVector inv) xs)) `sendMessage` p
-- | Kill the syncing peer with 'PeerTimeout' when more than 60 seconds have
-- passed since its syncing timestamp. Does nothing when no peer is syncing.
checkTime :: (MonadReader BlockRead m, MonadUnliftIO m, MonadLoggerIO m) => m ()
checkTime = do
    current <- asks myPeer >>= readTVarIO
    forM_ current $ \Syncing {syncingTime = t, syncingPeer = p} -> do
        now <- systemSeconds <$> liftIO getSystemTime
        when (now > t + 60) $ do
            $(logErrorS) "Block" "Peer timeout"
            killPeer PeerTimeout p
-- | Handle a peer disconnection. When the disconnected peer is the one being
-- synced from, the syncing state is cleared and syncing is attempted again;
-- otherwise nothing happens.
processDisconnect ::
       (MonadReader BlockRead m, MonadUnliftIO m, MonadLoggerIO m)
    => Peer
    -> m ()
processDisconnect p = do
    current <- asks myPeer >>= readTVarIO
    let wasSyncingPeer = maybe False ((p ==) . syncingPeer) current
    when wasSyncingPeer $ do
        $(logErrorS) "Block" "Syncing peer disconnected"
        resetPeer
        syncMe
-- | Placeholder for mempool purging: only logs that it is not implemented.
purgeMempool ::
       (MonadReader BlockRead m, MonadUnliftIO m, MonadLoggerIO m) => m ()
purgeMempool =
    $(logErrorS) "Block" $ "Mempool purging not implemented"
-- | Request the next batch of blocks from a peer.
--
-- Steps: revert stored best blocks that are no longer in the main chain,
-- pick the syncing peer (or the first online peer), and stop if the current
-- syncing head already matches the chain best or is more than 500 blocks
-- ahead of the stored best block. Otherwise record the peer together with
-- the last block of the batch and send it a getdata message for the batch.
syncMe :: (MonadReader BlockRead m, MonadUnliftIO m, MonadLoggerIO m) => m ()
syncMe =
    void . runMaybeT $ do
        revertStale
        peer <- pickPeer >>= maybe noPeer return
        syncHead <- currentHead
        dbBest <- storedBest
        chainBest <- chainTip
        when (finished syncHead dbBest chainBest) mzero
        nodes <- nextBatch chainBest syncHead
        setPeer peer (last nodes)
        net <- blockConfNet <$> asks myConfig
        let invType
                | getSegWit net = InvWitnessBlock
                | otherwise = InvBlock
        vectors <- mapM (toInv invType) nodes
        $(logInfoS) "Block" $
            "Requesting " <> fromString (show (length vectors)) <> " blocks"
        MGetData (GetData vectors) `sendMessage` peer
  where
    -- Give up when there is nobody to sync from.
    noPeer = do
        $(logWarnS) "Block" "Not syncing blocks as no peers available"
        mzero
    -- Current syncing peer, or else the first online peer if any.
    pickPeer = do
        current <- asks myPeer >>= readTVarIO
        case current of
            Just s -> return (Just (syncingPeer s))
            Nothing -> do
                mgr <- blockConfManager <$> asks myConfig
                online <- managerGetPeers mgr
                return (onlinePeerMailbox <$> listToMaybe online)
    -- Best block of the header chain.
    chainTip = asks myConfig >>= chainGetBest . blockConfChain
    -- Chain node of the best block stored in the database.
    storedBest = do
        conf <- asks myConfig
        stored <- getBestBlock (blockConfDB conf, defaultReadOptions)
        bestHash <-
            case stored of
                Just h -> return h
                Nothing -> do
                    $(logErrorS) "Block" "Best block not in database"
                    throwIO Uninitialized
        found <- chainGetBlock bestHash (blockConfChain conf)
        case found of
            Just node -> return node
            Nothing -> do
                $(logErrorS) "Block" $
                    "Block header not found for best block " <>
                    blockHashToHex bestHash
                throwIO (BlockNotInChain bestHash)
    -- Head already requested from the syncing peer, or the stored best.
    currentHead =
        asks myPeer >>= readTVarIO >>= maybe storedBest (return . syncingHead)
    -- Nothing to request: head is the chain best, or far enough ahead.
    finished b d c =
        nodeHeader b == nodeHeader c || nodeHeight b > nodeHeight d + 500
    toInv inv =
        \case
            GenesisNode {} -> throwIO UnexpectedGenesisNode
            BlockNode {nodeHeader = hdr} ->
                return $ InvVector inv (getBlockHash (headerHash hdr))
    -- Nodes after @b@ leading towards @c@, capped by the 501-block window.
    nextBatch c b = do
        target <- ancestorAt c (min (nodeHeight c) (nodeHeight b + 501))
        ch <- blockConfChain <$> asks myConfig
        parents <- chainGetParents (nodeHeight b + 1) target ch
        return $
            if length parents < 500
                then parents <> [c]
                else parents
    -- Node at height @t@ on the branch ending at @c@.
    ancestorAt c t
        | t == nodeHeight c = return c
        | otherwise = do
            ch <- blockConfChain <$> asks myConfig
            chainGetAncestor t c ch >>= \case
                Just node -> return node
                Nothing -> do
                    $(logErrorS) "Block" $
                        "Unknown header for ancestor of block " <>
                        blockHashToHex (headerHash (nodeHeader c)) <>
                        " at height " <>
                        fromString (show t)
                    throwIO $
                        AncestorNotInChain t (headerHash (nodeHeader c))
    -- Revert stored best blocks until one is in the main chain.
    revertStale = do
        best <- headerHash . nodeHeader <$> storedBest
        ch <- blockConfChain <$> asks myConfig
        inMain <- chainBlockMain best ch
        unless inMain $ do
            $(logErrorS) "Block" $
                "Reverting best block " <> blockHashToHex best <>
                " as it is not in main chain..."
            resetPeer
            db <- blockConfDB <$> asks myConfig
            unspentVar <- asks myUnspent
            balanceVar <- asks myBalances
            net <- blockConfNet <$> asks myConfig
            outcome <-
                runExceptT $
                runImportDB db unspentVar balanceVar $ \i ->
                    revertBlock net i best
            case outcome of
                Right () -> revertStale
                Left e -> do
                    $(logErrorS) "Block" $
                        "Could not revert best block: " <> fromString (show e)
                    throwIO e
-- | Clear the syncing state so that no peer is considered to be syncing.
resetPeer :: (MonadReader BlockRead m, MonadLoggerIO m) => m ()
resetPeer = do
    $(logDebugS) "Block" "Resetting syncing peer..."
    asks myPeer >>= \box -> atomically (writeTVar box Nothing)
-- | Record the given peer as the syncing peer, with the given node as the
-- syncing head and the current time as the syncing timestamp.
setPeer :: (MonadReader BlockRead m, MonadIO m) => Peer -> BlockNode -> m ()
setPeer p b = do
    now <- systemSeconds <$> liftIO getSystemTime
    let syncing =
            Syncing {syncingPeer = p, syncingTime = now, syncingHead = b}
    box <- asks myPeer
    atomically (writeTVar box (Just syncing))
-- | Dispatch a message from the block store inbox to its handler.
processBlockMessage ::
       (MonadReader BlockRead m, MonadUnliftIO m, MonadLoggerIO m)
    => BlockMessage
    -> m ()
processBlockMessage =
    \case
        BlockNewBest bn -> do
            $(logDebugS) "Block" $
                "New best block header " <> fromString (show (nodeHeight bn)) <>
                ": " <>
                blockHashToHex (headerHash (nodeHeader bn))
            syncMe
        BlockPeerConnect _p sa -> do
            $(logDebugS) "Block" $
                "New peer connected: " <> fromString (show sa)
            syncMe
        BlockPeerDisconnect p _sa -> processDisconnect p
        BlockReceived p b -> processBlock p b
        BlockNotFound p bs -> processNoBlocks p bs
        BlockTxReceived p tx -> processTx p tx
        BlockTxAvailable p ts -> processTxs p ts
        BlockPing -> checkTime
        PurgeMempool -> purgeMempool
-- | Forever send 'BlockPing' to the mailbox, sleeping a random 5 to 10
-- seconds before each ping.
pingMe :: MonadIO m => Mailbox BlockMessage -> m ()
pingMe mbox =
    forever $ do
        delay <- liftIO (randomRIO (5 * 1000 * 1000, 10 * 1000 * 1000))
        threadDelay delay
        send BlockPing mbox