module Network.Haskoin.Wallet.Server
( runSPVServer
, stopSPVServer
) where
import Control.Concurrent.Async.Lifted (async, link,
waitAnyCancel)
import Control.Concurrent.STM (atomically, retry)
import Control.Concurrent.STM.TBMChan (TBMChan, newTBMChan,
readTBMChan)
import Control.DeepSeq (NFData (..))
import Control.Exception.Lifted (ErrorCall (..),
SomeException (..),
catches)
import qualified Control.Exception.Lifted as E (Handler (..))
import Control.Monad (forM_, forever, unless,
void, when)
import Control.Monad.Base (MonadBase)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.Logger (MonadLoggerIO,
filterLogger, logDebug,
logError, logInfo,
logWarn,
runStdoutLoggingT)
import Control.Monad.Trans (lift, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl,
liftBaseOpDiscard)
import Control.Monad.Trans.Resource (MonadResource,
runResourceT)
import Data.Aeson (Value, decode, encode)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as BL (fromStrict,
toStrict)
import Data.Conduit (await, awaitForever,
($$))
import qualified Data.HashMap.Strict as H (lookup)
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Map.Strict as M (Map, assocs, elems,
empty,
fromListWith, null,
unionWith)
import Data.Maybe (fromJust, fromMaybe,
isJust)
import Data.Monoid ((<>))
import Data.String.Conversions (cs)
import Data.Text (pack)
import Data.Word (Word32)
import Database.Esqueleto (from, val, where_,
(&&.), (<=.), (==.),
(^.))
import Database.Persist.Sql (ConnectionPool,
runMigration)
import Network.Haskoin.Block
import Network.Haskoin.Constants
import Network.Haskoin.Node.BlockChain
import Network.Haskoin.Node.HeaderTree
import Network.Haskoin.Node.Peer
import Network.Haskoin.Node.STM
import Network.Haskoin.Transaction
import Network.Haskoin.Wallet.Accounts
import Network.Haskoin.Wallet.Database
import Network.Haskoin.Wallet.Model
import Network.Haskoin.Wallet.Server.Handler
import Network.Haskoin.Wallet.Settings
import Network.Haskoin.Wallet.Transaction
import Network.Haskoin.Wallet.Types
import System.Posix.Daemon (Redirection (ToFile),
killAndWait,
runDetached)
import System.ZMQ4 (Context, KeyFormat (..),
Pub (..), Rep (..),
Socket, bind, receive,
receiveMulti, restrict,
send, sendMulti,
setCurveSecretKey,
setCurveServer,
setLinger, withContext,
withSocket, z85Decode)
data EventSession = EventSession
{ eventBatchSize :: !Int
, eventNewAddrs :: !(M.Map AccountId Word32)
}
deriving (Eq, Show, Read)
instance NFData EventSession where
rnf EventSession{..} =
rnf eventBatchSize `seq`
rnf (M.elems eventNewAddrs)
runSPVServer :: Config -> IO ()
runSPVServer cfg = maybeDetach cfg $ run $ do
pool <- initDatabase cfg
notif <- liftIO $ atomically $ newTBMChan 1000
case configMode cfg of
SPVOffline ->
runWalletApp $ HandlerSession cfg pool Nothing notif
SPVOnline -> do
node <- getNodeState (Right pool)
let session = HandlerSession cfg pool (Just node) notif
as <- mapM async
[ runNodeT (spv pool) node
, runNodeT (runMerkleSync pool notif) node
, runNodeT (txSource $$ processTx pool notif) node
, runNodeT (handleGetData $ (`runDBPool` pool) . getTx) node
, runNodeT (broadcastPendingTxs pool) node
, runWalletApp session
]
mapM_ link as
_ <- waitAnyCancel as
return ()
where
spv pool = do
(bloom, elems, _) <- runDBPool getBloomFilter pool
startSPVNode hosts bloom elems
run = runResourceT . runLogging
runLogging = runStdoutLoggingT . filterLogger logFilter
logFilter _ level = level >= configLogLevel cfg
nodes = fromMaybe
(error $ "BTC nodes for " ++ networkName ++ " not found")
(pack networkName `H.lookup` configBTCNodes cfg)
hosts = map (\x -> PeerHost (btcNodeHost x) (btcNodePort x)) nodes
runMerkleSync pool notif = do
$(logDebug) "Waiting for a valid bloom filter for merkle downloads..."
_ <- atomicallyNodeT waitBloomFilter
fcM <- fmap (fmap adjustFCTime) $ (`runDBPool` pool) $ do
(_, h) <- walletBestBlock
if h == 0 then firstAddrTime else return Nothing
maybe (return ()) (atomicallyNodeT . rescanTs) fcM
merkleSync pool 500 notif
broadcastPendingTxs pool = forever $ do
atomicallyNodeT $ do
synced <- areBlocksSynced
unless synced $ lift retry
broadcastTxs =<< runDBPool (getPendingTxs 0) pool
atomicallyNodeT $ do
synced <- areBlocksSynced
when synced $ lift retry
processTx pool notif = awaitForever $ \tx -> lift $ do
(_, newAddrs) <- runDBPool (importNetTx tx (Just notif)) pool
unless (null newAddrs) $ do
$(logInfo) $ pack $ unwords
[ "Generated", show $ length newAddrs
, "new addresses while importing the tx."
, "Updating the bloom filter"
]
(bloom, elems, _) <- runDBPool getBloomFilter pool
atomicallyNodeT $ sendBloomFilter bloom elems
initDatabase :: (MonadBaseControl IO m, MonadLoggerIO m)
=> Config -> m ConnectionPool
initDatabase cfg = do
let dbCfg = fromMaybe
(error $ "DB config settings for " ++ networkName ++ " not found")
(pack networkName `H.lookup` configDatabase cfg)
pool <- getDatabasePool dbCfg
flip runDBPool pool $ do
_ <- runMigration migrateWallet
_ <- runMigration migrateHeaderTree
initWallet $ configBloomFP cfg
return pool
merkleSync
:: (MonadLoggerIO m, MonadBaseControl IO m, MonadThrow m, MonadResource m)
=> ConnectionPool
-> Word32
-> TBMChan Notif
-> NodeT m ()
merkleSync pool bSize notif = do
(hash, _) <- runDBPool walletBestBlock pool
$(logDebug) "Starting merkle batch download"
bestM <- runSqlNodeT $ getBlockByHash hash
let best = fromMaybe (error "Best wallet block not found") bestM
(action, source) <- merkleDownload best bSize
$(logDebug) "Received a merkle action and source. Processing the source..."
(lastMerkleM, mTxsAcc, aMap) <- source $$ go Nothing [] M.empty
$(logDebug) "Merkle source processed and closed"
unless (M.null aMap) $ do
$(logInfo) $ pack $ unwords
[ "Generated", show $ sum $ M.elems aMap
, "new addresses while importing the merkle block."
, "Sending our bloom filter."
]
(bloom, elems, _) <- runDBPool getBloomFilter pool
atomicallyNodeT $ sendBloomFilter bloom elems
$(logDebug) "Checking if we need to rescan the current batch..."
rescan <- shouldRescan aMap
when rescan $ $(logDebug) "We need to rescan the current batch"
let newBSize | rescan = max 1 $ bSize `div` 2
| otherwise = min 500 $ bSize + max 1 (bSize `div` 20)
when (newBSize /= bSize) $ $(logDebug) $ pack $ unwords
[ "Changing block batch size from", show bSize, "to", show newBSize ]
let missing = (headerHash <$> lastMerkleM) /=
Just (nodeHash $ last $ actionNodes action)
when missing $ $(logWarn) $ pack $ unwords
[ "Merkle block stream closed prematurely"
, show lastMerkleM
]
unless (rescan || missing) $ do
$(logDebug) "Importing merkles into the wallet..."
runDBPool (importMerkles action mTxsAcc (Just notif)) pool
$(logDebug) "Done importing merkles into the wallet"
logBlockChainAction action
merkleSync pool newBSize notif
where
go lastMerkleM mTxsAcc aMap = await >>= \resM -> case resM of
Just (Right tx) -> do
$(logDebug) $ pack $ unwords
[ "Importing merkle tx", cs $ txHashToHex $ txHash tx ]
(_, newAddrs) <- lift $ runDBPool (importNetTx tx Nothing) pool
$(logDebug) $ pack $ unwords
[ "Generated", show $ length newAddrs
, "new addresses while importing tx"
, cs $ txHashToHex $ txHash tx
]
let newMap = M.unionWith (+) aMap $ groupByAcc newAddrs
go lastMerkleM mTxsAcc newMap
Just (Left (MerkleBlock mHead _ _ _, mTxs)) -> do
$(logDebug) $ pack $ unwords
[ "Buffering merkle block"
, cs $ blockHashToHex $ headerHash mHead
]
go (Just mHead) (mTxs:mTxsAcc) aMap
_ -> return (lastMerkleM, reverse mTxsAcc, aMap)
groupByAcc addrs =
let xs = map (\a -> (walletAddrAccount a, 1)) addrs
in M.fromListWith (+) xs
shouldRescan aMap = do
res <- (`runDBPool` pool) $ splitSelect (M.assocs aMap) $ \ks ->
from $ \a -> do
let andCond (ai, cnt) =
a ^. AccountId ==. val ai &&.
a ^. AccountGap <=. val cnt
where_ $ join2 $ map andCond ks
return $ a ^. AccountId
return $ not $ null res
logBlockChainAction action = case action of
BestChain nodes -> $(logInfo) $ pack $ unwords
[ "Best chain height"
, show $ nodeBlockHeight $ last nodes
, "(", cs $ blockHashToHex $ nodeHash $ last nodes
, ")"
]
ChainReorg _ o n -> $(logInfo) $ pack $ unlines $
[ "Chain reorg."
, "Orphaned blocks:"
]
++ map ((" " ++) . cs . blockHashToHex . nodeHash) o
++ [ "New blocks:" ]
++ map ((" " ++) . cs . blockHashToHex . nodeHash) n
++ [ unwords [ "Best merkle chain height"
, show $ nodeBlockHeight $ last n
]
]
SideChain n -> $(logWarn) $ pack $ unlines $
"Side chain:" :
map ((" " ++) . cs . blockHashToHex . nodeHash) n
KnownChain n -> $(logWarn) $ pack $ unlines $
"Known chain:" :
map ((" " ++) . cs . blockHashToHex . nodeHash) n
maybeDetach :: Config -> IO () -> IO ()
maybeDetach cfg action =
if configDetach cfg then runDetached pidFile logFile action else action
where
pidFile = Just $ configPidFile cfg
logFile = ToFile $ configLogFile cfg
stopSPVServer :: Config -> IO ()
stopSPVServer cfg =
killAndWait $ configPidFile cfg
runWalletApp :: ( MonadLoggerIO m
, MonadBaseControl IO m
, MonadBase IO m
, MonadThrow m
, MonadResource m
)
=> HandlerSession -> m ()
runWalletApp session = do
na <- async $ liftBaseOpDiscard withContext $ \ctx ->
liftBaseOpDiscard (withSocket ctx Pub) $ \sock -> do
liftIO $ setLinger (restrict (0 :: Int)) sock
setupCrypto ctx sock
liftIO $ bind sock $ configBindNotif $ handlerConfig session
forever $ do
xM <- liftIO $ atomically $ readTBMChan $ handlerNotifChan session
forM_ xM $ \x ->
let (typ, pay) = case x of
NotifBlock _ ->
("[block]", cs $ encode x)
NotifTx JsonTx{..} ->
("{" <> cs jsonTxAccount <> "}", cs $ encode x)
in liftIO $ sendMulti sock $ typ :| [pay]
link na
liftBaseOpDiscard withContext $ \ctx ->
liftBaseOpDiscard (withSocket ctx Rep) $ \sock -> do
liftIO $ setLinger (restrict (0 :: Int)) sock
setupCrypto ctx sock
liftIO $ bind sock $ configBind $ handlerConfig session
forever $ do
bs <- liftIO $ receive sock
res <- case decode $ BL.fromStrict bs of
Just r -> catchErrors $
runHandler (dispatchRequest r) session
Nothing -> return $ ResponseError "Could not decode request"
liftIO $ send sock [] $ BL.toStrict $ encode res
where
setupCrypto :: (MonadLoggerIO m, MonadBaseControl IO m)
=> Context -> Socket a -> m ()
setupCrypto ctx sock = do
when (isJust serverKeyM) $ liftIO $ do
let k = fromJust $ configServerKey $ handlerConfig session
setCurveServer True sock
setCurveSecretKey TextFormat k sock
when (isJust clientKeyPubM) $ do
k <- z85Decode (fromJust clientKeyPubM)
void $ async $ runZapAuth ctx k
cfg = handlerConfig session
serverKeyM = configServerKey cfg
clientKeyPubM = configClientKeyPub cfg
catchErrors m = catches m
[ E.Handler $ \(WalletException err) -> do
$(logError) $ pack err
return $ ResponseError $ pack err
, E.Handler $ \(ErrorCall err) -> do
$(logError) $ pack err
return $ ResponseError $ pack err
, E.Handler $ \(SomeException exc) -> do
$(logError) $ pack $ show exc
return $ ResponseError $ pack $ show exc
]
runZapAuth :: ( MonadLoggerIO m
, MonadBaseControl IO m
, MonadBase IO m
)
=> Context -> ByteString -> m ()
runZapAuth ctx k = do
$(logDebug) $ "Starting ØMQ authentication thread"
liftBaseOpDiscard (withSocket ctx Rep) $ \zap -> do
liftIO $ setLinger (restrict (0 :: Int)) zap
liftIO $ bind zap "inproc://zeromq.zap.01"
forever $ do
buffer <- liftIO $ receiveMulti zap
let actionE =
case buffer of
v:q:_:_:_:m:p:_ -> do
when (v /= "1.0") $
Left (q, "500", "Version number not valid")
when (m /= "CURVE") $
Left (q, "400", "Mechanism not supported")
when (p /= k) $
Left (q, "400", "Invalid client public key")
return q
_ -> Left ("", "500", "Malformed request")
case actionE of
Right q -> do
$(logInfo) "Authenticated client successfully"
liftIO $ sendMulti zap $
"1.0" :| [q, "200", "OK", "client", ""]
Left (q, c, m) -> do
$(logError) $ pack $ unwords
[ "Failed to authenticate client:" , cs c, cs m ]
liftIO $ sendMulti zap $
"1.0" :| [q, c, m, "", ""]
dispatchRequest :: ( MonadLoggerIO m
, MonadBaseControl IO m
, MonadBase IO m
, MonadThrow m
, MonadResource m
)
=> WalletRequest -> Handler m (WalletResponse Value)
dispatchRequest req = fmap ResponseValid $ case req of
GetAccountsR p -> getAccountsR p
PostAccountsR na -> postAccountsR na
PostAccountRenameR n n' -> postAccountRenameR n n'
GetAccountR n -> getAccountR n
PostAccountKeysR n ks -> postAccountKeysR n ks
PostAccountGapR n g -> postAccountGapR n g
GetAddressesR n t m o p -> getAddressesR n t m o p
GetAddressesUnusedR n t p -> getAddressesUnusedR n t p
GetAddressR n i t m o -> getAddressR n i t m o
PutAddressR n i t l -> putAddressR n i t l
PostAddressesR n i t -> postAddressesR n i t
GetTxsR n p -> getTxsR n p
GetAddrTxsR n i t p -> getAddrTxsR n i t p
PostTxsR n k a -> postTxsR n k a
GetTxR n h -> getTxR n h
GetOfflineTxR n h -> getOfflineTxR n h
PostOfflineTxR n k t c -> postOfflineTxR n k t c
GetBalanceR n mc o -> getBalanceR n mc o
PostNodeR na -> postNodeR na
DeleteTxIdR t -> deleteTxIdR t
GetSyncR a n b -> getSyncR a (Right n) b
GetSyncHeightR a n b -> getSyncR a (Left n) b
GetPendingR a p -> getPendingR a p
GetDeadR a p -> getDeadR a p