{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE NoFieldSelectors #-}

module Haskoin.Store.Manager
  ( StoreConfig (..),
    Store (..),
    withStore,
  )
where

import Control.Monad (forever, unless)
import Control.Monad.Cont
  ( ContT (..),
    runContT,
  )
import Control.Monad.Logger (MonadLoggerIO)
import Control.Monad.Reader (ReaderT (ReaderT), runReaderT)
import Control.Monad.Trans (lift)
import Data.Serialize (decode)
import Data.Time.Clock (NominalDiffTime)
import Data.Word (Word32)
import Haskoin
  ( BlockHash (..),
    Ctx,
    Inv (..),
    InvType (..),
    InvVector (..),
    Message (..),
    MessageCommand (..),
    Network,
    NetworkAddress (..),
    NotFound (..),
    Pong (..),
    Reject (..),
    TxHash (..),
    VarString (..),
    sockToHostAddress,
  )
import Haskoin.Node
  ( Chain,
    ChainEvent (..),
    Node (..),
    NodeConfig (..),
    NodeEvent (..),
    PeerEvent (..),
    PeerMgr,
    WithConnection,
    withNode,
  )
import Haskoin.Store.BlockStore
  ( BlockStore,
    BlockStoreConfig (..),
    blockStoreBlockSTM,
    blockStoreHeadSTM,
    blockStoreNotFoundSTM,
    blockStorePeerConnectSTM,
    blockStorePeerDisconnectSTM,
    blockStoreTxHashSTM,
    blockStoreTxSTM,
    withBlockStore,
  )
import Haskoin.Store.Cache
  ( CacheConfig (..),
    CacheWriter,
    cacheNewBlock,
    cacheNewTx,
    cacheSyncMempool,
    cacheWriter,
    connectRedis,
    newCacheMetrics,
  )
import Haskoin.Store.Common
  ( StoreEvent (..),
  )
import Haskoin.Store.Database.Reader
  ( DatabaseReader (..),
    DatabaseReaderT,
    createDataMetrics,
    withDatabaseReader,
  )
import NQE
  ( Inbox,
    Process (..),
    Publisher,
    publishSTM,
    receive,
    withProcess,
    withPublisher,
    withSubscription,
  )
import Network.Socket (SockAddr (..))
import System.Metrics.StatsD
import UnliftIO
  ( MonadIO,
    MonadUnliftIO,
    STM,
    atomically,
    link,
    withAsync,
  )
import UnliftIO.Concurrent (threadDelay)

-- | Store mailboxes.
data Store = Store
  { Store -> PeerMgr
peerMgr :: !PeerMgr,
    Store -> Chain
chain :: !Chain,
    Store -> BlockStore
block :: !BlockStore,
    Store -> DatabaseReader
db :: !DatabaseReader,
    Store -> Maybe CacheConfig
cache :: !(Maybe CacheConfig),
    Store -> Publisher StoreEvent
pub :: !(Publisher StoreEvent),
    Store -> Network
net :: !Network,
    Store -> Ctx
ctx :: !Ctx
  }

-- | Configuration for a 'Store'.
data StoreConfig = StoreConfig
  { -- | max peers to connect to
    StoreConfig -> Int
maxPeers :: !Int,
    -- | static set of peers to connect to
    StoreConfig -> [String]
initPeers :: ![String],
    -- | discover new peers
    StoreConfig -> Bool
discover :: !Bool,
    -- | RocksDB database path
    StoreConfig -> String
db :: !FilePath,
    -- | network constants
    StoreConfig -> Network
net :: !Network,
    -- | Redis cache configuration
    StoreConfig -> Maybe String
redis :: !(Maybe String),
    -- | Secp256k1 context
    StoreConfig -> Ctx
ctx :: !Ctx,
    -- | gap on extended public key with no transactions
    StoreConfig -> HostAddress
initGap :: !Word32,
    -- | gap for extended public keys
    StoreConfig -> HostAddress
gap :: !Word32,
    -- | cache xpubs with more than this many used addresses
    StoreConfig -> Int
redisMinAddrs :: !Int,
    -- | maximum number of keys in Redis cache
    StoreConfig -> Integer
redisMaxKeys :: !Integer,
    -- | do not index new mempool transactions
    StoreConfig -> Bool
noMempool :: !Bool,
    -- | wipe mempool when starting
    StoreConfig -> Bool
wipeMempool :: !Bool,
    -- | sync mempool from peers
    StoreConfig -> Bool
syncMempool :: !Bool,
    -- | disconnect peer if message not received for this many seconds
    StoreConfig -> NominalDiffTime
peerTimeout :: !NominalDiffTime,
    -- | disconnect peer if it has been connected this long
    StoreConfig -> NominalDiffTime
maxPeerLife :: !NominalDiffTime,
    -- | connect to peers using the function 'withConnection'
    StoreConfig -> SockAddr -> WithConnection
connect :: !(SockAddr -> WithConnection),
    -- | StatsD
    StoreConfig -> Maybe Stats
stats :: !(Maybe Stats),
    -- | sync mempool against cache every this many seconds
    StoreConfig -> Int
redisSyncInterval :: !Int
  }

withStore ::
  (MonadLoggerIO m, MonadUnliftIO m) =>
  StoreConfig ->
  (Store -> m a) ->
  m a
withStore :: forall (m :: * -> *) a.
(MonadLoggerIO m, MonadUnliftIO m) =>
StoreConfig -> (Store -> m a) -> m a
withStore StoreConfig
cfg Store -> m a
action =
  StoreConfig -> DatabaseReaderT m a -> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
StoreConfig -> DatabaseReaderT m a -> m a
connectDB StoreConfig
cfg (DatabaseReaderT m a -> m a) -> DatabaseReaderT m a -> m a
forall a b. (a -> b) -> a -> b
$
    (DatabaseReader -> m a) -> DatabaseReaderT m a
forall r (m :: * -> *) a. (r -> m a) -> ReaderT r m a
ReaderT ((DatabaseReader -> m a) -> DatabaseReaderT m a)
-> (DatabaseReader -> m a) -> DatabaseReaderT m a
forall a b. (a -> b) -> a -> b
$ \DatabaseReader
db -> (ContT a m a -> (a -> m a) -> m a)
-> (a -> m a) -> ContT a m a -> m a
forall a b c. (a -> b -> c) -> b -> a -> c
flip ContT a m a -> (a -> m a) -> m a
forall {k} (r :: k) (m :: k -> *) a.
ContT r m a -> (a -> m r) -> m r
runContT a -> m a
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (ContT a m a -> m a) -> ContT a m a -> m a
forall a b. (a -> b) -> a -> b
$ do
      Publisher StoreEvent
pub <- ((Publisher StoreEvent -> m a) -> m a)
-> ContT a m (Publisher StoreEvent)
forall {k} (r :: k) (m :: k -> *) a.
((a -> m r) -> m r) -> ContT r m a
ContT (Publisher StoreEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher
      Publisher NodeEvent
node_pub <- ((Publisher NodeEvent -> m a) -> m a)
-> ContT a m (Publisher NodeEvent)
forall {k} (r :: k) (m :: k -> *) a.
((a -> m r) -> m r) -> ContT r m a
ContT (Publisher NodeEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher
      Inbox NodeEvent
node_sub <- ((Inbox NodeEvent -> m a) -> m a) -> ContT a m (Inbox NodeEvent)
forall {k} (r :: k) (m :: k -> *) a.
((a -> m r) -> m r) -> ContT r m a
ContT (((Inbox NodeEvent -> m a) -> m a) -> ContT a m (Inbox NodeEvent))
-> ((Inbox NodeEvent -> m a) -> m a) -> ContT a m (Inbox NodeEvent)
forall a b. (a -> b) -> a -> b
$ Publisher NodeEvent -> (Inbox NodeEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher NodeEvent
node_pub
      Node
node <- ((Node -> m a) -> m a) -> ContT a m Node
forall {k} (r :: k) (m :: k -> *) a.
((a -> m r) -> m r) -> ContT r m a
ContT (((Node -> m a) -> m a) -> ContT a m Node)
-> ((Node -> m a) -> m a) -> ContT a m Node
forall a b. (a -> b) -> a -> b
$ NodeConfig -> (Node -> m a) -> m a
forall (m :: * -> *) a.
(MonadLoggerIO m, MonadUnliftIO m) =>
NodeConfig -> (Node -> m a) -> m a
withNode (NodeConfig -> (Node -> m a) -> m a)
-> NodeConfig -> (Node -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ StoreConfig -> DatabaseReader -> Publisher NodeEvent -> NodeConfig
nodeCfg StoreConfig
cfg DatabaseReader
db Publisher NodeEvent
node_pub
      Maybe CacheConfig
cache_cfg <- ((Maybe CacheConfig -> m a) -> m a)
-> ContT a m (Maybe CacheConfig)
forall {k} (r :: k) (m :: k -> *) a.
((a -> m r) -> m r) -> ContT r m a
ContT (((Maybe CacheConfig -> m a) -> m a)
 -> ContT a m (Maybe CacheConfig))
-> ((Maybe CacheConfig -> m a) -> m a)
-> ContT a m (Maybe CacheConfig)
forall a b. (a -> b) -> a -> b
$ StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadLoggerIO m) =>
StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
withCache StoreConfig
cfg Node
node.chain DatabaseReader
db Publisher StoreEvent
pub
      BlockStore
block_store <- ((BlockStore -> m a) -> m a) -> ContT a m BlockStore
forall {k} (r :: k) (m :: k -> *) a.
((a -> m r) -> m r) -> ContT r m a
ContT (((BlockStore -> m a) -> m a) -> ContT a m BlockStore)
-> ((BlockStore -> m a) -> m a) -> ContT a m BlockStore
forall a b. (a -> b) -> a -> b
$ BlockStoreConfig -> (BlockStore -> m a) -> m a
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadLoggerIO m) =>
BlockStoreConfig -> (BlockStore -> m a) -> m a
withBlockStore (BlockStoreConfig -> (BlockStore -> m a) -> m a)
-> BlockStoreConfig -> (BlockStore -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ StoreConfig
-> Node
-> Publisher StoreEvent
-> DatabaseReader
-> BlockStoreConfig
blockStoreCfg StoreConfig
cfg Node
node Publisher StoreEvent
pub DatabaseReader
db
      Async ()
fwd <- ((Async () -> m a) -> m a) -> ContT a m (Async ())
forall {k} (r :: k) (m :: k -> *) a.
((a -> m r) -> m r) -> ContT r m a
ContT (((Async () -> m a) -> m a) -> ContT a m (Async ()))
-> ((Async () -> m a) -> m a) -> ContT a m (Async ())
forall a b. (a -> b) -> a -> b
$ m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync (m () -> (Async () -> m a) -> m a)
-> m () -> (Async () -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
forall (m :: * -> *).
MonadIO m =>
BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
nodeForwarder BlockStore
block_store Publisher StoreEvent
pub Inbox NodeEvent
node_sub
      Async () -> ContT a m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
fwd
      m a -> ContT a m a
forall (m :: * -> *) a. Monad m => m a -> ContT a m a
forall (t :: (* -> *) -> * -> *) (m :: * -> *) a.
(MonadTrans t, Monad m) =>
m a -> t m a
lift (m a -> ContT a m a) -> m a -> ContT a m a
forall a b. (a -> b) -> a -> b
$
        Store -> m a
action
          Store
            { $sel:peerMgr:Store :: PeerMgr
peerMgr = Node
node.peerMgr,
              $sel:chain:Store :: Chain
chain = Node
node.chain,
              $sel:block:Store :: BlockStore
block = BlockStore
block_store,
              $sel:db:Store :: DatabaseReader
db = DatabaseReader
db,
              $sel:cache:Store :: Maybe CacheConfig
cache = Maybe CacheConfig
cache_cfg,
              $sel:pub:Store :: Publisher StoreEvent
pub = Publisher StoreEvent
pub,
              $sel:net:Store :: Network
net = StoreConfig
cfg.net,
              $sel:ctx:Store :: Ctx
ctx = StoreConfig
cfg.ctx
            }

connectDB ::
  (MonadUnliftIO m) =>
  StoreConfig ->
  DatabaseReaderT m a ->
  m a
connectDB :: forall (m :: * -> *) a.
MonadUnliftIO m =>
StoreConfig -> DatabaseReaderT m a -> m a
connectDB StoreConfig
cfg DatabaseReaderT m a
f = do
  Maybe DataMetrics
stats <- (Stats -> m DataMetrics) -> Maybe Stats -> m (Maybe DataMetrics)
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Maybe a -> m (Maybe b)
mapM Stats -> m DataMetrics
forall (m :: * -> *). MonadIO m => Stats -> m DataMetrics
createDataMetrics StoreConfig
cfg.stats
  Network
-> Ctx
-> HostAddress
-> HostAddress
-> String
-> Maybe DataMetrics
-> DatabaseReaderT m a
-> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
Network
-> Ctx
-> HostAddress
-> HostAddress
-> String
-> Maybe DataMetrics
-> DatabaseReaderT m a
-> m a
withDatabaseReader
    StoreConfig
cfg.net
    StoreConfig
cfg.ctx
    StoreConfig
cfg.initGap
    StoreConfig
cfg.gap
    StoreConfig
cfg.db
    Maybe DataMetrics
stats
    DatabaseReaderT m a
f

blockStoreCfg ::
  StoreConfig ->
  Node ->
  Publisher StoreEvent ->
  DatabaseReader ->
  BlockStoreConfig
blockStoreCfg :: StoreConfig
-> Node
-> Publisher StoreEvent
-> DatabaseReader
-> BlockStoreConfig
blockStoreCfg StoreConfig
cfg Node
node Publisher StoreEvent
pub DatabaseReader
db =
  BlockStoreConfig
    { $sel:chain:BlockStoreConfig :: Chain
chain = Node
node.chain,
      $sel:peerMgr:BlockStoreConfig :: PeerMgr
peerMgr = Node
node.peerMgr,
      $sel:pub:BlockStoreConfig :: Publisher StoreEvent
pub = Publisher StoreEvent
pub,
      $sel:db:BlockStoreConfig :: DatabaseReader
db = DatabaseReader
db,
      $sel:net:BlockStoreConfig :: Network
net = StoreConfig
cfg.net,
      $sel:ctx:BlockStoreConfig :: Ctx
ctx = StoreConfig
cfg.ctx,
      $sel:noMempool:BlockStoreConfig :: Bool
noMempool = StoreConfig
cfg.noMempool,
      $sel:wipeMempool:BlockStoreConfig :: Bool
wipeMempool = StoreConfig
cfg.wipeMempool,
      $sel:syncMempool:BlockStoreConfig :: Bool
syncMempool = StoreConfig
cfg.syncMempool,
      $sel:peerTimeout:BlockStoreConfig :: NominalDiffTime
peerTimeout = StoreConfig
cfg.peerTimeout,
      $sel:stats:BlockStoreConfig :: Maybe Stats
stats = StoreConfig
cfg.stats
    }

nodeCfg ::
  StoreConfig ->
  DatabaseReader ->
  Publisher NodeEvent ->
  NodeConfig
nodeCfg :: StoreConfig -> DatabaseReader -> Publisher NodeEvent -> NodeConfig
nodeCfg StoreConfig
cfg DatabaseReader
db Publisher NodeEvent
pub =
  NodeConfig
    { $sel:maxPeers:NodeConfig :: Int
maxPeers = StoreConfig
cfg.maxPeers,
      $sel:db:NodeConfig :: DB
db = DatabaseReader
db.db,
      $sel:cf:NodeConfig :: Maybe ColumnFamily
cf = Maybe ColumnFamily
forall a. Maybe a
Nothing,
      $sel:peers:NodeConfig :: [String]
peers = StoreConfig
cfg.initPeers,
      $sel:discover:NodeConfig :: Bool
discover = StoreConfig
cfg.discover,
      $sel:pub:NodeConfig :: Publisher NodeEvent
pub = Publisher NodeEvent
pub,
      $sel:address:NodeConfig :: NetworkAddress
address =
        Word64 -> HostAddress -> NetworkAddress
NetworkAddress
          Word64
0
          (SockAddr -> HostAddress
sockToHostAddress (PortNumber -> HostAddress -> SockAddr
SockAddrInet PortNumber
0 HostAddress
0)),
      $sel:net:NodeConfig :: Network
net = StoreConfig
cfg.net,
      $sel:timeout:NodeConfig :: NominalDiffTime
timeout = StoreConfig
cfg.peerTimeout,
      $sel:maxPeerLife:NodeConfig :: NominalDiffTime
maxPeerLife = StoreConfig
cfg.maxPeerLife,
      $sel:connect:NodeConfig :: SockAddr -> WithConnection
connect = StoreConfig
cfg.connect
    }

withCache ::
  (MonadUnliftIO m, MonadLoggerIO m) =>
  StoreConfig ->
  Chain ->
  DatabaseReader ->
  Publisher StoreEvent ->
  (Maybe CacheConfig -> m a) ->
  m a
withCache :: forall (m :: * -> *) a.
(MonadUnliftIO m, MonadLoggerIO m) =>
StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
withCache StoreConfig
cfg Chain
chain DatabaseReader
db Publisher StoreEvent
pub Maybe CacheConfig -> m a
action =
  case StoreConfig
cfg.redis of
    Maybe String
Nothing ->
      Maybe CacheConfig -> m a
action Maybe CacheConfig
forall a. Maybe a
Nothing
    Just String
redisurl -> do
      Connection
conn <- String -> m Connection
forall (m :: * -> *). MonadIO m => String -> m Connection
connectRedis String
redisurl
      Maybe CacheMetrics
metrics <- (Stats -> m CacheMetrics) -> Maybe Stats -> m (Maybe CacheMetrics)
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
forall (m :: * -> *) a b.
Monad m =>
(a -> m b) -> Maybe a -> m (Maybe b)
mapM (\Stats
s -> Stats -> Connection -> DatabaseReader -> m CacheMetrics
forall (m :: * -> *).
MonadLoggerIO m =>
Stats -> Connection -> DatabaseReader -> m CacheMetrics
newCacheMetrics Stats
s Connection
conn DatabaseReader
db) StoreConfig
cfg.stats
      Publisher StoreEvent -> (Inbox StoreEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher StoreEvent
pub ((Inbox StoreEvent -> m a) -> m a)
-> (Inbox StoreEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Inbox StoreEvent
evts ->
        let conf :: CacheConfig
conf = Connection -> Maybe CacheMetrics -> CacheConfig
c Connection
conn Maybe CacheMetrics
metrics
         in (Inbox CacheWriterMessage -> m ())
-> (Process CacheWriterMessage -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Inbox msg -> m ()) -> (Process msg -> m a) -> m a
withProcess (CacheConfig -> Inbox CacheWriterMessage -> m ()
forall {m :: * -> *}.
(MonadUnliftIO m, MonadLoggerIO m) =>
CacheConfig -> Inbox CacheWriterMessage -> m ()
f CacheConfig
conf) ((Process CacheWriterMessage -> m a) -> m a)
-> (Process CacheWriterMessage -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Process CacheWriterMessage
p ->
              Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
cacheWriterProcesses
                StoreConfig
cfg.redisSyncInterval
                Inbox StoreEvent
evts
                (Process CacheWriterMessage -> CacheWriter
forall msg. Process msg -> Mailbox msg
getProcessMailbox Process CacheWriterMessage
p)
                (Maybe CacheConfig -> m a
action (CacheConfig -> Maybe CacheConfig
forall a. a -> Maybe a
Just CacheConfig
conf))
  where
    f :: CacheConfig -> Inbox CacheWriterMessage -> m ()
f CacheConfig
conf Inbox CacheWriterMessage
cwinbox = ReaderT DatabaseReader m () -> DatabaseReader -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (CacheConfig
-> Inbox CacheWriterMessage -> ReaderT DatabaseReader m ()
forall (m :: * -> *).
(MonadUnliftIO m, MonadLoggerIO m, StoreReadExtra m) =>
CacheConfig -> Inbox CacheWriterMessage -> m ()
cacheWriter CacheConfig
conf Inbox CacheWriterMessage
cwinbox) DatabaseReader
db
    c :: Connection -> Maybe CacheMetrics -> CacheConfig
c Connection
conn Maybe CacheMetrics
metrics =
      CacheConfig
        { $sel:redis:CacheConfig :: Connection
redis = Connection
conn,
          $sel:minAddrs:CacheConfig :: Int
minAddrs = StoreConfig
cfg.redisMinAddrs,
          $sel:chain:CacheConfig :: Chain
chain = Chain
chain,
          $sel:maxKeys:CacheConfig :: Integer
maxKeys = StoreConfig
cfg.redisMaxKeys,
          $sel:metrics:CacheConfig :: Maybe CacheMetrics
metrics = Maybe CacheMetrics
metrics
        }

cacheWriterProcesses ::
  (MonadUnliftIO m) =>
  Int ->
  Inbox StoreEvent ->
  CacheWriter ->
  m a ->
  m a
cacheWriterProcesses :: forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
cacheWriterProcesses Int
interval Inbox StoreEvent
evts CacheWriter
cwm m a
action =
  m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync (Int -> Inbox StoreEvent -> CacheWriter -> m ()
forall (m :: * -> *).
MonadUnliftIO m =>
Int -> Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents Int
interval Inbox StoreEvent
evts CacheWriter
cwm) ((Async () -> m a) -> m a) -> (Async () -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async ()
a1 -> Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
a1 m () -> m a -> m a
forall a b. m a -> m b -> m b
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m a
action

cacheWriterEvents :: (MonadUnliftIO m) => Int -> Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents :: forall (m :: * -> *).
MonadUnliftIO m =>
Int -> Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents Int
interval Inbox StoreEvent
evts CacheWriter
cwm =
  m Any -> (Async Any -> m ()) -> m ()
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync m Any
forall {b}. m b
mempool ((Async Any -> m ()) -> m ())
-> (m () -> Async Any -> m ()) -> m () -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. m () -> Async Any -> m ()
forall a b. a -> b -> a
const (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
    m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
      Inbox StoreEvent -> m StoreEvent
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox StoreEvent
evts m StoreEvent -> (StoreEvent -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \StoreEvent
e ->
        StoreEvent
e StoreEvent -> CacheWriter -> m ()
forall (m :: * -> *).
MonadIO m =>
StoreEvent -> CacheWriter -> m ()
`cacheWriterDispatch` CacheWriter
cwm
  where
    mempool :: m b
mempool = m () -> m b
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m b) -> m () -> m b
forall a b. (a -> b) -> a -> b
$ do
      Int -> m ()
forall (m :: * -> *). MonadIO m => Int -> m ()
threadDelay (Int
interval Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000 Int -> Int -> Int
forall a. Num a => a -> a -> a
* Int
1000)
      CacheWriter -> m ()
forall (m :: * -> *). MonadIO m => CacheWriter -> m ()
cacheSyncMempool CacheWriter
cwm

cacheWriterDispatch :: (MonadIO m) => StoreEvent -> CacheWriter -> m ()
cacheWriterDispatch :: forall (m :: * -> *).
MonadIO m =>
StoreEvent -> CacheWriter -> m ()
cacheWriterDispatch (StoreBestBlock BlockHash
_) = CacheWriter -> m ()
forall (m :: * -> *). MonadIO m => CacheWriter -> m ()
cacheNewBlock
cacheWriterDispatch (StoreMempoolNew TxHash
t) = TxHash -> CacheWriter -> m ()
forall (m :: * -> *). MonadIO m => TxHash -> CacheWriter -> m ()
cacheNewTx TxHash
t
cacheWriterDispatch (StoreMempoolDelete TxHash
t) = TxHash -> CacheWriter -> m ()
forall (m :: * -> *). MonadIO m => TxHash -> CacheWriter -> m ()
cacheNewTx TxHash
t
cacheWriterDispatch StoreEvent
_ = m () -> CacheWriter -> m ()
forall a b. a -> b -> a
const (() -> m ()
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return ())

nodeForwarder ::
  (MonadIO m) =>
  BlockStore ->
  Publisher StoreEvent ->
  Inbox NodeEvent ->
  m ()
nodeForwarder :: forall (m :: * -> *).
MonadIO m =>
BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
nodeForwarder BlockStore
b Publisher StoreEvent
pub Inbox NodeEvent
sub =
  m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Inbox NodeEvent -> m NodeEvent
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox NodeEvent
sub m NodeEvent -> (NodeEvent -> m ()) -> m ()
forall a b. m a -> (a -> m b) -> m b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> (NodeEvent -> STM ()) -> NodeEvent -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BlockStore -> Publisher StoreEvent -> NodeEvent -> STM ()
storeDispatch BlockStore
b Publisher StoreEvent
pub

-- | Dispatcher of node events.
storeDispatch ::
  BlockStore ->
  Publisher StoreEvent ->
  NodeEvent ->
  STM ()
storeDispatch :: BlockStore -> Publisher StoreEvent -> NodeEvent -> STM ()
storeDispatch BlockStore
b Publisher StoreEvent
pub (PeerEvent PeerEvent
pe) =
  case PeerEvent
pe of
    PeerConnected Peer
p -> do
      StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> StoreEvent
StorePeerConnected Peer
p) Publisher StoreEvent
pub
      Peer -> BlockStore -> STM ()
blockStorePeerConnectSTM Peer
p BlockStore
b
    PeerDisconnected Peer
p -> do
      StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> StoreEvent
StorePeerDisconnected Peer
p) Publisher StoreEvent
pub
      Peer -> BlockStore -> STM ()
blockStorePeerDisconnectSTM Peer
p BlockStore
b
    PeerMessage Peer
p Message
msg ->
      case Message
msg of
        MPong (Pong Word64
n) ->
          StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> Word64 -> StoreEvent
StorePeerPong Peer
p Word64
n) Publisher StoreEvent
pub
        MBlock Block
block ->
          Peer -> Block -> BlockStore -> STM ()
blockStoreBlockSTM Peer
p Block
block BlockStore
b
        MTx Tx
tx ->
          Peer -> Tx -> BlockStore -> STM ()
blockStoreTxSTM Peer
p Tx
tx BlockStore
b
        MNotFound (NotFound [InvVector]
is) -> do
          let blocks :: [BlockHash]
blocks =
                [ Hash256 -> BlockHash
BlockHash Hash256
h
                  | InvVector InvType
t Hash256
h <- [InvVector]
is,
                    InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvBlock Bool -> Bool -> Bool
|| InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvWitnessBlock
                ]
          Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([BlockHash] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [BlockHash]
blocks) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ Peer -> [BlockHash] -> BlockStore -> STM ()
blockStoreNotFoundSTM Peer
p [BlockHash]
blocks BlockStore
b
        MInv (Inv [InvVector]
is) -> do
          let txs :: [TxHash]
txs =
                [ Hash256 -> TxHash
TxHash Hash256
h
                  | InvVector InvType
t Hash256
h <- [InvVector]
is,
                    InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvTx Bool -> Bool -> Bool
|| InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvWitnessTx
                ]
          StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> [TxHash] -> StoreEvent
StoreTxAnnounce Peer
p [TxHash]
txs) Publisher StoreEvent
pub
          Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([TxHash] -> Bool
forall a. [a] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [TxHash]
txs) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ Peer -> [TxHash] -> BlockStore -> STM ()
blockStoreTxHashSTM Peer
p [TxHash]
txs BlockStore
b
        MReject Reject {$sel:message:Reject :: Reject -> MessageCommand
message = MessageCommand
MCTx, ByteString
extra :: ByteString
$sel:extra:Reject :: Reject -> ByteString
extra, RejectCode
code :: RejectCode
$sel:code:Reject :: Reject -> RejectCode
code, VarString
reason :: VarString
$sel:reason:Reject :: Reject -> VarString
reason} ->
          case ByteString -> Either String TxHash
forall a. Serialize a => ByteString -> Either String a
decode ByteString
extra of
            Left String
_ -> () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
            Right TxHash
th ->
              let reject :: StoreEvent
reject =
                    Peer -> TxHash -> RejectCode -> ByteString -> StoreEvent
StoreTxReject
                      Peer
p
                      TxHash
th
                      RejectCode
code
                      VarString
reason.get
               in StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM StoreEvent
reject Publisher StoreEvent
pub
        Message
_ -> () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()
storeDispatch BlockStore
b Publisher StoreEvent
_ (ChainEvent (ChainBestBlock BlockNode
bn)) =
  BlockNode -> BlockStore -> STM ()
blockStoreHeadSTM BlockNode
bn BlockStore
b
storeDispatch BlockStore
_ Publisher StoreEvent
_ (ChainEvent ChainEvent
_) = () -> STM ()
forall a. a -> STM a
forall (m :: * -> *) a. Monad m => a -> m a
return ()