{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TupleSections #-}
module Network.Haskoin.Store.Block
( blockStore
, getBestBlock
, getBestBlockHash
, getBlocksAtHeights
, getBlockAtHeight
, getBlock
, getBlocks
, getUnspent
, getAddrOutputs
, getAddrsOutputs
, getBalance
, getBalances
, getTx
, getTxs
, getUnspents
, getMempool
) where
import Conduit
import Control.Applicative
import Control.Concurrent.NQE
import Control.Monad.Except
import Control.Monad.Logger
import Control.Monad.Reader
import Control.Monad.State.Strict
import Control.Monad.Trans.Maybe
import qualified Data.ByteString as B
import Data.Foldable
import Data.Function
import Data.List
import Data.Map (Map)
import qualified Data.Map.Strict as M
import Data.Maybe
import Data.Serialize (encode)
import Data.Set (Set)
import qualified Data.Set as S
import Data.String
import Data.String.Conversions
import Data.Text (Text)
import Data.Time.Clock.POSIX
import Database.RocksDB (BatchOp, DB, Snapshot)
import qualified Database.RocksDB as R
import Database.RocksDB.Query as R
import Haskoin
import Network.Haskoin.Node
import Network.Haskoin.Store.Types
import UnliftIO
-- | Read-only environment available to every block store action.
--
-- Built once in 'blockStore' from the supplied 'BlockConfig':
--
-- * 'myBlockDB': database handle used for all reads and batch writes.
-- * 'mySelf': the block store's own mailbox (messages are received from it).
-- * 'myChain': handle used to query headers ('chainGetBest', 'chainGetBlock').
-- * 'myManager': peer manager handle ('managerGetPeers', 'managerKill').
-- * 'myListener': callback used to publish 'StoreEvent's inside STM.
-- * 'myBaseHeight': height written by 'syncBlocks' after requesting blocks,
--   and reset to the best block height when syncing restarts.
-- * 'myPeer': peer currently used for block download, if any.
-- * 'myNetwork': network constants.
data BlockRead = BlockRead
{ myBlockDB :: !DB
, mySelf :: !BlockStore
, myChain :: !Chain
, myManager :: !Manager
, myListener :: !(Listen StoreEvent)
, myBaseHeight :: !(TVar BlockHeight)
, myPeer :: !(TVar (Maybe Peer))
, myNetwork :: !Network
}
-- | Constraint for actions that can log, perform IO and read the
-- 'BlockRead' environment.
type MonadBlock m
= (MonadLoggerIO m, MonadReader BlockRead m)
-- | Pending output updates, keyed by outpoint.
type OutputMap = Map OutPoint Output
-- | Pending balance updates, keyed by address.
type AddressMap = Map Address Balance
-- | Transactions queued for insertion, keyed by transaction hash.
type TxMap = Map TxHash ImportTx
-- | Classification of a transaction's validity.
--
-- NOTE(review): not referenced anywhere else in the visible part of this
-- module; confirm whether it is still needed.
data TxStatus
= TxValid
| TxOrphan
| TxLowFunds
| TxInputSpent
deriving (Eq, Show, Ord)
-- | A transaction queued for insertion together with the block position it
-- was found at. 'importTxBlock' is 'Nothing' for transactions imported
-- without a block reference (stored under the mempool key by 'insertTxOps').
data ImportTx = ImportTx
{ importTx :: !Tx
, importTxBlock :: !(Maybe BlockRef)
}
-- | In-memory accumulator for one import run (see 'runMonadImport').
--
-- * 'outputMap': outputs created or modified during the run; consulted
--   before the database by 'getOutput'.
-- * 'addressMap': balances modified during the run; consulted before the
--   database by 'getAddress'.
-- * 'deleteTxs': hashes of transactions scheduled for deletion.
-- * 'newTxs': transactions scheduled for insertion.
-- * 'blockAction': block-level change to apply, if any.
data ImportState = ImportState
{ outputMap :: !OutputMap
, addressMap :: !AddressMap
, deleteTxs :: !(Set TxHash)
, newTxs :: !TxMap
, blockAction :: !(Maybe BlockAction)
}
-- | Constraint for actions that read and modify the 'ImportState'.
type MonadImport m = MonadState ImportState m
-- | Block-level change recorded in 'ImportState': either remove the current
-- best block or make the given block the new best block (see 'getBlockOps').
data BlockAction = RevertBlock | ImportBlock !Block
-- | Run an import action starting from an empty 'ImportState', then commit
-- everything it accumulated.
--
-- After the action finishes, the batch operations for the block action,
-- balances, deleted transactions, inserted transactions and orphan purge are
-- collected (in that order) and written to the database in a single batch.
-- Listeners are then notified: 'BestBlock' for an imported block, and
-- 'MempoolNew' for every hash in 'newTxs'. A reverted block is only logged.
-- The action's own result is returned.
runMonadImport :: MonadBlock m => StateT ImportState m a -> m a
runMonadImport action = evalStateT (action <* commit) initial_state
  where
    initial_state =
        ImportState
            { outputMap = M.empty
            , addressMap = M.empty
            , deleteTxs = S.empty
            , newTxs = M.empty
            , blockAction = Nothing
            }
    commit = do
        op_groups <-
            sequence
                [ getBlockOps
                , getBalanceOps
                , getDeleteTxOps
                , getInsertTxOps
                , purgeOrphanOps
                ]
        db <- asks myBlockDB
        writeBatch db (concat op_groups)
        listener <- asks myListener
        action_taken <- gets blockAction
        case action_taken of
            Just (ImportBlock Block {..}) ->
                atomically (listener (BestBlock (headerHash blockHeader)))
            Just RevertBlock -> $(logWarnS) "Block" "Reverted best block"
            Nothing -> return ()
        imported <- gets (M.keys . newTxs)
        forM_ imported $ \tx_hash ->
            atomically (listener (MempoolNew tx_hash))
-- | Entry point of the block store process.
--
-- Creates the mutable base-height and peer variables, prepares the database,
-- triggers an initial block sync and then processes mailbox messages forever.
--
-- Database preparation removes every stored orphan transaction and then
-- either imports the genesis block (when no best block is recorded) or loads
-- the best block's height into the base-height variable.
blockStore :: (MonadUnliftIO m, MonadLoggerIO m) => BlockConfig -> m ()
blockStore BlockConfig {..} = do
    height_var <- newTVarIO 0
    peer_var <- newTVarIO Nothing
    let env =
            BlockRead
                { mySelf = blockConfMailbox
                , myBlockDB = blockConfDB
                , myChain = blockConfChain
                , myManager = blockConfManager
                , myListener = blockConfListener
                , myBaseHeight = height_var
                , myPeer = peer_var
                , myNetwork = blockConfNet
                }
    runReaderT (prepare_db >> syncBlocks >> serve) env
  where
    serve = forever (receive blockConfMailbox >>= processBlockMessage)
    prepare_db =
        runResourceT $ do
            -- The 'Tx {}' pattern only fixes the value type of the orphan keys.
            runConduit $
                matching blockConfDB Nothing OrphanKey .|
                mapM_C (\(k, Tx {}) -> remove blockConfDB k)
            stored_best <- retrieve blockConfDB Nothing BestBlockKey
            case stored_best of
                Nothing -> addNewBlock (genesisBlock blockConfNet)
                Just (_ :: BlockHash) -> do
                    BlockValue {..} <- getBestBlock blockConfDB Nothing
                    box <- asks myBaseHeight
                    atomically $ writeTVar box blockValueHeight
-- | Fetch the hash stored under 'BestBlockKey'.
--
-- Throws (via 'throwString') when no best block hash is recorded.
getBestBlockHash :: MonadIO m => DB -> Maybe Snapshot -> m BlockHash
getBestBlockHash db snapshot =
    retrieve db snapshot BestBlockKey >>=
    maybe (throwString "Best block hash not available") return
-- | Fetch the block record of the current best block.
--
-- When no snapshot is supplied, a fresh one is taken so that the hash lookup
-- and the block lookup see the same database state. Throws (via
-- 'throwString') if the best hash or its block record is missing.
getBestBlock :: MonadIO m => DB -> Maybe Snapshot -> m BlockValue
getBestBlock db s
    | isJust s = fetch s
    | otherwise = R.withSnapshot db (fetch . Just)
  where
    fetch snap = do
        hash <- getBestBlockHash db snap
        found <- getBlock hash db snap
        case found of
            Just block -> return block
            Nothing ->
                throwString $
                "Best block not available at hash: " <>
                cs (blockHashToHex hash)
-- | Look up the blocks at the given heights, skipping heights with no block.
--
-- Duplicate heights are removed first (keeping first occurrences). All
-- lookups share one snapshot; one is taken if none is supplied.
getBlocksAtHeights ::
       MonadIO m => [BlockHeight] -> DB -> Maybe Snapshot -> m [BlockValue]
getBlocksAtHeights bhs db s =
    maybe (R.withSnapshot db (go . Just)) (go . Just) s
  where
    go snap =
        catMaybes <$> mapM (\height -> getBlockAtHeight height db snap) (nub bhs)
-- | Look up the block at a height: first resolve the height to a hash via
-- 'HeightKey', then fetch the block stored under that hash.
--
-- Both lookups share one snapshot; one is taken if none is supplied.
-- Returns 'Nothing' when either lookup finds nothing.
getBlockAtHeight ::
       MonadIO m => BlockHeight -> DB -> Maybe Snapshot -> m (Maybe BlockValue)
getBlockAtHeight height db s
    | isJust s = lookup_block s
    | otherwise = R.withSnapshot db (lookup_block . Just)
  where
    lookup_block snap = do
        hash <- retrieve db snap (HeightKey height)
        maybe (return Nothing) (retrieve db snap . BlockKey) hash
-- | Look up the blocks with the given hashes, skipping unknown hashes.
--
-- Duplicate hashes are removed first (keeping first occurrences). All
-- lookups share one snapshot; one is taken if none is supplied.
getBlocks :: MonadIO m => [BlockHash] -> DB -> Maybe Snapshot -> m [BlockValue]
getBlocks bids db s =
    maybe (R.withSnapshot db (go . Just)) (go . Just) s
  where
    go snap = catMaybes <$> mapM (\hash -> getBlock hash db snap) (nub bids)
-- | Fetch the block record stored under the given block hash, if any.
getBlock ::
       MonadIO m => BlockHash -> DB -> Maybe Snapshot -> m (Maybe BlockValue)
getBlock hash db snap = retrieve db snap $ BlockKey hash
-- | Stream the spent outputs indexed for an address.
--
-- Entries matching the address's spent-output prefix are produced, with the
-- scan starting at the key built from the optional height.
getAddrSpent ::
       (MonadResource m, MonadUnliftIO m)
    => Address
    -> Maybe BlockHeight
    -> DB
    -> Maybe Snapshot
    -> ConduitT () (AddrOutputKey, Output) m ()
getAddrSpent addr h db snapshot = matchingSkip db snapshot prefix start
  where
    prefix = MultiAddrOutputKey True addr
    start = MultiAddrHeightKey True addr h
-- | Stream the unspent outputs indexed for an address.
--
-- Entries matching the address's unspent-output prefix are produced, with
-- the scan starting at the key built from the optional height.
getAddrUnspent ::
       (MonadUnliftIO m, MonadResource m)
    => Address
    -> Maybe BlockHeight
    -> DB
    -> Maybe Snapshot
    -> ConduitT () (AddrOutputKey, Output) m ()
getAddrUnspent addr h db snapshot = matchingSkip db snapshot prefix start
  where
    prefix = MultiAddrOutputKey False addr
    start = MultiAddrHeightKey False addr h
-- | Fetch the balances of several addresses using one snapshot.
--
-- Duplicate addresses are removed first (keeping first occurrences); a
-- snapshot is taken if none is supplied.
getBalances ::
       MonadIO m => [Address] -> DB -> Maybe Snapshot -> m [AddressBalance]
getBalances addrs db s =
    maybe (R.withSnapshot db (go . Just)) (go . Just) s
  where
    go snap = mapM (\addr -> getBalance addr db snap) (nub addrs)
-- | Fetch the balance of an address.
--
-- An address with no stored balance record yields an all-zero balance.
getBalance ::
       MonadIO m => Address -> DB -> Maybe Snapshot -> m AddressBalance
getBalance addr db s = to_balance <$> retrieve db s (BalanceKey addr)
  where
    to_balance (Just Balance {..}) =
        AddressBalance
            { addressBalAddress = addr
            , addressBalConfirmed = balanceValue
            , addressBalUnconfirmed = balanceUnconfirmed
            , addressOutputCount = balanceOutputCount
            , addressSpentCount = balanceSpentCount
            }
    to_balance Nothing =
        AddressBalance
            { addressBalAddress = addr
            , addressBalConfirmed = 0
            , addressBalUnconfirmed = 0
            , addressOutputCount = 0
            , addressSpentCount = 0
            }
-- | List the hashes of all transactions stored under the mempool key prefix.
getMempool :: MonadUnliftIO m => DB -> Maybe Snapshot -> m [TxHash]
getMempool db snapshot = do
    entries <- matchingAsList db snapshot MempoolKey
    return [tx_hash | (MempoolTx tx_hash, ()) <- entries]
-- | Fetch detailed records for several transactions using one snapshot.
--
-- Duplicate hashes are removed first (keeping first occurrences) and unknown
-- hashes are skipped; a snapshot is taken if none is supplied.
getTxs :: MonadUnliftIO m => Network -> [TxHash] -> DB -> Maybe Snapshot -> m [DetailedTx]
getTxs net ths db s =
    maybe (R.withSnapshot db (go . Just)) (go . Just) s
  where
    go snap = catMaybes <$> mapM (\hash -> getTx net hash db snap) (nub ths)
-- | Fetch a transaction together with its inputs, outputs and fee.
--
-- All database entries under the transaction's base key are read at once;
-- the transaction record and the output records are then picked out of that
-- list. Returns 'Nothing' when no transaction record is present.
--
-- The fee is zero when any input is a coinbase input, otherwise the sum of
-- input values minus the sum of output values.
--
-- Entries are selected by pattern matching on key and value together, so an
-- entry whose value constructor does not match its key is skipped instead of
-- failing on a partial pattern when the result is forced.
getTx ::
       MonadUnliftIO m => Network -> TxHash -> DB -> Maybe Snapshot -> m (Maybe DetailedTx)
getTx net th db s = do
    xs <- matchingAsList db s (BaseTxKey th)
    case find_tx xs of
        Nothing -> return Nothing
        Just TxRecord {..} ->
            let os = map output (tx_outputs xs)
                is = map (input txValuePrevOuts) (txIn txValue)
             in return $
                Just
                    DetailedTx
                        { detailedTxData = txValue
                        , detailedTxFee = fee is os
                        , detailedTxBlock = txValueBlock
                        , detailedTxInputs = is
                        , detailedTxOutputs = os
                        }
  where
    fee is os =
        if any isCoinbase is
            then 0
            else sum (map detInValue is) - sum (map detOutValue os)
    -- An input whose previous outpoint hash is all zeroes is a coinbase input.
    input prevs TxIn {..} =
        if outPointHash prevOutput == zero
            then DetailedCoinbase
                     { detInOutPoint = prevOutput
                     , detInSequence = txInSequence
                     , detInSigScript = scriptInput
                     , detInNetwork = net
                     }
            else let PrevOut {..} =
                         -- The stored record is expected to list every
                         -- non-coinbase previous output; a missing one
                         -- raises an 'error' when the input is forced.
                         fromMaybe
                             (error
                                  ("Could not locate outpoint: " <>
                                   showOutPoint prevOutput))
                             (lookup prevOutput prevs)
                  in DetailedInput
                         { detInOutPoint = prevOutput
                         , detInSequence = txInSequence
                         , detInSigScript = scriptInput
                         , detInPkScript = prevOutScript
                         , detInValue = prevOutValue
                         , detInBlock = prevOutBlock
                         , detInNetwork = net
                         }
    output Output {..} =
        DetailedOutput
            { detOutValue = outputValue
            , detOutScript = outScript
            , detOutSpender = outSpender
            , detOutNetwork = net
            }
    -- First entry holding the transaction record itself.
    find_tx xs = listToMaybe [t | (MultiTxKey {}, MultiTx t) <- xs]
    -- Output records, in the order the database returned them.
    tx_outputs xs = [o | (MultiTxKeyOutput {}, MultiTxOutput o) <- xs]
-- | Look up an output, preferring the pending 'outputMap' over the database.
--
-- The database is only queried when the outpoint has no pending entry.
getOutput :: (MonadBlock m, MonadImport m) => OutPoint -> m (Maybe Output)
getOutput out_point = do
    pending <- gets (M.lookup out_point . outputMap)
    case pending of
        Just output -> return (Just output)
        Nothing -> do
            db <- asks myBlockDB
            retrieve db Nothing (OutputKey out_point)
-- | Look up an address balance, preferring the pending 'addressMap' over the
-- database. Falls back to 'emptyBalance' when neither has an entry.
getAddress :: (MonadBlock m, MonadImport m) => Address -> m Balance
getAddress address = do
    pending <- gets (M.lookup address . addressMap)
    found <-
        case pending of
            Just balance -> return (Just balance)
            Nothing -> do
                db <- asks myBlockDB
                retrieve db Nothing (BalanceKey address)
    return (fromMaybe emptyBalance found)
-- | Hashes of the transactions currently scheduled for deletion.
getDeleteTxs :: MonadImport m => m (Set TxHash)
getDeleteTxs = deleteTxs <$> get
-- | Whether the transaction is already scheduled for deletion.
shouldDelete :: MonadImport m => TxHash -> m Bool
shouldDelete tx_hash = gets (S.member tx_hash . deleteTxs)
-- | Record that the given block is to be imported as the new best block.
-- Replaces any previously recorded block action.
addBlock :: MonadImport m => Block -> m ()
addBlock block = modify record
  where
    record st = st {blockAction = Just (ImportBlock block)}
-- | Record that the current best block is to be reverted.
-- Replaces any previously recorded block action.
revertBlock :: MonadImport m => m ()
revertBlock = modify record
  where
    record st = st {blockAction = Just RevertBlock}
-- | Schedule a transaction hash for deletion.
deleteTx :: MonadImport m => TxHash -> m ()
deleteTx tx_hash = modify schedule
  where
    schedule st = st {deleteTxs = S.insert tx_hash (deleteTxs st)}
-- | Schedule a transaction for insertion, keyed by its hash, together with
-- its optional block reference. A later call for the same hash replaces the
-- earlier entry.
insertTx :: MonadImport m => Tx -> Maybe BlockRef -> m ()
insertTx tx maybe_block_ref = modify schedule
  where
    entry = ImportTx {importTx = tx, importTxBlock = maybe_block_ref}
    schedule st = st {newTxs = M.insert (txHash tx) entry (newTxs st)}
-- | Store the pending state of an output, replacing any earlier entry.
updateOutput :: MonadImport m => OutPoint -> Output -> m ()
updateOutput out_point output = modify store
  where
    store st = st {outputMap = M.insert out_point output (outputMap st)}
-- | Store the pending balance of an address, replacing any earlier entry.
updateAddress :: MonadImport m => Address -> Balance -> m ()
updateAddress address balance = modify store
  where
    store st = st {addressMap = M.insert address balance (addressMap st)}
-- | Mark an output as spent by the given spender and adjust the balance of
-- the address it pays to.
--
-- Does nothing for the null outpoint. Throws (via 'throwString') when the
-- output is unknown or already spent. The balance is only touched when the
-- output script decodes to an address: the value is subtracted from the
-- confirmed balance when the spender has a block reference, otherwise from
-- the unconfirmed balance; the spent count is incremented either way.
spendOutput :: (MonadBlock m, MonadImport m) => OutPoint -> Spender -> m ()
spendOutput out_point spender@Spender {..} =
    void . runMaybeT $ do
        net <- asks myNetwork
        guard (out_point /= nullOutPoint)
        found <- getOutput out_point
        output@Output {..} <-
            case found of
                Just o -> return o
                Nothing ->
                    throwString $
                    "Could not get output to spend at outpoint: " <>
                    showOutPoint out_point
        when (isJust outSpender) $
            throwString $
            "Output to spend already spent at outpoint: " <>
            showOutPoint out_point
        updateOutput out_point output {outSpender = Just spender}
        -- Outputs without a decodable address carry no balance to update.
        address <- MaybeT (return (scriptToAddressBS net outScript))
        balance@Balance {..} <- getAddress address
        let spent_count = balanceSpentCount + 1
        updateAddress address $
            case spenderBlock of
                Just _ ->
                    balance
                        { balanceValue = balanceValue - outputValue
                        , balanceSpentCount = spent_count
                        }
                Nothing ->
                    balance
                        { balanceUnconfirmed =
                              balanceUnconfirmed - fromIntegral outputValue
                        , balanceSpentCount = spent_count
                        }
-- | Undo the spending of an output and restore the balance of the address
-- it pays to.
--
-- Does nothing for the null outpoint or for an output that is not spent.
-- Throws (via 'throwString') when the output is unknown. The value is added
-- back to the confirmed balance when the removed spender had a block
-- reference, otherwise to the unconfirmed balance; the spent count is
-- decremented either way.
unspendOutput :: (MonadBlock m, MonadImport m) => OutPoint -> m ()
unspendOutput out_point =
    void . runMaybeT $ do
        net <- asks myNetwork
        guard (out_point /= nullOutPoint)
        found <- getOutput out_point
        output@Output {..} <-
            case found of
                Just o -> return o
                Nothing ->
                    throwString $
                    "Could not get output to unspend at outpoint: " <>
                    showOutPoint out_point
        -- Stop here when the output has no spender.
        Spender {..} <- MaybeT (return outSpender)
        updateOutput out_point output {outSpender = Nothing}
        -- Outputs without a decodable address carry no balance to update.
        address <- MaybeT (return (scriptToAddressBS net outScript))
        balance@Balance {..} <- getAddress address
        let spent_count = balanceSpentCount - 1
        updateAddress address $
            case spenderBlock of
                Just _ ->
                    balance
                        { balanceValue = balanceValue + outputValue
                        , balanceSpentCount = spent_count
                        }
                Nothing ->
                    balance
                        { balanceUnconfirmed =
                              balanceUnconfirmed + fromIntegral outputValue
                        , balanceSpentCount = spent_count
                        }
-- | Subtract an output from the balance of the address it pays to.
--
-- Throws (via 'throwString') when the output is unknown or is still spent.
-- The value is taken from the confirmed balance when the output has a block
-- reference, otherwise from the unconfirmed balance; the output count is
-- decremented either way. Outputs whose script does not decode to an address
-- change nothing. The output record itself is not modified here.
removeOutput :: (MonadBlock m, MonadImport m) => OutPoint -> m ()
removeOutput out_point@OutPoint {..} = do
    net <- asks myNetwork
    found <- getOutput out_point
    Output {..} <-
        case found of
            Just o -> return o
            Nothing ->
                throwString $
                "Could not get output to remove at outpoint: " <>
                show out_point
    when (isJust outSpender) $
        throwString $
        "Cannot delete because spent outpoint: " <> show out_point
    forM_ (scriptToAddressBS net outScript) $ \address -> do
        balance@Balance {..} <- getAddress address
        let output_count = balanceOutputCount - 1
        updateAddress address $
            case outBlock of
                Just _ ->
                    balance
                        { balanceValue = balanceValue - outputValue
                        , balanceOutputCount = output_count
                        }
                Nothing ->
                    balance
                        { balanceUnconfirmed =
                              balanceUnconfirmed - fromIntegral outputValue
                        , balanceOutputCount = output_count
                        }
-- | Record a new output and credit the address it pays to.
--
-- The value goes to the confirmed balance when the output has a block
-- reference, otherwise to the unconfirmed balance; the output count is
-- incremented either way. Outputs whose script does not decode to an address
-- are recorded without touching any balance.
addOutput :: (MonadBlock m, MonadImport m) => OutPoint -> Output -> m ()
addOutput out_point@OutPoint {..} output@Output {..} = do
    net <- asks myNetwork
    updateOutput out_point output
    forM_ (scriptToAddressBS net outScript) $ \address -> do
        balance@Balance {..} <- getAddress address
        let output_count = balanceOutputCount + 1
        updateAddress address $
            case outBlock of
                Just _ ->
                    balance
                        { balanceValue = balanceValue + outputValue
                        , balanceOutputCount = output_count
                        }
                Nothing ->
                    balance
                        { balanceUnconfirmed =
                              balanceUnconfirmed + fromIntegral outputValue
                        , balanceOutputCount = output_count
                        }
-- | Fetch the stored record of a transaction directly from the database
-- (pending, uncommitted imports are not consulted).
getTxRecord :: MonadBlock m => TxHash -> m (Maybe TxRecord)
getTxRecord tx_hash = do
    db <- asks myBlockDB
    retrieve db Nothing (TxKey tx_hash)
-- | Schedule a stored transaction, and everything that spends its outputs,
-- for deletion.
--
-- Does nothing when the transaction is already scheduled. Otherwise, in
-- order: every transaction spending one of its outputs is deleted
-- recursively, its outputs are removed from their address balances, the
-- outputs it spent are unspent, and finally its hash is added to the
-- deletion set. Throws (via 'throwString') when the transaction record or
-- one of its outputs cannot be found.
deleteTransaction ::
       (MonadBlock m, MonadImport m)
    => TxHash
    -> m ()
deleteTransaction tx_hash = do
    scheduled <- shouldDelete tx_hash
    unless scheduled $ do
        stored <- getTxRecord tx_hash
        TxRecord {..} <-
            case stored of
                Just r -> return r
                Nothing ->
                    throwString $
                    "Could not get tx to delete at hash: " <>
                    cs (txHashToHex tx_hash)
        let own_outs =
                [ OutPoint tx_hash i
                | i <- take (length (txOut txValue)) [0 ..]
                ]
            spent_outs = map prevOutput (txIn txValue)
        mapM_ delete_spender own_outs
        mapM_ removeOutput own_outs
        mapM_ unspendOutput spent_outs
        deleteTx tx_hash
  where
    delete_spender out_point =
        getOutput out_point >>= \case
            Nothing ->
                throwString $
                "Could not get spent outpoint: " <> show out_point
            Just Output {outSpender = Just Spender {..}} ->
                deleteTransaction spenderHash
            Just _ -> return ()
-- | Import a block and all of its transactions in a single import run.
--
-- The genesis header gets height zero. Any other block must have the
-- current best block as its parent, otherwise 'throwString' is raised; its
-- height is the best height plus one. Each transaction is imported with a
-- block reference made of the block hash, the new height and the
-- transaction's position in the block.
addNewBlock :: MonadBlock m => Block -> m ()
addNewBlock block@Block {..} =
    runMonadImport $ do
        height <- next_height
        $(logInfoS) "Block" $
            "Importing block height: " <> cs (show height)
        forM_ (zip [0 ..] blockTxns) $ \(pos, tx) ->
            importTransaction tx (Just (BlockRef block_hash height pos))
        addBlock block
  where
    block_hash = headerHash blockHeader
    parent_hash = prevBlock blockHeader
    next_height = do
        net <- asks myNetwork
        if blockHeader == getGenesisHeader net
            then return 0
            else do
                db <- asks myBlockDB
                best <- getBestBlock db Nothing
                when (parent_hash /= headerHash (blockValueHeader best)) $
                    throwString $
                    "Block does not build on best at hash: " <>
                    show block_hash
                return $ blockValueHeight best + 1
-- | Batch operations for the recorded block action.
--
-- * No action: no operations.
-- * Import: store the block record (height, work and header taken from the
--   chain's node for that hash), the height-to-hash index entry, and point
--   the best block key at the new hash. Throws (via 'throwString') when the
--   chain does not know the block.
-- * Revert: delete the current best block's record and height index entry,
--   and point the best block key at its parent.
getBlockOps :: (MonadBlock m, MonadImport m) => m [BatchOp]
getBlockOps = do
    action <- gets blockAction
    case action of
        Nothing -> return []
        Just RevertBlock -> remove_best
        Just (ImportBlock block) -> insert_block block
  where
    insert_block block@Block {..} = do
        let block_hash = headerHash blockHeader
        ch <- asks myChain
        node <-
            chainGetBlock block_hash ch >>=
            maybe
                (throwString $
                 "Could not get block header for hash: " <>
                 cs (blockHashToHex block_hash))
                return
        let block_value =
                BlockValue
                    { blockValueHeight = nodeHeight node
                    , blockValueWork = nodeWork node
                    , blockValueHeader = nodeHeader node
                    , blockValueSize = fromIntegral (B.length (encode block))
                    , blockValueTxs = map txHash blockTxns
                    }
        return
            [ insertOp (BlockKey block_hash) block_value
            , insertOp (HeightKey (nodeHeight node)) block_hash
            , insertOp BestBlockKey block_hash
            ]
    remove_best = do
        db <- asks myBlockDB
        BlockValue {..} <- getBestBlock db Nothing
        let best_hash = headerHash blockValueHeader
            parent_hash = prevBlock blockValueHeader
        return
            [ deleteOp (BlockKey best_hash)
            , deleteOp (HeightKey blockValueHeight)
            , insertOp BestBlockKey parent_hash
            ]
-- | Batch operations that write the current state of an output and refresh
-- its address index entries.
--
-- The null outpoint produces no operations. Throws (via 'throwString') when
-- the output cannot be found.
outputOps :: (MonadBlock m, MonadImport m) => OutPoint -> m [BatchOp]
outputOps out_point@OutPoint {..} =
    if out_point == nullOutPoint
        then return []
        else do
            net <- asks myNetwork
            found <- getOutput out_point
            output@Output {..} <-
                case found of
                    Just o -> return o
                    Nothing ->
                        throwString $
                        "Could not get output to unspend at outpoint: " <>
                        show out_point
            return $
                insertOp (OutputKey out_point) output :
                addressOutOps net out_point output False
-- | Batch operations maintaining the address index entry of an output.
--
-- Produces nothing when the output script does not decode to an address.
-- Otherwise the entry for the output's current spent state is inserted (or
-- deleted when the last argument is 'True'), and the entry for the opposite
-- spent state is deleted. When the output has a block reference, the two
-- variants of the key with no height are deleted as well.
addressOutOps :: Network -> OutPoint -> Output -> Bool -> [BatchOp]
addressOutOps net out_point output@Output {..} del =
    maybe [] index_ops (scriptToAddressBS net outScript)
  where
    index_ops address =
        let current =
                AddrOutputKey
                    { addrOutputSpent = isJust outSpender
                    , addrOutputAddress = address
                    , addrOutputHeight = blockRefHeight <$> outBlock
                    , addrOutputPos = blockRefPos <$> outBlock
                    , addrOutPoint = out_point
                    }
            opposite = current {addrOutputSpent = isNothing outSpender}
            main_op
                | del = deleteOp current
                | otherwise = insertOp current output
            no_height_ops
                | isJust outBlock =
                    [ deleteOp current {addrOutputHeight = Nothing}
                    , deleteOp opposite {addrOutputHeight = Nothing}
                    ]
                | otherwise = []
         in [main_op, deleteOp opposite] ++ no_height_ops
-- | Batch operations that delete an output record and its address index
-- entries. Throws (via 'throwString') when the output cannot be found.
deleteOutOps :: (MonadBlock m, MonadImport m) => OutPoint -> m [BatchOp]
deleteOutOps out_point@OutPoint {..} = do
    net <- asks myNetwork
    found <- getOutput out_point
    output@Output {..} <-
        case found of
            Just o -> return o
            Nothing ->
                throwString $
                "Could not get output to delete at outpoint: " <>
                show out_point
    return $
        deleteOp (OutputKey out_point) :
        addressOutOps net out_point output True
-- | Batch operations removing a transaction hash from the transaction,
-- mempool and orphan key spaces.
deleteTxOps :: TxHash -> [BatchOp]
deleteTxOps tx_hash = [drop_record, drop_mempool, drop_orphan]
  where
    drop_record = deleteOp (TxKey tx_hash)
    drop_mempool = deleteOp (MempoolTx tx_hash)
    drop_orphan = deleteOp (OrphanTxKey tx_hash)
-- | Batch operations deleting every stored orphan transaction.
--
-- Only produced when a block action is recorded; otherwise the result is
-- empty and the database is not scanned.
purgeOrphanOps :: (MonadBlock m, MonadImport m) => m [BatchOp]
purgeOrphanOps = do
    db <- asks myBlockDB
    action <- gets blockAction
    case action of
        Nothing -> return []
        Just _ ->
            -- The 'Tx {}' pattern only fixes the value type of the orphan keys.
            liftIO . runResourceT . runConduit $
            matching db Nothing OrphanKey .|
            mapC (\(k, Tx {}) -> deleteOp k) .|
            sinkList
-- | Fetch the raw transaction stored for a hash.
-- Throws (via 'throwString') when no record exists.
getSimpleTx :: MonadBlock m => TxHash -> m Tx
getSimpleTx tx_hash = do
    stored <- getTxRecord tx_hash
    case stored of
        Just record -> return (txValue record)
        Nothing -> throwString $ "Cannot find tx hash: " <> show tx_hash
-- | Outpoints of all outputs created by a transaction, in output order
-- (indices starting at zero).
getTxOutPoints :: Tx -> [OutPoint]
getTxOutPoints tx = map (OutPoint (txHash tx)) indices
  where
    indices = take (length (txOut tx)) [0 ..]
-- | Outpoints referenced by the inputs of a transaction, in input order.
getPrevOutPoints :: Tx -> [OutPoint]
getPrevOutPoints tx = prevOutput <$> txIn tx
-- | Batch operations for every transaction scheduled for deletion.
--
-- In order: rewrite the outputs those transactions referenced as inputs,
-- delete the outputs they created, then delete the transaction entries
-- themselves.
getDeleteTxOps :: (MonadBlock m, MonadImport m) => m [BatchOp]
getDeleteTxOps = do
    hashes <- S.toList <$> getDeleteTxs
    txs <- mapM getSimpleTx hashes
    restored <- mapM outputOps (concatMap getPrevOutPoints txs)
    removed <- mapM deleteOutOps (concatMap getTxOutPoints txs)
    return $ concat restored <> concat removed <> concatMap deleteTxOps hashes
-- | Batch operations storing one imported transaction.
--
-- The stored record carries the transaction, its optional block reference
-- and the value, block and script of each output it spends (null outpoints
-- are skipped). A transaction without a block reference is also added to the
-- mempool key space; one with a block reference is removed from it. The
-- orphan entry for the hash is deleted in both cases.
--
-- Throws (via 'throwString') when a spent output cannot be found.
insertTxOps :: (MonadBlock m, MonadImport m) => ImportTx -> m [BatchOp]
insertTxOps ImportTx {..} = do
    prev_outputs <- get_prev_outputs
    let tx_hash = txHash importTx
        key = TxKey tx_hash
        mempool_key = MempoolTx tx_hash
        orphan_key = OrphanTxKey tx_hash
        value =
            TxRecord
                { txValueBlock = importTxBlock
                , txValue = importTx
                , txValuePrevOuts = prev_outputs
                }
    return $
        case importTxBlock of
            Nothing ->
                [ insertOp key value
                , insertOp mempool_key ()
                , deleteOp orphan_key
                ]
            Just _ ->
                [ insertOp key value
                , deleteOp mempool_key
                , deleteOp orphan_key
                ]
  where
    get_prev_outputs =
        let real_inputs =
                filter ((/= nullOutPoint) . prevOutput) (txIn importTx)
         in forM real_inputs $ \TxIn {..} -> do
                Output {..} <-
                    getOutput prevOutput >>= \case
                        Nothing ->
                            -- Leading space keeps the hash and the rest of
                            -- the message from running together.
                            throwString $
                            "While importing tx hash: " <>
                            cs (txHashToHex (txHash importTx)) <>
                            " could not get outpoint: " <>
                            showOutPoint prevOutput
                        Just out -> return out
                return
                    ( prevOutput
                    , PrevOut
                          { prevOutValue = outputValue
                          , prevOutBlock = outBlock
                          , prevOutScript = outScript
                          })
-- | Batch operations for every transaction scheduled for insertion.
--
-- In order: rewrite the outputs those transactions spend, write the outputs
-- they create, then write the transaction entries themselves.
getInsertTxOps :: (MonadBlock m, MonadImport m) => m [BatchOp]
getInsertTxOps = do
    pending <- gets (M.elems . newTxs)
    let txs = importTx <$> pending
    spent <- mapM outputOps (concatMap getPrevOutPoints txs)
    created <- mapM outputOps (concatMap getTxOutPoints txs)
    records <- mapM insertTxOps pending
    return $ concat spent <> concat created <> concat records
-- | Batch operations writing every pending address balance.
getBalanceOps :: MonadImport m => m [BatchOp]
getBalanceOps = do
    balances <- gets (M.toList . addressMap)
    return [insertOp (BalanceKey address) balance | (address, balance) <- balances]
-- | Remove the current best block and re-import its transactions without a
-- block reference.
--
-- Throws (via 'throwString') when the best block is the genesis block.
-- Steps, in order:
--
-- 1. Load the raw transactions of the block, except the first one
--    (presumably the coinbase, which cannot exist outside a block).
-- 2. In one import run, delete every transaction of the block and record
--    the block revert.
-- 3. Reset the base height to the parent's height and forget the sync peer.
-- 4. In a second import run, re-import the loaded transactions with no
--    block reference.
revertBestBlock :: MonadBlock m => m ()
revertBestBlock = do
    net <- asks myNetwork
    db <- asks myBlockDB
    BlockValue {..} <- getBestBlock db Nothing
    when (blockValueHeader == getGenesisHeader net) . throwString $
        "Attempted to revert genesis block"
    -- 'drop 1' instead of 'tail': total even if the stored tx list is empty.
    import_txs <- mapM getSimpleTx (drop 1 blockValueTxs)
    runMonadImport $ do
        mapM_ deleteTransaction blockValueTxs
        revertBlock
    reset_peer (blockValueHeight - 1)
    runMonadImport $ mapM_ (`importTransaction` Nothing) import_txs
  where
    reset_peer height = do
        base_height_box <- asks myBaseHeight
        peer_box <- asks myPeer
        atomically $ do
            writeTVar base_height_box height
            writeTVar peer_box Nothing
-- | Validate a transaction against the supplied map of previous outputs.
--
-- Fails with, in order of checking:
--
-- * 'OrphanTx' when any input's previous output is missing from the map;
-- * 'DoubleSpend' when any previous output already has a spender;
-- * 'OverSpend' when the outputs sum to more than the inputs.
validateTx :: Monad m => OutputMap -> Tx -> ExceptT TxException m ()
validateTx outputs tx = do
    prev_outs <- mapM find_prev (txIn tx)
    when (any (isJust . outSpender) prev_outs) (throwError DoubleSpend)
    let total_in = sum (outputValue <$> prev_outs)
        total_out = sum (outValue <$> txOut tx)
    unless (total_out <= total_in) (throwError OverSpend)
  where
    find_prev input =
        maybe (throwError OrphanTx) return $
        M.lookup (prevOutput input) outputs
-- | Import a transaction into the pending import state.
--
-- Transactions with a block reference skip validation. Others are rejected
-- with 'AlreadyImported' when a record already exists, and are otherwise
-- checked with 'validateTx' against the outputs their inputs reference.
--
-- On success, any transaction already spending one of the inputs is deleted,
-- the inputs are marked spent, the outputs are added and the transaction is
-- scheduled for insertion; the result is 'True'.
--
-- On failure the listener receives a 'TxException' event and the result is
-- 'True' for 'AlreadyImported', 'False' otherwise. An 'OrphanTx' failure
-- additionally stores the transaction under the orphan key right away.
importTransaction ::
       (MonadBlock m, MonadImport m) => Tx -> Maybe BlockRef -> m Bool
importTransaction tx maybe_block_ref = do
    outcome <- runExceptT validate_tx
    case outcome of
        Right () -> do
            delete_spenders
            spend_inputs
            insert_outputs
            insertTx tx maybe_block_ref
            return True
        Left e -> do
            ret <- on_failure e
            listener <- asks myListener
            atomically (listener (TxException tx_hash e))
            return ret
  where
    tx_hash = txHash tx
    on_failure AlreadyImported = return True
    on_failure OrphanTx = import_orphan >> return False
    on_failure e = do
        $(logErrorS) "Block" $
            "Could not import tx hash: " <> cs (txHashToHex tx_hash) <>
            " reason: " <>
            cs (show e)
        return False
    -- Written directly to the database, outside the batched import state.
    import_orphan = do
        $(logInfoS) "BlockStore " $
            "Got orphan tx hash: " <> cs (txHashToHex tx_hash)
        db <- asks myBlockDB
        R.insert db (OrphanTxKey tx_hash) tx
    validate_tx =
        unless (isJust maybe_block_ref) $ do
            known <- getTxRecord tx_hash
            when (isJust known) (throwError AlreadyImported)
            found <-
                forM (txIn tx) $ \TxIn {..} ->
                    fmap (prevOutput, ) <$> getOutput prevOutput
            validateTx (M.fromList (catMaybes found)) tx
    -- Evict whatever currently spends the outputs this transaction consumes.
    delete_spenders =
        forM_ (txIn tx) $ \TxIn {..} -> do
            found <- getOutput prevOutput
            case found of
                Nothing ->
                    unless (prevOutput == nullOutPoint) $
                    throwString $
                    "Could not get output spent by tx hash: " <>
                    show (txHash tx)
                Just Output {outSpender = Just Spender {..}} ->
                    deleteTransaction spenderHash
                Just _ -> return ()
    spend_inputs =
        forM_ (zip [0 ..] (txIn tx)) $ \(i, TxIn {..}) ->
            spendOutput
                prevOutput
                Spender
                    { spenderHash = tx_hash
                    , spenderIndex = i
                    , spenderBlock = maybe_block_ref
                    }
    insert_outputs =
        forM_ (zip [0 ..] (txOut tx)) $ \(i, TxOut {..}) ->
            addOutput
                OutPoint {outPointHash = tx_hash, outPointIndex = i}
                Output
                    { outputValue = outValue
                    , outBlock = maybe_block_ref
                    , outScript = scriptOutput
                    , outSpender = Nothing
                    }
-- | Bring the stored best block in line with the header chain and request
-- the next batch of blocks from a peer.
--
-- Steps, in order (the whole action stops quietly at the first step that
-- does not apply):
--
-- 1. Revert stored blocks until the stored best block is on the chain's
--    best branch.
-- 2. If the stored best height equals the chain height, reset the base
--    height and sync peer, and stop.
-- 3. Pick the sync peer (the remembered one, else the first connected
--    peer); stop when there is none.
-- 4. Stop when the base height is more than 500 ahead of the stored best
--    height, or has reached the chain height.
-- 5. Request the blocks from base height + 1 up to base height + 501
--    (capped at the chain height) and record the new base height and peer.
syncBlocks :: MonadBlock m => m ()
syncBlocks =
    void . runMaybeT $ do
        net <- asks myNetwork
        chain_best <- asks myChain >>= chainGetBest
        revert_if_needed chain_best
        let chain_height = nodeHeight chain_best
        height_var <- asks myBaseHeight
        db <- asks myBlockDB
        best_height <- blockValueHeight <$> getBestBlock db Nothing
        when (best_height == chain_height) $
            update_peer best_height Nothing >> empty
        base_height <- readTVarIO height_var
        peer <- pick_peer
        guard (base_height <= best_height + 500)
        guard (base_height < chain_height)
        ch <- asks myChain
        let sync_lowest = min chain_height (base_height + 1)
            sync_highest = min chain_height (base_height + 501)
        sync_top <-
            if sync_highest == chain_height
                then return chain_best
                else chainGetAncestor sync_highest chain_best ch >>=
                     maybe
                         (throwString
                              "Could not get syncing header from chain")
                         return
        parents <-
            if sync_lowest == chain_height
                then return []
                else chainGetParents sync_lowest sync_top ch
        let sync_blocks = parents ++ [sync_top]
        update_peer sync_highest (Just peer)
        peerGetBlocks net peer (map (headerHash . nodeHeader) sync_blocks)
  where
    pick_peer = do
        remembered <- asks myPeer >>= readTVarIO
        case remembered of
            Just p -> return p
            Nothing -> do
                online <- asks myManager >>= managerGetPeers
                case online of
                    [] -> empty
                    p:_ -> return (onlinePeerMailbox p)
    -- Set the base height and the sync peer in one transaction.
    update_peer height mp = do
        height_var <- asks myBaseHeight
        peer_var <- asks myPeer
        atomically $ do
            writeTVar height_var height
            writeTVar peer_var mp
    revert_if_needed chain_best = do
        db <- asks myBlockDB
        ch <- asks myChain
        best <- getBestBlock db Nothing
        let best_hash = headerHash (blockValueHeader best)
            chain_hash = headerHash (nodeHeader chain_best)
        when (best_hash /= chain_hash) $ do
            known <- chainGetBlock best_hash ch
            case known of
                -- The chain does not know our best block at all: drop it
                -- and check again.
                Nothing -> do
                    revertBestBlock
                    revert_if_needed chain_best
                Just best_node -> do
                    split_node <- chainGetSplitBlock chain_best best_node ch
                    revert_until (headerHash (nodeHeader split_node))
    revert_until split = do
        db <- asks myBlockDB
        best_hash <- headerHash . blockValueHeader <$> getBestBlock db Nothing
        when (best_hash /= split) $ do
            revertBestBlock
            revert_until split
-- | Import a block received from a peer.
--
-- Fails through 'MonadError' (so the caller's 'runExceptT' sees a 'Left')
-- when the header chain does not know the block, or when the block's parent
-- is not the stored best block. Otherwise the block is imported with
-- 'addNewBlock'.
--
-- The "not in chain" case is reported with 'throwError' like the other
-- rejection, so a block the chain does not know is reported to the caller
-- instead of raising an exception out of the message loop.
importBlock :: (MonadError String m, MonadBlock m) => Block -> m ()
importBlock block@Block {..} = do
    let block_hash = headerHash blockHeader
    bn <- asks myChain >>= chainGetBlock block_hash
    when (isNothing bn) . throwError $
        "Not in chain: block hash " <> cs (blockHashToHex block_hash)
    best <- asks myBlockDB >>= \db -> getBestBlock db Nothing
    let best_hash = headerHash (blockValueHeader best)
        prev_hash = prevBlock blockHeader
    when (prev_hash /= best_hash) (throwError "does not build on best")
    addNewBlock block
-- | Handle one message from the block store mailbox.
--
-- * 'BlockChainNew': sync blocks.
-- * 'BlockPeerConnect': sync blocks, then request the peer's mempool.
-- * 'BlockReceived': import the block; on success retry orphans, sync and
--   request the mempool; on failure log the error.
-- * 'TxReceived': when at chain height, import the transaction and retry
--   orphans.
-- * 'TxPublished': import the transaction unconditionally.
-- * 'BlockPeerDisconnect': if that peer was the sync peer, forget it, reset
--   the base height to the stored best height and sync again.
-- * 'BlockNotReceived': log and ask the manager to kill the peer.
-- * 'TxAvailable': when at chain height, request the announced transactions
--   that are in neither the mempool nor the orphan key space.
-- * 'PongReceived': log and forward a 'PeerPong' event to the listener.
processBlockMessage :: (MonadUnliftIO m, MonadBlock m) => BlockMessage -> m ()
processBlockMessage (BlockChainNew _) = syncBlocks
processBlockMessage (BlockPeerConnect p) = syncBlocks >> syncMempool p
processBlockMessage (BlockReceived p b) =
    runExceptT (importBlock b) >>= \case
        Left e -> do
            pstr <- peerString p
            let hash = headerHash (blockHeader b)
            -- Separators added so peer and hash do not run into the labels.
            $(logErrorS) "Block" $
                "Could not import from peer " <> pstr <> " block hash: " <>
                cs (blockHashToHex hash) <>
                " error: " <>
                fromString e
        Right () -> importOrphans >> syncBlocks >> syncMempool p
processBlockMessage (TxReceived _ tx) =
    isAtHeight >>= \x ->
        when x $ do
            _ <- runMonadImport $ importTransaction tx Nothing
            importOrphans
processBlockMessage (TxPublished tx) =
    void . runMonadImport $ importTransaction tx Nothing
processBlockMessage (BlockPeerDisconnect p) = do
    peer_box <- asks myPeer
    base_height_box <- asks myBaseHeight
    db <- asks myBlockDB
    best <- getBestBlock db Nothing
    is_my_peer <-
        atomically $
        readTVar peer_box >>= \x ->
            if x == Just p
                then do
                    writeTVar peer_box Nothing
                    writeTVar base_height_box (blockValueHeight best)
                    return True
                else return False
    when is_my_peer syncBlocks
processBlockMessage (BlockNotReceived p h) = do
    pstr <- peerString p
    $(logErrorS) "Block" $
        "Peer " <> pstr <> " unable to serve block hash: " <> cs (show h)
    mgr <- asks myManager
    managerKill (PeerMisbehaving "Block not found") p mgr
processBlockMessage (TxAvailable p ts) =
    isAtHeight >>= \h ->
        when h $ do
            pstr <- peerString p
            $(logDebugS) "Block" $
                "Received " <> cs (show (length ts)) <>
                " tx inventory from peer " <>
                pstr
            net <- asks myNetwork
            db <- asks myBlockDB
            -- Hashes we already hold, either in the mempool or as orphans.
            has <-
                fmap catMaybes . forM ts $ \t ->
                    let mem =
                            retrieve db Nothing (MempoolTx t) >>= \case
                                Nothing -> return Nothing
                                Just () -> return (Just t)
                        orp =
                            retrieve db Nothing (OrphanTxKey t) >>= \case
                                Nothing -> return Nothing
                                Just Tx {} -> return (Just t)
                     in runMaybeT $ MaybeT mem <|> MaybeT orp
            let new = ts \\ has
            unless (null new) $ do
                $(logDebugS) "Block" $
                    "Requesting " <> cs (show (length new)) <>
                    " new txs from peer " <>
                    pstr
                peerGetTxs net p new
processBlockMessage (PongReceived p n) = do
    pstr <- peerString p
    $(logDebugS) "Block" $
        "Pong received with nonce " <> cs (show n) <> " from peer " <> pstr
    asks myListener >>= atomically . ($ PeerPong p n)
-- | Try to import the stored orphan transactions.
--
-- Orphans are streamed from the database and imported one by one, each in
-- its own import run, until one import returns 'True'; the whole pass is
-- then repeated. The loop ends when a full pass has no import returning
-- 'True'.
importOrphans :: (MonadUnliftIO m, MonadBlock m) => m ()
importOrphans = do
    db <- asks myBlockDB
    progressed <-
        runResourceT . runConduit $
        matching db Nothing OrphanKey .|
        mapMC (\(_, orphan) -> runMonadImport (importTransaction orphan Nothing)) .|
        anyC id
    when progressed importOrphans
-- | Stream all outputs of an address, spent and unspent.
--
-- The spent and unspent streams are merged with 'mergeSourcesBy' using the
-- reversed 'compare' ordering. Both streams read from one snapshot; one is
-- taken (and held for the duration of the stream) if none is supplied.
getAddrOutputs ::
       (MonadResource m, MonadUnliftIO m)
    => Address
    -> Maybe BlockHeight
    -> DB
    -> Maybe Snapshot
    -> ConduitT () AddrOutput m ()
getAddrOutputs a h db s
    | isJust s = merged s
    | otherwise = R.withSnapshotBracket db (merged . Just)
  where
    merged snap = mergeSourcesBy (flip compare) [spent snap, unspent snap]
    spent snap = getAddrSpent a h db snap .| mapC (uncurry AddrOutput)
    unspent snap = getAddrUnspent a h db snap .| mapC (uncurry AddrOutput)
-- | Stream all outputs of several addresses, one address after another in
-- the order given. All addresses read from one snapshot; one is taken if
-- none is supplied.
getAddrsOutputs ::
       (MonadResource m, MonadUnliftIO m)
    => [Address]
    -> Maybe BlockHeight
    -> DB
    -> Maybe Snapshot
    -> ConduitT () AddrOutput m ()
getAddrsOutputs as h db s =
    case s of
        Just _ -> stream_all s
        Nothing -> R.withSnapshotBracket db (stream_all . Just)
  where
    stream_all snap = mapM_ (\a -> getAddrOutputs a h db snap) as
-- | Stream the unspent outputs of several addresses, one address after
-- another in the order given. All addresses read from one snapshot; one is
-- taken if none is supplied.
getUnspents ::
       (MonadResource m, MonadUnliftIO m)
    => [Address]
    -> Maybe BlockHeight
    -> DB
    -> Maybe Snapshot
    -> ConduitT () AddrOutput m ()
getUnspents as h db s
    | isJust s = stream_all s
    | otherwise = R.withSnapshotBracket db (stream_all . Just)
  where
    stream_all snap = mapM_ (\a -> getUnspent a h db snap) as
-- | Stream the unspent outputs of one address as 'AddrOutput' values.
getUnspent ::
       (MonadResource m, MonadUnliftIO m)
    => Address
    -> Maybe BlockHeight
    -> DB
    -> Maybe Snapshot
    -> ConduitT () AddrOutput m ()
getUnspent addr h db s = entries .| mapC to_output
  where
    entries = getAddrUnspent addr h db s
    to_output (key, output) = AddrOutput key output
-- | Ask a peer for its mempool, but only when the store is at chain height
-- (see 'isAtHeight'); otherwise do nothing.
syncMempool :: MonadBlock m => Peer -> m ()
syncMempool p = do
    ready <- isAtHeight
    when ready $ do
        $(logInfoS) "Block" "Syncing mempool..."
        void (MMempool `sendMessage` p)
-- | Whether the store has caught up with the header chain.
--
-- True when the stored best block hash equals the chain's best header hash
-- and that header's timestamp is less than four hours older than the local
-- clock.
--
-- The age is computed in 'Integer'. Subtracting in the timestamp's own type
-- would wrap around if that type is unsigned and the header timestamp is
-- ahead of the local clock, making a fully synced store look stale.
isAtHeight :: MonadBlock m => m Bool
isAtHeight = do
    db <- asks myBlockDB
    bb <- getBestBlockHash db Nothing
    ch <- asks myChain
    cb <- chainGetBest ch
    time <- liftIO getPOSIXTime
    let now = floor time :: Integer
        block_time = toInteger (blockTimestamp (nodeHeader cb))
        recent = now - block_time < max_age
    return (recent && headerHash (nodeHeader cb) == bb)
  where
    -- Four hours, in seconds.
    max_age = 60 * 60 * 4 :: Integer
-- | Transaction hash written as all zero hex digits. 'getTx' treats an input
-- whose previous outpoint carries this hash as a coinbase input.
zero :: TxHash
zero = "0000000000000000000000000000000000000000000000000000000000000000"
-- | Render an outpoint as @<tx hash hex>:<output index>@ in any string type
-- convertible from 'Text'.
showOutPoint :: (IsString a, ConvertibleStrings Text a) => OutPoint -> a
showOutPoint out_point = cs (hash_hex <> ":" <> index_txt)
  where
    hash_hex = txHashToHex (outPointHash out_point)
    index_txt = cs (show (outPointIndex out_point)) :: Text
-- | Printable address of a peer, as reported by the peer manager, or
-- @[unknown]@ when the manager has no record of the peer.
peerString :: (MonadBlock m, IsString a) => Peer -> m a
peerString p = do
    mgr <- asks myManager
    online <- managerGetPeer mgr p
    return $ maybe "[unknown]" (fromString . show . onlinePeerAddress) online
-- | Merge several sources into one stream, always emitting the smallest
-- pending head element according to the given comparison.
--
-- One element is prefetched from every source; sources that are already
-- exhausted are dropped. At each step the pending heads are sorted, the
-- first is yielded and its source is asked for the next element. The output
-- is fully ordered only if every input is itself ordered by the same
-- comparison.
mergeSourcesBy ::
       (Foldable f, Monad m)
    => (a -> a -> Ordering)
    -> f (ConduitT () a m ())
    -> ConduitT i a m ()
mergeSourcesBy f srcs = do
    heads <- lift (traverse ($$++ await) sealed)
    go [(x, src) | (src, Just x) <- heads]
  where
    sealed = map sealConduitT (toList srcs)
    go pending =
        case sortBy (f `on` fst) pending of
            [] -> pure ()
            (x, src):rest -> do
                yield x
                (src', next) <- lift (src $$++ await)
                go (maybe rest (\y -> (y, src') : rest) next)