{-# LANGUAGE FlexibleContexts #-}

module Haskoin.Store.Manager
  ( StoreConfig (..),
    Store (..),
    withStore,
  )
where

import Control.Monad (forever, unless, when)
import Control.Monad.Logger (MonadLoggerIO)
import Control.Monad.Reader (ReaderT (ReaderT), runReaderT)
import Data.Serialize (decode)
import Data.Time.Clock (NominalDiffTime)
import Data.Word (Word32)
import Haskoin
  ( BlockHash (..),
    Inv (..),
    InvType (..),
    InvVector (..),
    Message (..),
    MessageCommand (..),
    Network,
    NetworkAddress (..),
    NotFound (..),
    Pong (..),
    Reject (..),
    TxHash (..),
    VarString (..),
    sockToHostAddress,
  )
import Haskoin.Node
  ( Chain,
    ChainEvent (..),
    Node (..),
    NodeConfig (..),
    NodeEvent (..),
    PeerEvent (..),
    PeerManager,
    WithConnection,
    withNode,
  )
import Haskoin.Store.BlockStore
  ( BlockStore,
    BlockStoreConfig (..),
    blockStoreBlockSTM,
    blockStoreHeadSTM,
    blockStoreNotFoundSTM,
    blockStorePeerConnectSTM,
    blockStorePeerDisconnectSTM,
    blockStoreTxHashSTM,
    blockStoreTxSTM,
    withBlockStore,
  )
import Haskoin.Store.Cache
  ( CacheConfig (..),
    CacheWriter,
    cacheNewBlock,
    cacheNewTx,
    cacheWriter,
    connectRedis,
    newCacheMetrics,
  )
import Haskoin.Store.Common
  ( StoreEvent (..),
    createDataMetrics,
  )
import Haskoin.Store.Database.Reader
  ( DatabaseReader (..),
    DatabaseReaderT,
    withDatabaseReader,
  )
import NQE
  ( Inbox,
    Process (..),
    Publisher,
    publishSTM,
    receive,
    withProcess,
    withPublisher,
    withSubscription,
  )
import Network.Socket (SockAddr (..))
import qualified System.Metrics as Metrics (Store)
import UnliftIO
  ( MonadIO,
    MonadUnliftIO,
    STM,
    atomically,
    link,
    withAsync,
  )
import UnliftIO.Concurrent (threadDelay)

-- | Store mailboxes: handles to the components that 'withStore' starts and
-- passes to the caller's action.
data Store = Store
  { -- | peer manager taken from the running node
    storeManager :: !PeerManager,
    -- | chain handle taken from the running node
    storeChain :: !Chain,
    -- | block store handle
    storeBlock :: !BlockStore,
    -- | database reader handle
    storeDB :: !DatabaseReader,
    -- | cache configuration; 'Nothing' when no Redis cache is configured
    storeCache :: !(Maybe CacheConfig),
    -- | publisher of store events
    storePublisher :: !(Publisher StoreEvent),
    -- | network constants
    storeNetwork :: !Network
  }

-- | Configuration for a 'Store'.
data StoreConfig = StoreConfig
  { -- | max peers to connect to
    storeConfMaxPeers :: !Int,
    -- | static set of peers to connect to
    storeConfInitPeers :: ![String],
    -- | discover new peers
    storeConfDiscover :: !Bool,
    -- | RocksDB database path
    storeConfDB :: !FilePath,
    -- | network constants
    storeConfNetwork :: !Network,
    -- | Redis cache URL; 'Nothing' runs the store without a cache
    storeConfCache :: !(Maybe String),
    -- | gap on extended public key with no transactions
    storeConfInitialGap :: !Word32,
    -- | gap for extended public keys
    storeConfGap :: !Word32,
    -- | cache xpubs with more than this many used addresses
    storeConfCacheMin :: !Int,
    -- | maximum number of keys in Redis cache
    storeConfMaxKeys :: !Integer,
    -- | do not index new mempool transactions
    storeConfNoMempool :: !Bool,
    -- | wipe mempool when starting
    storeConfWipeMempool :: !Bool,
    -- | sync mempool from peers
    storeConfSyncMempool :: !Bool,
    -- | disconnect peer if message not received for this many seconds
    storeConfPeerTimeout :: !NominalDiffTime,
    -- | disconnect peer if it has been connected this long
    storeConfPeerMaxLife :: !NominalDiffTime,
    -- | connect to peers using the function 'withConnection'
    storeConfConnect :: !(SockAddr -> WithConnection),
    -- | stats store
    storeConfStats :: !(Maybe Metrics.Store)
  }

-- | Run an action with a fully started 'Store'.
--
-- Resources are acquired in nested bracket style, in this order: database
-- reader, store-event publisher, node-event publisher, a subscription to
-- node events, the node itself, the optional cache, the block store, and
-- finally an asynchronous 'nodeForwarder' that relays node events to the
-- block store and the store publisher. The forwarder is 'link'ed to the
-- calling thread before the action runs.
withStore ::
  (MonadLoggerIO m, MonadUnliftIO m) =>
  StoreConfig ->
  (Store -> m a) ->
  m a
withStore cfg action =
  connectDB cfg $
    ReaderT $ \db ->
      withPublisher $ \pub ->
        withPublisher $ \node_pub ->
          -- The subscription is created before the node is started.
          withSubscription node_pub $ \node_sub ->
            withNode (nodeCfg cfg db node_pub) $ \node ->
              withCache cfg (nodeChain node) db pub $ \mcache ->
                withBlockStore (blockStoreCfg cfg node pub db) $ \b ->
                  withAsync (nodeForwarder b pub node_sub) $ \a1 ->
                    link a1
                      >> action
                        Store
                          { storeManager = nodeManager node,
                            storeChain = nodeChain node,
                            storeBlock = b,
                            storeDB = db,
                            storeCache = mcache,
                            storePublisher = pub,
                            storeNetwork = storeConfNetwork cfg
                          }

-- | Open the database described by the configuration and run a
-- 'DatabaseReaderT' action against it. Data metrics are created first, but
-- only when a stats store is configured.
connectDB :: MonadUnliftIO m => StoreConfig -> DatabaseReaderT m a -> m a
connectDB cfg f = do
  stats <- mapM createDataMetrics (storeConfStats cfg)
  withDatabaseReader
    (storeConfNetwork cfg)
    (storeConfInitialGap cfg)
    (storeConfGap cfg)
    (storeConfDB cfg)
    stats
    f

-- | Build the block store configuration from the store configuration, the
-- running node, the store-event publisher and the database reader.
blockStoreCfg ::
  StoreConfig ->
  Node ->
  Publisher StoreEvent ->
  DatabaseReader ->
  BlockStoreConfig
blockStoreCfg cfg node pub db =
  BlockStoreConfig
    { blockConfChain = nodeChain node,
      blockConfManager = nodeManager node,
      blockConfListener = pub,
      blockConfDB = db,
      blockConfNet = storeConfNetwork cfg,
      blockConfNoMempool = storeConfNoMempool cfg,
      blockConfWipeMempool = storeConfWipeMempool cfg,
      blockConfSyncMempool = storeConfSyncMempool cfg,
      blockConfPeerTimeout = storeConfPeerTimeout cfg,
      blockConfStats = storeConfStats cfg
    }

-- | Build the node configuration from the store configuration, the database
-- reader (whose database handle is shared with the node) and the node-event
-- publisher.
nodeCfg ::
  StoreConfig ->
  DatabaseReader ->
  Publisher NodeEvent ->
  NodeConfig
nodeCfg cfg db pub =
  NodeConfig
    { nodeConfMaxPeers = storeConfMaxPeers cfg,
      nodeConfDB = databaseHandle db,
      nodeConfColumnFamily = Nothing,
      nodeConfPeers = storeConfInitPeers cfg,
      nodeConfDiscover = storeConfDiscover cfg,
      nodeConfEvents = pub,
      -- Advertised address: an all-zero IPv4 address with port 0 and a
      -- zero first field (presumably the services bitmask -- confirm
      -- against 'NetworkAddress').
      nodeConfNetAddr =
        NetworkAddress
          0
          (sockToHostAddress (SockAddrInet 0 0)),
      nodeConfNet = storeConfNetwork cfg,
      nodeConfTimeout = storeConfPeerTimeout cfg,
      nodeConfPeerMaxLife = storeConfPeerMaxLife cfg,
      nodeConfConnect = storeConfConnect cfg
    }

-- | Run an action with the optional cache.
--
-- When no Redis URL is configured the action receives 'Nothing' and nothing
-- else is started. Otherwise cache metrics are created (if a stats store is
-- configured), a Redis connection is opened, the store-event publisher is
-- subscribed to, a cache writer process is started against the database
-- reader, and store events are forwarded to that process while the action
-- runs with 'Just' the cache configuration.
withCache ::
  (MonadUnliftIO m, MonadLoggerIO m) =>
  StoreConfig ->
  Chain ->
  DatabaseReader ->
  Publisher StoreEvent ->
  (Maybe CacheConfig -> m a) ->
  m a
withCache cfg chain db pub action =
  case storeConfCache cfg of
    Nothing ->
      action Nothing
    Just redisurl ->
      mapM newCacheMetrics (storeConfStats cfg) >>= \metrics ->
        connectRedis redisurl >>= \conn ->
          withSubscription pub $ \evts ->
            let conf = c conn metrics
             in withProcess (f conf) $ \p ->
                  cacheWriterProcesses evts (getProcessMailbox p) $ do
                    action (Just conf)
  where
    -- Cache writer loop, run in a reader over the database handle.
    f conf cwinbox = runReaderT (cacheWriter conf cwinbox) db
    -- Cache configuration for a given connection and optional metrics.
    c conn metrics =
      CacheConfig
        { cacheConn = conn,
          cacheMin = storeConfCacheMin cfg,
          cacheChain = chain,
          cacheMax = storeConfMaxKeys cfg,
          cacheMetrics = metrics
        }

-- | Run an action while a background thread forwards store events from the
-- inbox to the cache writer. The thread is 'link'ed to the caller and lives
-- only as long as the enclosing 'withAsync' scope.
cacheWriterProcesses ::
  MonadUnliftIO m =>
  Inbox StoreEvent ->
  CacheWriter ->
  m a ->
  m a
cacheWriterProcesses evts cwm action =
  withAsync events $ \a1 -> link a1 >> action
  where
    events = cacheWriterEvents evts cwm

-- | Loop forever: receive the next store event from the inbox and hand it
-- to 'cacheWriterDispatch'.
cacheWriterEvents :: MonadIO m => Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents evts cwm =
  forever $
    receive evts >>= \e ->
      e `cacheWriterDispatch` cwm

-- | Translate a store event into a cache writer notification.
--
-- A new best block triggers 'cacheNewBlock'. Both mempool additions and
-- mempool deletions trigger 'cacheNewTx' for the affected transaction hash.
-- Every other event is ignored.
cacheWriterDispatch :: MonadIO m => StoreEvent -> CacheWriter -> m ()
cacheWriterDispatch (StoreBestBlock _) = cacheNewBlock
cacheWriterDispatch (StoreMempoolNew t) = cacheNewTx t
cacheWriterDispatch (StoreMempoolDelete t) = cacheNewTx t
cacheWriterDispatch _ = const (return ())

-- | Loop forever: receive the next node event from the inbox and process it
-- with 'storeDispatch' in a single STM transaction.
nodeForwarder ::
  MonadIO m =>
  BlockStore ->
  Publisher StoreEvent ->
  Inbox NodeEvent ->
  m ()
nodeForwarder b pub sub =
  forever $ receive sub >>= atomically . storeDispatch b pub

-- | Dispatcher of node events.
--
-- Each node event is turned into store events published on the given
-- publisher, notifications to the block store, or both:
--
-- * peer connected / disconnected: published, then sent to the block store
-- * chain best block: sent to the block store; other chain events ignored
-- * pong: published
-- * block and transaction messages: sent to the block store
-- * not-found: block hashes are extracted and sent to the block store,
--   unless there are none
-- * inv: transaction hashes are extracted and always published (even when
--   the list is empty), then sent to the block store unless there are none
-- * reject: published only for rejected transactions whose payload decodes
--   as a transaction hash
-- * anything else: ignored
storeDispatch ::
  BlockStore ->
  Publisher StoreEvent ->
  NodeEvent ->
  STM ()
storeDispatch b pub (PeerEvent (PeerConnected p)) = do
  publishSTM (StorePeerConnected p) pub
  blockStorePeerConnectSTM p b
storeDispatch b pub (PeerEvent (PeerDisconnected p)) = do
  publishSTM (StorePeerDisconnected p) pub
  blockStorePeerDisconnectSTM p b
storeDispatch b _ (ChainEvent (ChainBestBlock bn)) =
  blockStoreHeadSTM bn b
storeDispatch _ _ (ChainEvent _) =
  return ()
storeDispatch _ pub (PeerMessage p (MPong (Pong n))) =
  publishSTM (StorePeerPong p n) pub
storeDispatch b _ (PeerMessage p (MBlock block)) =
  blockStoreBlockSTM p block b
storeDispatch b _ (PeerMessage p (MTx tx)) =
  blockStoreTxSTM p tx b
storeDispatch b _ (PeerMessage p (MNotFound (NotFound is))) = do
  let blocks =
        [ BlockHash h
          | InvVector t h <- is,
            t == InvBlock || t == InvWitnessBlock
        ]
  unless (null blocks) $ blockStoreNotFoundSTM p blocks b
storeDispatch b pub (PeerMessage p (MInv (Inv is))) = do
  let txs = [TxHash h | InvVector t h <- is, t == InvTx || t == InvWitnessTx]
  publishSTM (StoreTxAnnounce p txs) pub
  unless (null txs) $ blockStoreTxHashSTM p txs b
storeDispatch _ pub (PeerMessage p (MReject r)) =
  when (rejectMessage r == MCTx) $
    -- A payload that fails to decode is dropped silently.
    case decode (rejectData r) of
      Left _ -> return ()
      Right th ->
        let reject =
              StoreTxReject
                p
                th
                (rejectCode r)
                (getVarString (rejectReason r))
         in publishSTM reject pub
storeDispatch _ _ _ =
  return ()