{-# LANGUAGE FlexibleContexts #-} module Haskoin.Store.Manager ( StoreConfig (..), Store (..), withStore, ) where import Control.Monad (forever, unless, when) import Control.Monad.Logger (MonadLoggerIO) import Control.Monad.Reader (ReaderT (ReaderT), runReaderT) import Data.Serialize (decode) import Data.Time.Clock (NominalDiffTime) import Data.Word (Word32) import Haskoin ( BlockHash (..), Inv (..), InvType (..), InvVector (..), Message (..), MessageCommand (..), Network, NetworkAddress (..), NotFound (..), Pong (..), Reject (..), TxHash (..), VarString (..), sockToHostAddress, ) import Haskoin.Node ( Chain, ChainEvent (..), HostPort, Node (..), NodeConfig (..), NodeEvent (..), PeerEvent (..), PeerManager, WithConnection, withNode, ) import Haskoin.Store.BlockStore ( BlockStore, BlockStoreConfig (..), blockStoreBlockSTM, blockStoreHeadSTM, blockStoreNotFoundSTM, blockStorePeerConnectSTM, blockStorePeerDisconnectSTM, blockStoreTxHashSTM, blockStoreTxSTM, withBlockStore, ) import Haskoin.Store.Cache ( CacheConfig (..), CacheWriter, cacheNewBlock, cacheNewTx, cacheWriter, connectRedis, newCacheMetrics, ) import Haskoin.Store.Common ( StoreEvent (..), createDataMetrics, ) import Haskoin.Store.Database.Reader ( DatabaseReader (..), DatabaseReaderT, withDatabaseReader, ) import NQE ( Inbox, Process (..), Publisher, publishSTM, receive, withProcess, withPublisher, withSubscription, ) import Network.Socket (SockAddr (..)) import qualified System.Metrics as Metrics (Store) import UnliftIO ( MonadIO, MonadUnliftIO, STM, atomically, link, withAsync, ) import UnliftIO.Concurrent (threadDelay) -- | Store mailboxes. data Store = Store { storeManager :: !PeerManager , storeChain :: !Chain , storeBlock :: !BlockStore , storeDB :: !DatabaseReader , storeCache :: !(Maybe CacheConfig) , storePublisher :: !(Publisher StoreEvent) , storeNetwork :: !Network } -- | Configuration for a 'Store'. data StoreConfig = StoreConfig { -- | max peers to connect to storeConfMaxPeers :: !Int , -- | static set of peers to connect to storeConfInitPeers :: ![HostPort] , -- | discover new peers storeConfDiscover :: !Bool , -- | RocksDB database path storeConfDB :: !FilePath , -- | network constants storeConfNetwork :: !Network , -- | Redis cache configuration storeConfCache :: !(Maybe String) , -- | gap on extended public key with no transactions storeConfInitialGap :: !Word32 , -- | gap for extended public keys storeConfGap :: !Word32 , -- | cache xpubs with more than this many used addresses storeConfCacheMin :: !Int , -- | maximum number of keys in Redis cache storeConfMaxKeys :: !Integer , -- | do not index new mempool transactions storeConfNoMempool :: !Bool , -- | wipe mempool when starting storeConfWipeMempool :: !Bool , -- | sync mempool from peers storeConfSyncMempool :: !Bool , -- | disconnect peer if message not received for this many seconds storeConfPeerTimeout :: !NominalDiffTime , -- | disconnect peer if it has been connected this long storeConfPeerMaxLife :: !NominalDiffTime , -- | connect to peers using the function 'withConnection' storeConfConnect :: !(SockAddr -> WithConnection) , -- | delay in microseconds to retry getting cache lock storeConfCacheRetryDelay :: !Int , -- | stats store storeConfStats :: !(Maybe Metrics.Store) } withStore :: (MonadLoggerIO m, MonadUnliftIO m) => StoreConfig -> (Store -> m a) -> m a withStore cfg action = connectDB cfg $ ReaderT $ \db -> withPublisher $ \pub -> withPublisher $ \node_pub -> withSubscription node_pub $ \node_sub -> withNode (nodeCfg cfg db node_pub) $ \node -> withCache cfg (nodeChain node) db pub $ \mcache -> withBlockStore (blockStoreCfg cfg node pub db) $ \b -> withAsync (nodeForwarder b pub node_sub) $ \a1 -> link a1 >> action Store { storeManager = nodeManager node , storeChain = nodeChain node , storeBlock = b , storeDB = db , storeCache = mcache , storePublisher = pub , storeNetwork = storeConfNetwork cfg } connectDB :: MonadUnliftIO m => StoreConfig -> DatabaseReaderT m a -> m a connectDB cfg f = do stats <- mapM createDataMetrics (storeConfStats cfg) withDatabaseReader (storeConfNetwork cfg) (storeConfInitialGap cfg) (storeConfGap cfg) (storeConfDB cfg) stats f blockStoreCfg :: StoreConfig -> Node -> Publisher StoreEvent -> DatabaseReader -> BlockStoreConfig blockStoreCfg cfg node pub db = BlockStoreConfig { blockConfChain = nodeChain node , blockConfManager = nodeManager node , blockConfListener = pub , blockConfDB = db , blockConfNet = storeConfNetwork cfg , blockConfNoMempool = storeConfNoMempool cfg , blockConfWipeMempool = storeConfWipeMempool cfg , blockConfSyncMempool = storeConfSyncMempool cfg , blockConfPeerTimeout = storeConfPeerTimeout cfg , blockConfStats = storeConfStats cfg } nodeCfg :: StoreConfig -> DatabaseReader -> Publisher NodeEvent -> NodeConfig nodeCfg cfg db pub = NodeConfig { nodeConfMaxPeers = storeConfMaxPeers cfg , nodeConfDB = databaseHandle db , nodeConfColumnFamily = Nothing , nodeConfPeers = storeConfInitPeers cfg , nodeConfDiscover = storeConfDiscover cfg , nodeConfEvents = pub , nodeConfNetAddr = NetworkAddress 0 (sockToHostAddress (SockAddrInet 0 0)) , nodeConfNet = storeConfNetwork cfg , nodeConfTimeout = storeConfPeerTimeout cfg , nodeConfPeerMaxLife = storeConfPeerMaxLife cfg , nodeConfConnect = storeConfConnect cfg } withCache :: (MonadUnliftIO m, MonadLoggerIO m) => StoreConfig -> Chain -> DatabaseReader -> Publisher StoreEvent -> (Maybe CacheConfig -> m a) -> m a withCache cfg chain db pub action = case storeConfCache cfg of Nothing -> action Nothing Just redisurl -> mapM newCacheMetrics (storeConfStats cfg) >>= \metrics -> connectRedis redisurl >>= \conn -> withSubscription pub $ \evts -> let conf = c conn metrics in withProcess (f conf) $ \p -> cacheWriterProcesses evts (getProcessMailbox p) $ do action (Just conf) where f conf cwinbox = runReaderT (cacheWriter conf cwinbox) db c conn metrics = CacheConfig { cacheConn = conn , cacheMin = storeConfCacheMin cfg , cacheChain = chain , cacheMax = storeConfMaxKeys cfg , cacheRetryDelay = storeConfCacheRetryDelay cfg , cacheMetrics = metrics } cacheWriterProcesses :: MonadUnliftIO m => Inbox StoreEvent -> CacheWriter -> m a -> m a cacheWriterProcesses evts cwm action = withAsync events $ \a1 -> link a1 >> action where events = cacheWriterEvents evts cwm cacheWriterEvents :: MonadIO m => Inbox StoreEvent -> CacheWriter -> m () cacheWriterEvents evts cwm = forever $ receive evts >>= \e -> e `cacheWriterDispatch` cwm cacheWriterDispatch :: MonadIO m => StoreEvent -> CacheWriter -> m () cacheWriterDispatch (StoreBestBlock _) = cacheNewBlock cacheWriterDispatch (StoreMempoolNew t) = cacheNewTx t cacheWriterDispatch (StoreMempoolDelete t) = cacheNewTx t cacheWriterDispatch _ = const (return ()) nodeForwarder :: MonadIO m => BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m () nodeForwarder b pub sub = forever $ receive sub >>= atomically . storeDispatch b pub -- | Dispatcher of node events. storeDispatch :: BlockStore -> Publisher StoreEvent -> NodeEvent -> STM () storeDispatch b pub (PeerEvent (PeerConnected p)) = do publishSTM (StorePeerConnected p) pub blockStorePeerConnectSTM p b storeDispatch b pub (PeerEvent (PeerDisconnected p)) = do publishSTM (StorePeerDisconnected p) pub blockStorePeerDisconnectSTM p b storeDispatch b _ (ChainEvent (ChainBestBlock bn)) = blockStoreHeadSTM bn b storeDispatch _ _ (ChainEvent _) = return () storeDispatch _ pub (PeerMessage p (MPong (Pong n))) = publishSTM (StorePeerPong p n) pub storeDispatch b _ (PeerMessage p (MBlock block)) = blockStoreBlockSTM p block b storeDispatch b _ (PeerMessage p (MTx tx)) = blockStoreTxSTM p tx b storeDispatch b _ (PeerMessage p (MNotFound (NotFound is))) = do let blocks = [ BlockHash h | InvVector t h <- is , t == InvBlock || t == InvWitnessBlock ] unless (null blocks) $ blockStoreNotFoundSTM p blocks b storeDispatch b pub (PeerMessage p (MInv (Inv is))) = do let txs = [TxHash h | InvVector t h <- is, t == InvTx || t == InvWitnessTx] publishSTM (StoreTxAnnounce p txs) pub unless (null txs) $ blockStoreTxHashSTM p txs b storeDispatch _ pub (PeerMessage p (MReject r)) = when (rejectMessage r == MCTx) $ case decode (rejectData r) of Left _ -> return () Right th -> let reject = StoreTxReject p th (rejectCode r) (getVarString (rejectReason r)) in publishSTM reject pub storeDispatch _ _ _ = return ()