{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE TemplateHaskell #-}
module Network.Haskoin.Wallet.Server
( runSPVServer
, stopSPVServer
, runSPVServerWithContext
) where
import Control.Concurrent.Async.Lifted (async, link,
waitAnyCancel)
import Control.Concurrent.STM (atomically, retry)
import Control.Concurrent.STM.TBMChan (TBMChan, newTBMChan,
readTBMChan)
import Control.DeepSeq (NFData (..))
import Control.Exception.Lifted (ErrorCall (..),
SomeException (..),
catches)
import qualified Control.Exception.Lifted as E (Handler (..))
import Control.Monad (forM_, forever, unless,
void, when)
import Control.Monad.Base (MonadBase)
import Control.Monad.Catch (MonadThrow)
import Control.Monad.Fix (fix)
import Control.Monad.Logger (MonadLoggerIO,
filterLogger, logDebug,
logError, logInfo,
logWarn,
runStdoutLoggingT)
import Control.Monad.Trans (lift, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl,
liftBaseOpDiscard)
import Control.Monad.Trans.Resource (MonadResource,
runResourceT)
import Data.Aeson (Value, decode, encode)
import Data.ByteString (ByteString)
import qualified Data.ByteString.Lazy as BL (fromStrict,
toStrict)
import Data.Conduit (await, awaitForever,
($$))
import qualified Data.HashMap.Strict as H (lookup)
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Map.Strict as M (Map, assocs, elems,
empty,
fromListWith, null,
unionWith)
import Data.Maybe (fromJust, fromMaybe,
isJust)
import Data.Monoid ((<>))
import Data.String.Conversions (cs)
import Data.Text (pack)
import Data.Word (Word32)
import Database.Esqueleto (from, val, where_,
(&&.), (<=.), (==.),
(^.))
import Database.Persist.Sql (ConnectionPool,
runMigration)
import Network.Haskoin.Block
import Network.Haskoin.Constants
import Network.Haskoin.Node.BlockChain
import Network.Haskoin.Node.HeaderTree
import Network.Haskoin.Node.Peer
import Network.Haskoin.Node.STM
import Network.Haskoin.Transaction
import Network.Haskoin.Wallet.Accounts
import Network.Haskoin.Wallet.Database
import Network.Haskoin.Wallet.Model
import Network.Haskoin.Wallet.Server.Handler
import Network.Haskoin.Wallet.Settings
import Network.Haskoin.Wallet.Transaction
import Network.Haskoin.Wallet.Types
import System.Posix.Daemon (Redirection (ToFile),
killAndWait,
runDetached)
import System.ZMQ4 (Context, KeyFormat (..),
Pub (..), Rep (..),
Socket, bind, receive,
receiveMulti, restrict,
send, sendMulti,
setCurveSecretKey,
setCurveServer,
setLinger, withContext,
withSocket, z85Decode)
-- | A batch size paired with a per-account 'Word32' counter.
--
-- NOTE(review): this type is not referenced by the rest of the visible
-- code, so the field descriptions below are inferred from the field names
-- only -- confirm against the callers before relying on them.
data EventSession = EventSession
    { eventBatchSize :: !Int
      -- ^ Presumably the number of events handled per batch.
    , eventNewAddrs :: !(M.Map AccountId Word32)
      -- ^ Presumably a count of newly generated addresses per account.
    }
    deriving (Eq, Show, Read)

-- | Forces the batch size and then every value stored in the map.
-- Only the map's values (not its keys) are passed to 'rnf'.
instance NFData EventSession where
    rnf (EventSession batchSize newAddrs) =
        rnf batchSize `seq` rnf (M.elems newAddrs)
-- | Entry point of the wallet server.
--
-- Creates a ZeroMQ context with 'withContext' and runs
-- 'runSPVServerWithContext' inside a resource scope with logging to stdout.
-- Log messages below 'configLogLevel' are filtered out. Whether the process
-- is detached first is decided by 'maybeDetach'.
runSPVServer :: Config -> IO ()
runSPVServer cfg =
    maybeDetach cfg $ withContext $ \ctx ->
        runResourceT
            . runStdoutLoggingT
            . filterLogger keepMessage
            $ runSPVServerWithContext cfg ctx
  where
    -- Keep a message only if its level reaches the configured threshold;
    -- the log source is ignored.
    keepMessage _source level = level >= configLogLevel cfg
-- | Run the wallet server on an already-created ZeroMQ 'Context'.
--
-- The database pool is initialised first and a bounded notification channel
-- (capacity 1000) is created. Depending on 'configMode':
--
-- * 'SPVOffline': only the ZeroMQ command and notification threads run.
--
-- * 'SPVOnline': the SPV node, the merkle sync, the transaction import,
--   a 'handleGetData' worker answering from 'getTx' and the pending
--   transaction broadcaster are started as well, followed by the two
--   ZeroMQ threads.
--
-- Every worker is passed to 'link'; the function returns once
-- 'waitAnyCancel' reports that one of the workers has finished.
runSPVServerWithContext :: ( MonadLoggerIO m
                           , MonadBaseControl IO m
                           , MonadBase IO m
                           , MonadThrow m
                           , MonadResource m
                           )
                        => Config -> Context -> m ()
runSPVServerWithContext cfg ctx = do
    pool <- initDatabase cfg
    notif <- liftIO . atomically $ newTBMChan 1000
    case configMode cfg of
        SPVOffline -> do
            let session = HandlerSession cfg pool Nothing notif
            workers <- mapM async (walletThreads session)
            mapM_ link workers
            (_, result) <- waitAnyCancel workers
            return result
        SPVOnline -> do
            node <- getNodeState (Right pool)
            let session = HandlerSession cfg pool (Just node) notif
                nodeThreads =
                    [ runNodeT (spv pool) node
                    , runNodeT (runMerkleSync pool notif) node
                    , runNodeT (txSource $$ processTx pool notif) node
                    , runNodeT (handleGetData $ \h -> runDBPool (getTx h) pool) node
                    , runNodeT (broadcastPendingTxs pool) node
                    ]
            workers <- mapM async (nodeThreads ++ walletThreads session)
            mapM_ link workers
            (_, result) <- waitAnyCancel workers
            $(logDebug) "Exiting main thread"
            return result
  where
    -- The ZeroMQ command and notification threads, used in both modes.
    walletThreads session =
        [ runWalletCmd ctx session
        , runWalletNotif ctx session
        ]
    -- Start the SPV node with the wallet's current bloom filter.
    spv pool = do
        (bloom, elems, _) <- runDBPool getBloomFilter pool
        startSPVNode hosts bloom elems
    -- Peers configured for the current network; evaluating this fails with
    -- 'error' when the configuration has no entry for the network.
    nodes = fromMaybe
        (error $ "BTC nodes for " ++ networkName ++ " not found")
        (H.lookup (pack networkName) (configBTCNodes cfg))
    hosts = [ PeerHost (btcNodeHost n) (btcNodePort n) | n <- nodes ]
    -- Wait for a bloom filter, optionally set a rescan timestamp, then
    -- start 'merkleSync' with an initial batch size of 500.
    runMerkleSync pool notif = do
        $(logDebug) "Waiting for a valid bloom filter for merkle downloads..."
        _ <- atomicallyNodeT waitBloomFilter
        -- Only when the wallet's best block height is 0 do we look up
        -- 'firstAddrTime' (adjusted with 'adjustFCTime') for 'rescanTs'.
        fastCatchupM <- fmap (fmap adjustFCTime) $ flip runDBPool pool $ do
            (_, height) <- walletBestBlock
            if height == 0 then firstAddrTime else return Nothing
        forM_ fastCatchupM (atomicallyNodeT . rescanTs)
        merkleSync pool 500 notif
        $(logDebug) "Exiting Merkle-sync thread"
    -- Broadcast the pending transactions each time 'areBlocksSynced'
    -- turns true for the wallet's best block.
    --
    -- NOTE(review): the final debug message is part of the 'forever' body,
    -- so it is emitted on every iteration rather than on thread exit --
    -- confirm whether that is intended.
    broadcastPendingTxs pool = forever $ do
        (best, _) <- runSqlNodeT walletBestBlock
        -- Retry until 'areBlocksSynced' holds for that block.
        atomicallyNodeT $
            areBlocksSynced best >>= \synced -> unless synced (lift retry)
        broadcastTxs =<< runDBPool (getPendingTxs 0) pool
        -- Retry until 'areBlocksSynced' stops holding for that block.
        atomicallyNodeT $
            areBlocksSynced best >>= \synced -> when synced (lift retry)
        $(logDebug) "Exiting tx-broadcast thread"
    -- Import every transaction received from the stream; when the import
    -- generated new addresses, send an updated bloom filter.
    processTx pool notif = do
        awaitForever $ \tx -> lift $ do
            (_, freshAddrs) <- runDBPool (importNetTx tx (Just notif)) pool
            unless (null freshAddrs) $ do
                $(logInfo) $ pack $ unwords
                    [ "Generated", show $ length freshAddrs
                    , "new addresses while importing the tx."
                    , "Updating the bloom filter"
                    ]
                (bloom, elems, _) <- runDBPool getBloomFilter pool
                atomicallyNodeT $ sendBloomFilter bloom elems
        $(logDebug) "Exiting tx-import thread"
-- | Create the database connection pool for the current network and
-- prepare the schema.
--
-- Runs the 'migrateWallet' and 'migrateHeaderTree' migrations, then calls
-- 'initWallet' with 'configBloomFP', and returns the pool. The database
-- settings are looked up under 'networkName'; a missing entry results in
-- a call to 'error'.
initDatabase :: (MonadBaseControl IO m, MonadLoggerIO m)
             => Config -> m ConnectionPool
initDatabase cfg = do
    pool <- getDatabasePool dbCfg
    (`runDBPool` pool) $ do
        _ <- runMigration migrateWallet
        _ <- runMigration migrateHeaderTree
        initWallet (configBloomFP cfg)
    return pool
  where
    -- Lazily evaluated, so the lookup failure surfaces only when the
    -- settings are actually demanded.
    dbCfg = case H.lookup (pack networkName) (configDatabase cfg) of
        Just settings -> settings
        Nothing ->
            error $ "DB config settings for " ++ networkName ++ " not found"
-- | Download and import merkle blocks in batches, forever.
--
-- One iteration:
--
-- 1. Reads the wallet's best block and asks 'merkleDownload' for an action
--    and a source, given that block hash and the batch size.
-- 2. Drains the source, importing every transaction and buffering the
--    per-block transaction lists.
-- 3. If the imports generated new addresses, sends a fresh bloom filter.
-- 4. Decides whether the batch must be rescanned ('shouldRescan') and
--    whether the stream ended before the last node of the action.
-- 5. Unless a rescan is needed or the stream was incomplete, imports the
--    merkle blocks into the wallet and logs the resulting chain action.
-- 6. Calls itself with the adapted batch size.
--
-- The function ends with an unconditional recursive call, so it does not
-- return normally.
--
-- Parameters: the database pool, the batch size handed to
-- 'merkleDownload', and the notification channel passed to
-- 'importMerkles'.
merkleSync
    :: (MonadLoggerIO m, MonadBaseControl IO m, MonadThrow m, MonadResource m)
    => ConnectionPool
    -> Word32
    -> TBMChan Notif
    -> NodeT m ()
merkleSync pool bSize notif = do
    -- Start from the wallet's current best block.
    (hash, _) <- runDBPool walletBestBlock pool
    $(logDebug) "Starting merkle batch download"
    (action, source) <- merkleDownload hash bSize
    $(logDebug) "Received a merkle action and source. Processing the source..."
    -- Consume the whole stream: last merkle header seen, buffered
    -- transaction lists (in stream order) and new-address counts per account.
    (lastMerkleM, mTxsAcc, aMap) <- source $$ go Nothing [] M.empty
    $(logDebug) "Merkle source processed and closed"
    -- New addresses were generated: send an updated bloom filter.
    unless (M.null aMap) $ do
        $(logInfo) $ pack $ unwords
            [ "Generated", show $ sum $ M.elems aMap
            , "new addresses while importing the merkle block."
            , "Sending our bloom filter."
            ]
        (bloom, elems, _) <- runDBPool getBloomFilter pool
        atomicallyNodeT $ sendBloomFilter bloom elems
    $(logDebug) "Checking if we need to rescan the current batch..."
    rescan <- shouldRescan aMap
    when rescan $ $(logDebug) "We need to rescan the current batch"
    -- Batch size adaptation: halve it on a rescan (never below 1);
    -- otherwise grow it by a twentieth of its value (at least 1), capped
    -- at 500.
    let newBSize | rescan = max 1 $ bSize `div` 2
                 | otherwise = min 500 $ bSize + max 1 (bSize `div` 20)
    when (newBSize /= bSize) $ $(logDebug) $ pack $ unwords
        [ "Changing block batch size from", show bSize, "to", show newBSize ]
    -- The stream is incomplete when the last merkle header received is not
    -- the last node of the action.
    -- NOTE(review): 'last' is partial; this assumes 'actionNodes' is never
    -- empty -- confirm against 'merkleDownload'.
    let missing = (headerHash <$> lastMerkleM) /=
            Just (nodeHash $ last $ actionNodes action)
    when missing $ $(logWarn) $ pack $ unwords
        [ "Merkle block stream closed prematurely"
        , show lastMerkleM
        ]
    -- Commit the batch only when it is complete and needs no rescan;
    -- otherwise the wallet's best block is left untouched by this function.
    unless (rescan || missing) $ do
        $(logDebug) "Importing merkles into the wallet..."
        runDBPool (importMerkles action mTxsAcc (Just notif)) pool
        $(logDebug) "Done importing merkles into the wallet"
        logBlockChainAction action
    merkleSync pool newBSize notif
  where
    -- Stream consumer. 'Right' items are transactions, which are imported
    -- immediately (without a notification channel); 'Left' items are merkle
    -- blocks whose header and transaction list are buffered. Anything else
    -- (including end of stream) terminates the loop.
    go lastMerkleM mTxsAcc aMap = await >>= \resM -> case resM of
        Just (Right tx) -> do
            $(logDebug) $ pack $ unwords
                [ "Importing merkle tx", cs $ txHashToHex $ txHash tx ]
            (_, newAddrs) <- lift $ runDBPool (importNetTx tx Nothing) pool
            $(logDebug) $ pack $ unwords
                [ "Generated", show $ length newAddrs
                , "new addresses while importing tx"
                , cs $ txHashToHex $ txHash tx
                ]
            -- Add the per-account counts of this transaction to the totals.
            let newMap = M.unionWith (+) aMap $ groupByAcc newAddrs
            go lastMerkleM mTxsAcc newMap
        Just (Left (MerkleBlock mHead _ _ _, mTxs)) -> do
            $(logDebug) $ pack $ unwords
                [ "Buffering merkle block"
                , cs $ blockHashToHex $ headerHash mHead
                ]
            go (Just mHead) (mTxs:mTxsAcc) aMap
        -- The accumulator was built in reverse; restore stream order.
        _ -> return (lastMerkleM, reverse mTxsAcc, aMap)
    -- Count the given addresses per account.
    groupByAcc addrs =
        let xs = map (\a -> (walletAddrAccount a, 1)) addrs
        in M.fromListWith (+) xs
    -- A rescan is needed when the query returns at least one account. The
    -- per-account condition matches an account from the map whose
    -- 'AccountGap' is less than or equal to its number of new addresses.
    -- NOTE(review): 'join2' and 'splitSelect' are defined elsewhere;
    -- 'join2' presumably combines the per-account conditions -- confirm.
    shouldRescan aMap = do
        res <- (`runDBPool` pool) $ splitSelect (M.assocs aMap) $ \ks ->
            from $ \a -> do
                let andCond (ai, cnt) =
                        a ^. AccountId ==. val ai &&.
                        a ^. AccountGap <=. val cnt
                where_ $ join2 $ map andCond ks
                return $ a ^. AccountId
        return $ not $ null res
    -- Log a summary of the chain action; best-chain and reorg results are
    -- logged at info level, side and known chains at warning level.
    -- NOTE(review): 'last' is partial on the node lists -- see above.
    logBlockChainAction action = case action of
        BestChain nodes -> $(logInfo) $ pack $ unwords
            [ "Best chain height"
            , show $ nodeBlockHeight $ last nodes
            , "(", cs $ blockHashToHex $ nodeHash $ last nodes
            , ")"
            ]
        ChainReorg _ o n -> $(logInfo) $ pack $ unlines $
            [ "Chain reorg."
            , "Orphaned blocks:"
            ]
            ++ map ((" " ++) . cs . blockHashToHex . nodeHash) o
            ++ [ "New blocks:" ]
            ++ map ((" " ++) . cs . blockHashToHex . nodeHash) n
            ++ [ unwords [ "Best merkle chain height"
                         , show $ nodeBlockHeight $ last n
                         ]
               ]
        SideChain n -> $(logWarn) $ pack $ unlines $
            "Side chain:" :
            map ((" " ++) . cs . blockHashToHex . nodeHash) n
        KnownChain n -> $(logWarn) $ pack $ unlines $
            "Known chain:" :
            map ((" " ++) . cs . blockHashToHex . nodeHash) n
-- | Run the action either detached or in the foreground.
--
-- When 'configDetach' is set, the action is handed to 'runDetached' with
-- the configured pid file and with output redirected to the configured
-- log file, after which @Process started@ is printed. Otherwise the action
-- is run directly.
maybeDetach :: Config -> IO () -> IO ()
maybeDetach cfg action
    | configDetach cfg = do
        runDetached
            (Just $ configPidFile cfg)
            (ToFile $ configLogFile cfg)
            action
        putStrLn "Process started"
    | otherwise = action
-- | Stop a detached server by calling 'killAndWait' on the pid file named
-- in the configuration ('configPidFile').
stopSPVServer :: Config -> IO ()
stopSPVServer = killAndWait . configPidFile
-- | Publish wallet notifications on a ZeroMQ @Pub@ socket.
--
-- The socket is bound to 'configBindNotif' after the linger period has
-- been set to 0 and 'setupCrypto' has been applied. Each notification read
-- from the session's notification channel is sent as a two-frame message:
--
-- * frame 1 (topic): @[block]@ for block notifications, or the account
--   name wrapped in braces for transaction notifications;
-- * frame 2 (payload): the JSON encoding of the notification.
--
-- The loop ends when the channel yields 'Nothing'. Per the stm-chans
-- documentation that happens only once the channel is closed and drained,
-- and from then on every read returns immediately; looping
-- unconditionally would therefore spin without ever blocking again.
runWalletNotif :: ( MonadLoggerIO m
                  , MonadBaseControl IO m
                  , MonadBase IO m
                  , MonadThrow m
                  , MonadResource m
                  )
               => Context -> HandlerSession -> m ()
runWalletNotif ctx session =
    liftBaseOpDiscard (withSocket ctx Pub) $ \sock -> do
        liftIO $ setLinger (restrict (0 :: Int)) sock
        setupCrypto ctx sock session
        liftIO $ bind sock $ configBindNotif $ handlerConfig session
        fix $ \loop -> do
            xM <- liftIO $ atomically $ readTBMChan $ handlerNotifChan session
            case xM of
                -- Channel closed: stop instead of busy-looping on 'Nothing'.
                Nothing ->
                    $(logInfo) "Notification channel closed. Exiting ZMQ notification thread..."
                Just x -> do
                    let (typ, pay) = case x of
                            NotifBlock _ ->
                                ("[block]", cs $ encode x)
                            NotifTx JsonTx{..} ->
                                ("{" <> cs jsonTxAccount <> "}", cs $ encode x)
                    liftIO $ sendMulti sock $ typ :| [pay]
                    loop
-- | Serve wallet requests on a ZeroMQ @Rep@ socket.
--
-- The socket is bound to 'configBind' after the linger period has been set
-- to 0 and 'setupCrypto' has been applied. Every received message is
-- decoded as JSON and answered with exactly one JSON-encoded response:
--
-- * undecodable input yields a 'ResponseError';
-- * 'StopServerR' is acknowledged with an empty 'ResponseValid' and ends
--   the loop after the reply has been sent;
-- * any other request is passed to 'dispatchRequest'; exceptions raised
--   while handling it are logged and returned as a 'ResponseError'.
--
-- NOTE(review): the last exception handler matches 'SomeException', i.e.
-- every exception type -- confirm that this is intended for exceptions
-- delivered asynchronously as well.
runWalletCmd :: ( MonadLoggerIO m
                , MonadBaseControl IO m
                , MonadBase IO m
                , MonadThrow m
                , MonadResource m
                )
             => Context -> HandlerSession -> m ()
runWalletCmd ctx session = do
    liftBaseOpDiscard (withSocket ctx Rep) $ \sock -> do
        liftIO $ setLinger (restrict (0 :: Int)) sock
        setupCrypto ctx sock session
        liftIO $ bind sock $ configBind $ handlerConfig session
        fix $ \next -> do
            reqM <- decode . BL.fromStrict <$> liftIO (receive sock)
            reply <- respond reqM
            liftIO $ send sock [] $ BL.toStrict $ encode reply
            unless (reqM == Just StopServerR) next
    $(logInfo) "Exiting ZMQ command thread..."
  where
    -- Compute the response for a (possibly undecodable) request.
    respond Nothing = return $ ResponseError "Could not decode request"
    respond (Just StopServerR) = do
        $(logInfo) "Received StopServer request"
        return (ResponseValid Nothing)
    respond (Just req) =
        runHandler (dispatchRequest req) session `catches`
            [ E.Handler $ \(WalletException err) -> failWith err
            , E.Handler $ \(ErrorCall err) -> failWith err
            , E.Handler $ \(SomeException exc) -> failWith (show exc)
            ]
    -- Log the error text and hand it back to the client.
    failWith msg = do
        $(logError) $ pack msg
        return $ ResponseError $ pack msg
-- | Apply the optional CURVE settings from the configuration to a socket.
--
-- * When 'configServerKey' is present, the socket is switched to CURVE
--   server mode and the key is installed as its secret key.
-- * When 'configClientKeyPub' is present, the key is Z85-decoded and a
--   'runZapAuth' thread is started with it.
--
-- Both settings are optional and independent; with neither present the
-- socket is left untouched.
--
-- NOTE(review): the 'runZapAuth' thread is neither linked nor waited on,
-- so any exception it raises is dropped. 'runWalletCmd' and
-- 'runWalletNotif' both call this function with the same context, so two
-- such threads are started and both bind the same inproc endpoint --
-- confirm that the second bind failing silently is acceptable.
setupCrypto :: (MonadLoggerIO m, MonadBaseControl IO m)
            => Context -> Socket a -> HandlerSession -> m ()
setupCrypto ctx' sock session = do
    -- Traverse the 'Maybe' values directly instead of testing with
    -- 'isJust' and then unwrapping with the partial 'fromJust'.
    forM_ (configServerKey cfg) $ \secretKey -> liftIO $ do
        setCurveServer True sock
        setCurveSecretKey TextFormat secretKey sock
    forM_ (configClientKeyPub cfg) $ \clientKeyZ85 -> do
        clientKey <- z85Decode clientKeyZ85
        void $ async $ runZapAuth ctx' clientKey
  where
    cfg = handlerConfig session
-- | Answer authentication requests on the @inproc://zeromq.zap.01@
-- endpoint, accepting only the given client public key.
--
-- A request needs at least seven frames. Frames 1, 2, 6 and 7 are read as
-- version, request id, mechanism and client key; the others are ignored.
-- The checks run in this order, and the first failure decides the reply:
--
-- 1. version must be @1.0@ (otherwise status 500);
-- 2. mechanism must be @CURVE@ (otherwise status 400);
-- 3. the client key must equal @k@ (otherwise status 400).
--
-- A request with fewer frames is rejected with status 500 and an empty
-- request id. Successful requests are answered with status 200 and the
-- user id @client@. The loop never terminates on its own.
runZapAuth :: ( MonadLoggerIO m
              , MonadBaseControl IO m
              , MonadBase IO m
              )
           => Context -> ByteString -> m ()
runZapAuth ctx k = do
    $(logDebug) "Starting ØMQ authentication thread"
    liftBaseOpDiscard (withSocket ctx Rep) $ \zap -> do
        liftIO $ setLinger (restrict (0 :: Int)) zap
        liftIO $ bind zap "inproc://zeromq.zap.01"
        forever $ do
            frames <- liftIO $ receiveMulti zap
            case check frames of
                Right reqId -> do
                    $(logInfo) "Authenticated client successfully"
                    liftIO $ sendMulti zap $
                        "1.0" :| [reqId, "200", "OK", "client", ""]
                Left (reqId, code, reason) -> do
                    $(logError) $ pack $ unwords
                        [ "Failed to authenticate client:" , cs code, cs reason ]
                    liftIO $ sendMulti zap $
                        "1.0" :| [reqId, code, reason, "", ""]
  where
    -- Validate a request; 'Left' carries request id, status code and
    -- status text of the rejection, 'Right' the request id to acknowledge.
    check (version:reqId:_:_:_:mechanism:clientKey:_)
        | version /= "1.0" = Left (reqId, "500", "Version number not valid")
        | mechanism /= "CURVE" = Left (reqId, "400", "Mechanism not supported")
        | clientKey /= k = Left (reqId, "400", "Invalid client public key")
        | otherwise = Right reqId
    check _ = Left ("", "500", "Malformed request")
-- | Route a decoded wallet request to its handler and wrap the handler's
-- result in 'ResponseValid'.
--
-- 'GetSyncR' and 'GetSyncHeightR' share 'getSyncR': the former passes its
-- second argument as 'Right', the latter as 'Left'.
--
-- 'StopServerR' must never reach this function (the command loop answers
-- it itself); dispatching it calls 'error'.
dispatchRequest :: ( MonadLoggerIO m
                   , MonadBaseControl IO m
                   , MonadBase IO m
                   , MonadThrow m
                   , MonadResource m
                   )
                => WalletRequest -> Handler m (WalletResponse Value)
dispatchRequest req = ResponseValid <$> route
  where
    route = case req of
        -- Accounts
        GetAccountsR p -> getAccountsR p
        PostAccountsR na -> postAccountsR na
        PostAccountRenameR n n' -> postAccountRenameR n n'
        GetAccountR n -> getAccountR n
        PostAccountKeysR n ks -> postAccountKeysR n ks
        PostAccountGapR n g -> postAccountGapR n g
        -- Addresses
        GetAddressesR n t m o p -> getAddressesR n t m o p
        GetAddressesUnusedR n t p -> getAddressesUnusedR n t p
        GetAddressR n i t m o -> getAddressR n i t m o
        GetIndexR n k t -> getIndexR n k t
        PutAddressR n i t l -> putAddressR n i t l
        PostAddressesR n i t -> postAddressesR n i t
        -- Transactions and balances
        GetTxsR n p -> getTxsR n p
        GetAddrTxsR n i t p -> getAddrTxsR n i t p
        PostTxsR n k a -> postTxsR n k a
        GetTxR n h -> getTxR n h
        GetOfflineTxR n h -> getOfflineTxR n h
        PostOfflineTxR n k t c -> postOfflineTxR n k t c
        GetBalanceR n mc o -> getBalanceR n mc o
        -- Node and synchronisation
        PostNodeR na -> postNodeR na
        DeleteTxIdR t -> deleteTxIdR t
        GetSyncR a n b -> getSyncR a (Right n) b
        GetSyncHeightR a n b -> getSyncR a (Left n) b
        GetPendingR a p -> getPendingR a p
        GetDeadR a p -> getDeadR a p
        GetBlockInfoR l -> getBlockInfoR l
        StopServerR -> error "Should be handled elsewhere"