{-# LANGUAGE FlexibleContexts #-}
module Haskoin.Store.Manager
    ( StoreConfig(..)
    , Store(..)
    , withStore
    ) where

import           Control.Monad                 (forever, unless, when)
import           Control.Monad.Logger          (MonadLoggerIO)
import           Control.Monad.Reader          (ReaderT (ReaderT), runReaderT)
import           Data.Serialize                (decode)
import           Data.Time.Clock               (NominalDiffTime)
import           Data.Word                     (Word32)
import           Haskoin                       (BlockHash (..), Inv (..),
                                                InvType (..), InvVector (..),
                                                Message (..),
                                                MessageCommand (..), Network,
                                                NetworkAddress (..),
                                                NotFound (..), Pong (..),
                                                Reject (..), TxHash (..),
                                                VarString (..),
                                                sockToHostAddress)
import           Haskoin.Node                  (Chain, ChainEvent (..),
                                                HostPort, Node (..),
                                                NodeConfig (..), NodeEvent (..),
                                                PeerEvent (..), PeerManager,
                                                WithConnection, withNode)
import           Haskoin.Store.BlockStore      (BlockStore,
                                                BlockStoreConfig (..),
                                                blockStoreBlockSTM,
                                                blockStoreHeadSTM,
                                                blockStoreNotFoundSTM,
                                                blockStorePeerConnectSTM,
                                                blockStorePeerDisconnectSTM,
                                                blockStoreTxHashSTM,
                                                blockStoreTxSTM, withBlockStore)
import           Haskoin.Store.Cache           (CacheConfig (..), CacheWriter,
                                                cacheNewBlock, cacheWriter,
                                                connectRedis, newCacheMetrics)
import           Haskoin.Store.Common          (StoreEvent (..))
import           Haskoin.Store.Database.Reader (DatabaseReader (..),
                                                DatabaseReaderT,
                                                createDatabaseStats,
                                                withDatabaseReader)
import           NQE                           (Inbox, Process (..), Publisher,
                                                publishSTM, receive,
                                                withProcess, withPublisher,
                                                withSubscription)
import           Network.Socket                (SockAddr (..))
import qualified System.Metrics                as Metrics (Store)
import           UnliftIO                      (MonadIO, MonadUnliftIO, STM,
                                                atomically, link, withAsync)
import           UnliftIO.Concurrent           (threadDelay)

-- | Store mailboxes.
data Store =
    Store
        { Store -> PeerManager
storeManager   :: !PeerManager
        , Store -> Chain
storeChain     :: !Chain
        , Store -> BlockStore
storeBlock     :: !BlockStore
        , Store -> DatabaseReader
storeDB        :: !DatabaseReader
        , Store -> Maybe CacheConfig
storeCache     :: !(Maybe CacheConfig)
        , Store -> Publisher StoreEvent
storePublisher :: !(Publisher StoreEvent)
        , Store -> Network
storeNetwork   :: !Network
        }

-- | Configuration for a 'Store'.
data StoreConfig =
    StoreConfig
        { StoreConfig -> Int
storeConfMaxPeers        :: !Int
      -- ^ max peers to connect to
        , StoreConfig -> [HostPort]
storeConfInitPeers       :: ![HostPort]
      -- ^ static set of peers to connect to
        , StoreConfig -> Bool
storeConfDiscover        :: !Bool
      -- ^ discover new peers
        , StoreConfig -> FilePath
storeConfDB              :: !FilePath
      -- ^ RocksDB database path
        , StoreConfig -> Network
storeConfNetwork         :: !Network
      -- ^ network constants
        , StoreConfig -> Maybe FilePath
storeConfCache           :: !(Maybe String)
      -- ^ Redis cache configuration
        , StoreConfig -> Word32
storeConfInitialGap      :: !Word32
      -- ^ gap on extended public key with no transactions
        , StoreConfig -> Word32
storeConfGap             :: !Word32
      -- ^ gap for extended public keys
        , StoreConfig -> Int
storeConfCacheMin        :: !Int
      -- ^ cache xpubs with more than this many used addresses
        , StoreConfig -> Integer
storeConfMaxKeys         :: !Integer
      -- ^ maximum number of keys in Redis cache
        , StoreConfig -> Bool
storeConfNoMempool       :: !Bool
      -- ^ do not index new mempool transactions
        , StoreConfig -> Bool
storeConfWipeMempool     :: !Bool
      -- ^ wipe mempool when starting
        , StoreConfig -> Bool
storeConfSyncMempool     :: !Bool
      -- ^ sync mempool from peers
        , StoreConfig -> NominalDiffTime
storeConfPeerTimeout     :: !NominalDiffTime
      -- ^ disconnect peer if message not received for this many seconds
        , StoreConfig -> NominalDiffTime
storeConfPeerMaxLife     :: !NominalDiffTime
      -- ^ disconnect peer if it has been connected this long
        , StoreConfig -> SockAddr -> WithConnection
storeConfConnect         :: !(SockAddr -> WithConnection)
      -- ^ connect to peers using the function 'withConnection'
        , StoreConfig -> Int
storeConfCacheRefresh    :: !Int
      -- ^ refresh the cache this often (milliseconds)
        , StoreConfig -> Int
storeConfCacheRetryDelay :: !Int
      -- ^ delay in microseconds to retry getting cache lock
        , StoreConfig -> Maybe Store
storeConfStats           :: !(Maybe Metrics.Store)
      -- ^ stats store
        }

withStore :: (MonadLoggerIO m, MonadUnliftIO m)
          => StoreConfig -> (Store -> m a) -> m a
withStore :: StoreConfig -> (Store -> m a) -> m a
withStore StoreConfig
cfg Store -> m a
action =
    StoreConfig -> DatabaseReaderT m a -> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
StoreConfig -> DatabaseReaderT m a -> m a
connectDB StoreConfig
cfg (DatabaseReaderT m a -> m a) -> DatabaseReaderT m a -> m a
forall a b. (a -> b) -> a -> b
$ (DatabaseReader -> m a) -> DatabaseReaderT m a
forall r (m :: * -> *) a. (r -> m a) -> ReaderT r m a
ReaderT ((DatabaseReader -> m a) -> DatabaseReaderT m a)
-> (DatabaseReader -> m a) -> DatabaseReaderT m a
forall a b. (a -> b) -> a -> b
$ \DatabaseReader
db ->
    (Publisher StoreEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher ((Publisher StoreEvent -> m a) -> m a)
-> (Publisher StoreEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Publisher StoreEvent
pub ->
    (Publisher NodeEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher ((Publisher NodeEvent -> m a) -> m a)
-> (Publisher NodeEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Publisher NodeEvent
node_pub ->
    Publisher NodeEvent -> (Inbox NodeEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher NodeEvent
node_pub ((Inbox NodeEvent -> m a) -> m a)
-> (Inbox NodeEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Inbox NodeEvent
node_sub ->
    NodeConfig -> (Node -> m a) -> m a
forall (m :: * -> *) a.
(MonadLoggerIO m, MonadUnliftIO m) =>
NodeConfig -> (Node -> m a) -> m a
withNode (StoreConfig -> DatabaseReader -> Publisher NodeEvent -> NodeConfig
nodeCfg StoreConfig
cfg DatabaseReader
db Publisher NodeEvent
node_pub) ((Node -> m a) -> m a) -> (Node -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Node
node ->
    StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadLoggerIO m) =>
StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
withCache StoreConfig
cfg (Node -> Chain
nodeChain Node
node) DatabaseReader
db Publisher StoreEvent
pub ((Maybe CacheConfig -> m a) -> m a)
-> (Maybe CacheConfig -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Maybe CacheConfig
mcache ->
    BlockStoreConfig -> (BlockStore -> m a) -> m a
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadLoggerIO m) =>
BlockStoreConfig -> (BlockStore -> m a) -> m a
withBlockStore (StoreConfig
-> Node
-> Publisher StoreEvent
-> DatabaseReader
-> BlockStoreConfig
blockStoreCfg StoreConfig
cfg Node
node Publisher StoreEvent
pub DatabaseReader
db) ((BlockStore -> m a) -> m a) -> (BlockStore -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \BlockStore
b ->
    m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync (BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
forall (m :: * -> *).
MonadIO m =>
BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
nodeForwarder BlockStore
b Publisher StoreEvent
pub Inbox NodeEvent
node_sub) ((Async () -> m a) -> m a) -> (Async () -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async ()
a1 -> Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
a1 m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
    Store -> m a
action Store :: PeerManager
-> Chain
-> BlockStore
-> DatabaseReader
-> Maybe CacheConfig
-> Publisher StoreEvent
-> Network
-> Store
Store { storeManager :: PeerManager
storeManager = Node -> PeerManager
nodeManager Node
node
                 , storeChain :: Chain
storeChain = Node -> Chain
nodeChain Node
node
                 , storeBlock :: BlockStore
storeBlock = BlockStore
b
                 , storeDB :: DatabaseReader
storeDB = DatabaseReader
db
                 , storeCache :: Maybe CacheConfig
storeCache = Maybe CacheConfig
mcache
                 , storePublisher :: Publisher StoreEvent
storePublisher = Publisher StoreEvent
pub
                 , storeNetwork :: Network
storeNetwork = StoreConfig -> Network
storeConfNetwork StoreConfig
cfg
                 }

connectDB :: MonadUnliftIO m => StoreConfig -> DatabaseReaderT m a -> m a
connectDB :: StoreConfig -> DatabaseReaderT m a -> m a
connectDB StoreConfig
cfg DatabaseReaderT m a
f = do
    Maybe DatabaseStats
stats <- (Store -> m DatabaseStats)
-> Maybe Store -> m (Maybe DatabaseStats)
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM Store -> m DatabaseStats
forall (m :: * -> *). MonadIO m => Store -> m DatabaseStats
createDatabaseStats (StoreConfig -> Maybe Store
storeConfStats StoreConfig
cfg)
    Network
-> Word32
-> Word32
-> FilePath
-> Maybe DatabaseStats
-> DatabaseReaderT m a
-> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
Network
-> Word32
-> Word32
-> FilePath
-> Maybe DatabaseStats
-> DatabaseReaderT m a
-> m a
withDatabaseReader
          (StoreConfig -> Network
storeConfNetwork StoreConfig
cfg)
          (StoreConfig -> Word32
storeConfInitialGap StoreConfig
cfg)
          (StoreConfig -> Word32
storeConfGap StoreConfig
cfg)
          (StoreConfig -> FilePath
storeConfDB StoreConfig
cfg)
          Maybe DatabaseStats
stats
          DatabaseReaderT m a
f

blockStoreCfg :: StoreConfig
              -> Node
              -> Publisher StoreEvent
              -> DatabaseReader
              -> BlockStoreConfig
blockStoreCfg :: StoreConfig
-> Node
-> Publisher StoreEvent
-> DatabaseReader
-> BlockStoreConfig
blockStoreCfg StoreConfig
cfg Node
node Publisher StoreEvent
pub DatabaseReader
db =
    BlockStoreConfig :: PeerManager
-> Chain
-> Publisher StoreEvent
-> DatabaseReader
-> Network
-> Bool
-> Bool
-> Bool
-> NominalDiffTime
-> Maybe Store
-> BlockStoreConfig
BlockStoreConfig
        { blockConfChain :: Chain
blockConfChain = Node -> Chain
nodeChain Node
node
        , blockConfManager :: PeerManager
blockConfManager = Node -> PeerManager
nodeManager Node
node
        , blockConfListener :: Publisher StoreEvent
blockConfListener = Publisher StoreEvent
pub
        , blockConfDB :: DatabaseReader
blockConfDB = DatabaseReader
db
        , blockConfNet :: Network
blockConfNet = StoreConfig -> Network
storeConfNetwork StoreConfig
cfg
        , blockConfNoMempool :: Bool
blockConfNoMempool = StoreConfig -> Bool
storeConfNoMempool StoreConfig
cfg
        , blockConfWipeMempool :: Bool
blockConfWipeMempool = StoreConfig -> Bool
storeConfWipeMempool StoreConfig
cfg
        , blockConfSyncMempool :: Bool
blockConfSyncMempool = StoreConfig -> Bool
storeConfSyncMempool StoreConfig
cfg
        , blockConfPeerTimeout :: NominalDiffTime
blockConfPeerTimeout = StoreConfig -> NominalDiffTime
storeConfPeerTimeout StoreConfig
cfg
        , blockConfStats :: Maybe Store
blockConfStats = StoreConfig -> Maybe Store
storeConfStats StoreConfig
cfg
        }

nodeCfg :: StoreConfig
        -> DatabaseReader
        -> Publisher NodeEvent
        -> NodeConfig
nodeCfg :: StoreConfig -> DatabaseReader -> Publisher NodeEvent -> NodeConfig
nodeCfg StoreConfig
cfg DatabaseReader
db Publisher NodeEvent
pub =
    NodeConfig :: Int
-> DB
-> Maybe ColumnFamily
-> [HostPort]
-> Bool
-> NetworkAddress
-> Network
-> Publisher NodeEvent
-> NominalDiffTime
-> NominalDiffTime
-> (SockAddr -> WithConnection)
-> NodeConfig
NodeConfig
        { nodeConfMaxPeers :: Int
nodeConfMaxPeers = StoreConfig -> Int
storeConfMaxPeers StoreConfig
cfg
        , nodeConfDB :: DB
nodeConfDB = DatabaseReader -> DB
databaseHandle DatabaseReader
db
        , nodeConfColumnFamily :: Maybe ColumnFamily
nodeConfColumnFamily = Maybe ColumnFamily
forall a. Maybe a
Nothing
        , nodeConfPeers :: [HostPort]
nodeConfPeers = StoreConfig -> [HostPort]
storeConfInitPeers StoreConfig
cfg
        , nodeConfDiscover :: Bool
nodeConfDiscover = StoreConfig -> Bool
storeConfDiscover StoreConfig
cfg
        , nodeConfEvents :: Publisher NodeEvent
nodeConfEvents = Publisher NodeEvent
pub
        , nodeConfNetAddr :: NetworkAddress
nodeConfNetAddr =
                Word64 -> HostAddress -> NetworkAddress
NetworkAddress
                Word64
0
                (SockAddr -> HostAddress
sockToHostAddress (PortNumber -> Word32 -> SockAddr
SockAddrInet PortNumber
0 Word32
0))
        , nodeConfNet :: Network
nodeConfNet = StoreConfig -> Network
storeConfNetwork StoreConfig
cfg
        , nodeConfTimeout :: NominalDiffTime
nodeConfTimeout = StoreConfig -> NominalDiffTime
storeConfPeerTimeout StoreConfig
cfg
        , nodeConfPeerMaxLife :: NominalDiffTime
nodeConfPeerMaxLife = StoreConfig -> NominalDiffTime
storeConfPeerMaxLife StoreConfig
cfg
        , nodeConfConnect :: SockAddr -> WithConnection
nodeConfConnect = StoreConfig -> SockAddr -> WithConnection
storeConfConnect StoreConfig
cfg
        }

withCache :: (MonadUnliftIO m, MonadLoggerIO m)
          => StoreConfig
          -> Chain
          -> DatabaseReader
          -> Publisher StoreEvent
          -> (Maybe CacheConfig -> m a)
          -> m a
withCache :: StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
withCache StoreConfig
cfg Chain
chain DatabaseReader
db Publisher StoreEvent
pub Maybe CacheConfig -> m a
action =
    case StoreConfig -> Maybe FilePath
storeConfCache StoreConfig
cfg of
        Maybe FilePath
Nothing ->
            Maybe CacheConfig -> m a
action Maybe CacheConfig
forall a. Maybe a
Nothing
        Just FilePath
redisurl ->
            (Store -> m CacheMetrics) -> Maybe Store -> m (Maybe CacheMetrics)
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM Store -> m CacheMetrics
forall (m :: * -> *). MonadIO m => Store -> m CacheMetrics
newCacheMetrics (StoreConfig -> Maybe Store
storeConfStats StoreConfig
cfg) m (Maybe CacheMetrics) -> (Maybe CacheMetrics -> m a) -> m a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Maybe CacheMetrics
metrics ->
            FilePath -> m Connection
forall (m :: * -> *). MonadIO m => FilePath -> m Connection
connectRedis FilePath
redisurl m Connection -> (Connection -> m a) -> m a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Connection
conn ->
            Publisher StoreEvent -> (Inbox StoreEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher StoreEvent
pub ((Inbox StoreEvent -> m a) -> m a)
-> (Inbox StoreEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Inbox StoreEvent
evts ->
            let conf :: CacheConfig
conf = Connection -> Maybe CacheMetrics -> CacheConfig
c Connection
conn Maybe CacheMetrics
metrics
            in  (Inbox CacheWriterMessage -> m ())
-> (Process CacheWriterMessage -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Inbox msg -> m ()) -> (Process msg -> m a) -> m a
withProcess (CacheConfig -> Inbox CacheWriterMessage -> m ()
forall (m :: * -> *).
(MonadUnliftIO m, MonadLoggerIO m) =>
CacheConfig -> Inbox CacheWriterMessage -> m ()
f CacheConfig
conf) ((Process CacheWriterMessage -> m a) -> m a)
-> (Process CacheWriterMessage -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Process CacheWriterMessage
p ->
                Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
cacheWriterProcesses Int
crefresh Inbox StoreEvent
evts (Process CacheWriterMessage -> CacheWriter
forall msg. Process msg -> Mailbox msg
getProcessMailbox Process CacheWriterMessage
p) (m a -> m a) -> m a -> m a
forall a b. (a -> b) -> a -> b
$ do
                Maybe CacheConfig -> m a
action (CacheConfig -> Maybe CacheConfig
forall a. a -> Maybe a
Just CacheConfig
conf)
  where
    crefresh :: Int
crefresh = StoreConfig -> Int
storeConfCacheRefresh StoreConfig
cfg
    f :: CacheConfig -> Inbox CacheWriterMessage -> m ()
f CacheConfig
conf Inbox CacheWriterMessage
cwinbox = ReaderT DatabaseReader m () -> DatabaseReader -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (CacheConfig
-> Inbox CacheWriterMessage -> ReaderT DatabaseReader m ()
forall (m :: * -> *).
(MonadUnliftIO m, MonadLoggerIO m, StoreReadExtra m) =>
CacheConfig -> Inbox CacheWriterMessage -> m ()
cacheWriter CacheConfig
conf Inbox CacheWriterMessage
cwinbox) DatabaseReader
db
    c :: Connection -> Maybe CacheMetrics -> CacheConfig
c Connection
conn Maybe CacheMetrics
metrics =
        CacheConfig :: Connection
-> Int
-> Integer
-> Chain
-> Int
-> Int
-> Maybe CacheMetrics
-> CacheConfig
CacheConfig
           { cacheConn :: Connection
cacheConn = Connection
conn
           , cacheMin :: Int
cacheMin = StoreConfig -> Int
storeConfCacheMin StoreConfig
cfg
           , cacheChain :: Chain
cacheChain = Chain
chain
           , cacheMax :: Integer
cacheMax = StoreConfig -> Integer
storeConfMaxKeys StoreConfig
cfg
           , cacheRefresh :: Int
cacheRefresh = StoreConfig -> Int
storeConfCacheRefresh StoreConfig
cfg
           , cacheRetryDelay :: Int
cacheRetryDelay = StoreConfig -> Int
storeConfCacheRetryDelay StoreConfig
cfg
           , cacheMetrics :: Maybe CacheMetrics
cacheMetrics = Maybe CacheMetrics
metrics
           }

cacheWriterProcesses :: MonadUnliftIO m
                     => Int
                     -> Inbox StoreEvent
                     -> CacheWriter
                     -> m a
                     -> m a
cacheWriterProcesses :: Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
cacheWriterProcesses Int
crefresh Inbox StoreEvent
evts CacheWriter
cwm m a
action =
    m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync m ()
events ((Async () -> m a) -> m a) -> (Async () -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async ()
a1 -> Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
a1 m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m a
action
  where
    events :: m ()
events = Inbox StoreEvent -> CacheWriter -> m ()
forall (m :: * -> *).
MonadIO m =>
Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents Inbox StoreEvent
evts CacheWriter
cwm

cacheWriterEvents :: MonadIO m => Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents :: Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents Inbox StoreEvent
evts CacheWriter
cwm =
    m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
    Inbox StoreEvent -> m StoreEvent
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox StoreEvent
evts m StoreEvent -> (StoreEvent -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \StoreEvent
e ->
    StoreEvent
e StoreEvent -> CacheWriter -> m ()
forall (m :: * -> *).
MonadIO m =>
StoreEvent -> CacheWriter -> m ()
`cacheWriterDispatch` CacheWriter
cwm

cacheWriterDispatch :: MonadIO m => StoreEvent -> CacheWriter -> m ()
cacheWriterDispatch :: StoreEvent -> CacheWriter -> m ()
cacheWriterDispatch (StoreBestBlock BlockHash
_) = CacheWriter -> m ()
forall (m :: * -> *). MonadIO m => CacheWriter -> m ()
cacheNewBlock
cacheWriterDispatch StoreEvent
_                  = m () -> CacheWriter -> m ()
forall a b. a -> b -> a
const (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())

nodeForwarder :: MonadIO m
              => BlockStore
              -> Publisher StoreEvent
              -> Inbox NodeEvent
              -> m ()
nodeForwarder :: BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
nodeForwarder BlockStore
b Publisher StoreEvent
pub Inbox NodeEvent
sub =
    m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Inbox NodeEvent -> m NodeEvent
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox NodeEvent
sub m NodeEvent -> (NodeEvent -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> (NodeEvent -> STM ()) -> NodeEvent -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BlockStore -> Publisher StoreEvent -> NodeEvent -> STM ()
storeDispatch BlockStore
b Publisher StoreEvent
pub

-- | Dispatcher of node events.
storeDispatch :: BlockStore
              -> Publisher StoreEvent
              -> NodeEvent
              -> STM ()

storeDispatch :: BlockStore -> Publisher StoreEvent -> NodeEvent -> STM ()
storeDispatch BlockStore
b Publisher StoreEvent
pub (PeerEvent (PeerConnected Peer
p)) = do
    StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> StoreEvent
StorePeerConnected Peer
p) Publisher StoreEvent
pub
    Peer -> BlockStore -> STM ()
blockStorePeerConnectSTM Peer
p BlockStore
b

storeDispatch BlockStore
b Publisher StoreEvent
pub (PeerEvent (PeerDisconnected Peer
p)) = do
    StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> StoreEvent
StorePeerDisconnected Peer
p) Publisher StoreEvent
pub
    Peer -> BlockStore -> STM ()
blockStorePeerDisconnectSTM Peer
p BlockStore
b

storeDispatch BlockStore
b Publisher StoreEvent
_ (ChainEvent (ChainBestBlock BlockNode
bn)) =
    BlockNode -> BlockStore -> STM ()
blockStoreHeadSTM BlockNode
bn BlockStore
b

storeDispatch BlockStore
_ Publisher StoreEvent
_ (ChainEvent ChainEvent
_) =
    () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

storeDispatch BlockStore
_ Publisher StoreEvent
pub (PeerMessage Peer
p (MPong (Pong Word64
n))) =
    StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> Word64 -> StoreEvent
StorePeerPong Peer
p Word64
n) Publisher StoreEvent
pub

storeDispatch BlockStore
b Publisher StoreEvent
_ (PeerMessage Peer
p (MBlock Block
block)) =
    Peer -> Block -> BlockStore -> STM ()
blockStoreBlockSTM Peer
p Block
block BlockStore
b

storeDispatch BlockStore
b Publisher StoreEvent
_ (PeerMessage Peer
p (MTx Tx
tx)) =
    Peer -> Tx -> BlockStore -> STM ()
blockStoreTxSTM Peer
p Tx
tx BlockStore
b

storeDispatch BlockStore
b Publisher StoreEvent
_ (PeerMessage Peer
p (MNotFound (NotFound [InvVector]
is))) = do
    let blocks :: [BlockHash]
blocks =
            [ Hash256 -> BlockHash
BlockHash Hash256
h
            | InvVector InvType
t Hash256
h <- [InvVector]
is
            , InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvBlock Bool -> Bool -> Bool
|| InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvWitnessBlock
            ]
    Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([BlockHash] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [BlockHash]
blocks) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ Peer -> [BlockHash] -> BlockStore -> STM ()
blockStoreNotFoundSTM Peer
p [BlockHash]
blocks BlockStore
b

storeDispatch BlockStore
b Publisher StoreEvent
pub (PeerMessage Peer
p (MInv (Inv [InvVector]
is))) = do
    let txs :: [TxHash]
txs = [Hash256 -> TxHash
TxHash Hash256
h | InvVector InvType
t Hash256
h <- [InvVector]
is, InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvTx Bool -> Bool -> Bool
|| InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvWitnessTx]
    StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> [TxHash] -> StoreEvent
StoreTxAvailable Peer
p [TxHash]
txs) Publisher StoreEvent
pub
    Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([TxHash] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [TxHash]
txs) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ Peer -> [TxHash] -> BlockStore -> STM ()
blockStoreTxHashSTM Peer
p [TxHash]
txs BlockStore
b

storeDispatch BlockStore
_ Publisher StoreEvent
pub (PeerMessage Peer
p (MReject Reject
r)) =
    Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Reject -> MessageCommand
rejectMessage Reject
r MessageCommand -> MessageCommand -> Bool
forall a. Eq a => a -> a -> Bool
== MessageCommand
MCTx) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
    case ByteString -> Either FilePath TxHash
forall a. Serialize a => ByteString -> Either FilePath a
decode (Reject -> ByteString
rejectData Reject
r) of
        Left FilePath
_ -> () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Right TxHash
th ->
            let reject :: StoreEvent
reject =
                    Peer -> TxHash -> RejectCode -> ByteString -> StoreEvent
StoreTxReject
                    Peer
p
                    TxHash
th
                    (Reject -> RejectCode
rejectCode Reject
r)
                    (VarString -> ByteString
getVarString (Reject -> VarString
rejectReason Reject
r))
            in StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM StoreEvent
reject Publisher StoreEvent
pub

storeDispatch BlockStore
_ Publisher StoreEvent
_ NodeEvent
_ =
    () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()