{-# LANGUAGE FlexibleContexts #-}
module Haskoin.Store.Manager
( StoreConfig(..)
, Store(..)
, withStore
) where
import Control.Monad (forever, unless, when)
import Control.Monad.Logger (MonadLoggerIO)
import Control.Monad.Reader (ReaderT (ReaderT), runReaderT)
import Data.Serialize (decode)
import Data.Time.Clock (NominalDiffTime)
import Data.Word (Word32)
import Haskoin (BlockHash (..), Inv (..),
InvType (..), InvVector (..),
Message (..),
MessageCommand (..), Network,
NetworkAddress (..),
NotFound (..), Pong (..),
Reject (..), TxHash (..),
VarString (..),
sockToHostAddress)
import Haskoin.Node (Chain, ChainEvent (..),
HostPort, Node (..),
NodeConfig (..), NodeEvent (..),
PeerEvent (..), PeerManager,
WithConnection, withNode)
import Haskoin.Store.BlockStore (BlockStore,
BlockStoreConfig (..),
blockStoreBlockSTM,
blockStoreHeadSTM,
blockStoreNotFoundSTM,
blockStorePeerConnectSTM,
blockStorePeerDisconnectSTM,
blockStoreTxHashSTM,
blockStoreTxSTM, withBlockStore)
import Haskoin.Store.Cache (CacheConfig (..), CacheWriter,
cacheNewBlock, cacheWriter,
connectRedis, newCacheMetrics)
import Haskoin.Store.Common (StoreEvent (..))
import Haskoin.Store.Database.Reader (DatabaseReader (..),
DatabaseReaderT,
createDatabaseStats,
withDatabaseReader)
import NQE (Inbox, Process (..), Publisher,
publishSTM, receive,
withProcess, withPublisher,
withSubscription)
import Network.Socket (SockAddr (..))
import qualified System.Metrics as Metrics (Store)
import UnliftIO (MonadIO, MonadUnliftIO, STM,
atomically, link, withAsync)
import UnliftIO.Concurrent (threadDelay)
data Store =
Store
{ Store -> PeerManager
storeManager :: !PeerManager
, Store -> Chain
storeChain :: !Chain
, Store -> BlockStore
storeBlock :: !BlockStore
, Store -> DatabaseReader
storeDB :: !DatabaseReader
, Store -> Maybe CacheConfig
storeCache :: !(Maybe CacheConfig)
, Store -> Publisher StoreEvent
storePublisher :: !(Publisher StoreEvent)
, Store -> Network
storeNetwork :: !Network
}
data StoreConfig =
StoreConfig
{ StoreConfig -> Int
storeConfMaxPeers :: !Int
, StoreConfig -> [HostPort]
storeConfInitPeers :: ![HostPort]
, StoreConfig -> Bool
storeConfDiscover :: !Bool
, StoreConfig -> FilePath
storeConfDB :: !FilePath
, StoreConfig -> Network
storeConfNetwork :: !Network
, StoreConfig -> Maybe FilePath
storeConfCache :: !(Maybe String)
, StoreConfig -> Word32
storeConfInitialGap :: !Word32
, StoreConfig -> Word32
storeConfGap :: !Word32
, StoreConfig -> Int
storeConfCacheMin :: !Int
, StoreConfig -> Integer
storeConfMaxKeys :: !Integer
, StoreConfig -> Bool
storeConfNoMempool :: !Bool
, StoreConfig -> Bool
storeConfWipeMempool :: !Bool
, StoreConfig -> Bool
storeConfSyncMempool :: !Bool
, StoreConfig -> NominalDiffTime
storeConfPeerTimeout :: !NominalDiffTime
, StoreConfig -> NominalDiffTime
storeConfPeerMaxLife :: !NominalDiffTime
, StoreConfig -> SockAddr -> WithConnection
storeConfConnect :: !(SockAddr -> WithConnection)
, StoreConfig -> Int
storeConfCacheRefresh :: !Int
, StoreConfig -> Int
storeConfCacheRetryDelay :: !Int
, StoreConfig -> Maybe Store
storeConfStats :: !(Maybe Metrics.Store)
}
withStore :: (MonadLoggerIO m, MonadUnliftIO m)
=> StoreConfig -> (Store -> m a) -> m a
withStore :: StoreConfig -> (Store -> m a) -> m a
withStore StoreConfig
cfg Store -> m a
action =
StoreConfig -> DatabaseReaderT m a -> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
StoreConfig -> DatabaseReaderT m a -> m a
connectDB StoreConfig
cfg (DatabaseReaderT m a -> m a) -> DatabaseReaderT m a -> m a
forall a b. (a -> b) -> a -> b
$ (DatabaseReader -> m a) -> DatabaseReaderT m a
forall r (m :: * -> *) a. (r -> m a) -> ReaderT r m a
ReaderT ((DatabaseReader -> m a) -> DatabaseReaderT m a)
-> (DatabaseReader -> m a) -> DatabaseReaderT m a
forall a b. (a -> b) -> a -> b
$ \DatabaseReader
db ->
(Publisher StoreEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher ((Publisher StoreEvent -> m a) -> m a)
-> (Publisher StoreEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Publisher StoreEvent
pub ->
(Publisher NodeEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher ((Publisher NodeEvent -> m a) -> m a)
-> (Publisher NodeEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Publisher NodeEvent
node_pub ->
Publisher NodeEvent -> (Inbox NodeEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher NodeEvent
node_pub ((Inbox NodeEvent -> m a) -> m a)
-> (Inbox NodeEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Inbox NodeEvent
node_sub ->
NodeConfig -> (Node -> m a) -> m a
forall (m :: * -> *) a.
(MonadLoggerIO m, MonadUnliftIO m) =>
NodeConfig -> (Node -> m a) -> m a
withNode (StoreConfig -> DatabaseReader -> Publisher NodeEvent -> NodeConfig
nodeCfg StoreConfig
cfg DatabaseReader
db Publisher NodeEvent
node_pub) ((Node -> m a) -> m a) -> (Node -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Node
node ->
StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadLoggerIO m) =>
StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
withCache StoreConfig
cfg (Node -> Chain
nodeChain Node
node) DatabaseReader
db Publisher StoreEvent
pub ((Maybe CacheConfig -> m a) -> m a)
-> (Maybe CacheConfig -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Maybe CacheConfig
mcache ->
BlockStoreConfig -> (BlockStore -> m a) -> m a
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadLoggerIO m) =>
BlockStoreConfig -> (BlockStore -> m a) -> m a
withBlockStore (StoreConfig
-> Node
-> Publisher StoreEvent
-> DatabaseReader
-> BlockStoreConfig
blockStoreCfg StoreConfig
cfg Node
node Publisher StoreEvent
pub DatabaseReader
db) ((BlockStore -> m a) -> m a) -> (BlockStore -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \BlockStore
b ->
m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync (BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
forall (m :: * -> *).
MonadIO m =>
BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
nodeForwarder BlockStore
b Publisher StoreEvent
pub Inbox NodeEvent
node_sub) ((Async () -> m a) -> m a) -> (Async () -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async ()
a1 -> Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
a1 m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
Store -> m a
action Store :: PeerManager
-> Chain
-> BlockStore
-> DatabaseReader
-> Maybe CacheConfig
-> Publisher StoreEvent
-> Network
-> Store
Store { storeManager :: PeerManager
storeManager = Node -> PeerManager
nodeManager Node
node
, storeChain :: Chain
storeChain = Node -> Chain
nodeChain Node
node
, storeBlock :: BlockStore
storeBlock = BlockStore
b
, storeDB :: DatabaseReader
storeDB = DatabaseReader
db
, storeCache :: Maybe CacheConfig
storeCache = Maybe CacheConfig
mcache
, storePublisher :: Publisher StoreEvent
storePublisher = Publisher StoreEvent
pub
, storeNetwork :: Network
storeNetwork = StoreConfig -> Network
storeConfNetwork StoreConfig
cfg
}
connectDB :: MonadUnliftIO m => StoreConfig -> DatabaseReaderT m a -> m a
connectDB :: StoreConfig -> DatabaseReaderT m a -> m a
connectDB StoreConfig
cfg DatabaseReaderT m a
f = do
Maybe DatabaseStats
stats <- (Store -> m DatabaseStats)
-> Maybe Store -> m (Maybe DatabaseStats)
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM Store -> m DatabaseStats
forall (m :: * -> *). MonadIO m => Store -> m DatabaseStats
createDatabaseStats (StoreConfig -> Maybe Store
storeConfStats StoreConfig
cfg)
Network
-> Word32
-> Word32
-> FilePath
-> Maybe DatabaseStats
-> DatabaseReaderT m a
-> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
Network
-> Word32
-> Word32
-> FilePath
-> Maybe DatabaseStats
-> DatabaseReaderT m a
-> m a
withDatabaseReader
(StoreConfig -> Network
storeConfNetwork StoreConfig
cfg)
(StoreConfig -> Word32
storeConfInitialGap StoreConfig
cfg)
(StoreConfig -> Word32
storeConfGap StoreConfig
cfg)
(StoreConfig -> FilePath
storeConfDB StoreConfig
cfg)
Maybe DatabaseStats
stats
DatabaseReaderT m a
f
blockStoreCfg :: StoreConfig
-> Node
-> Publisher StoreEvent
-> DatabaseReader
-> BlockStoreConfig
blockStoreCfg :: StoreConfig
-> Node
-> Publisher StoreEvent
-> DatabaseReader
-> BlockStoreConfig
blockStoreCfg StoreConfig
cfg Node
node Publisher StoreEvent
pub DatabaseReader
db =
BlockStoreConfig :: PeerManager
-> Chain
-> Publisher StoreEvent
-> DatabaseReader
-> Network
-> Bool
-> Bool
-> Bool
-> NominalDiffTime
-> Maybe Store
-> BlockStoreConfig
BlockStoreConfig
{ blockConfChain :: Chain
blockConfChain = Node -> Chain
nodeChain Node
node
, blockConfManager :: PeerManager
blockConfManager = Node -> PeerManager
nodeManager Node
node
, blockConfListener :: Publisher StoreEvent
blockConfListener = Publisher StoreEvent
pub
, blockConfDB :: DatabaseReader
blockConfDB = DatabaseReader
db
, blockConfNet :: Network
blockConfNet = StoreConfig -> Network
storeConfNetwork StoreConfig
cfg
, blockConfNoMempool :: Bool
blockConfNoMempool = StoreConfig -> Bool
storeConfNoMempool StoreConfig
cfg
, blockConfWipeMempool :: Bool
blockConfWipeMempool = StoreConfig -> Bool
storeConfWipeMempool StoreConfig
cfg
, blockConfSyncMempool :: Bool
blockConfSyncMempool = StoreConfig -> Bool
storeConfSyncMempool StoreConfig
cfg
, blockConfPeerTimeout :: NominalDiffTime
blockConfPeerTimeout = StoreConfig -> NominalDiffTime
storeConfPeerTimeout StoreConfig
cfg
, blockConfStats :: Maybe Store
blockConfStats = StoreConfig -> Maybe Store
storeConfStats StoreConfig
cfg
}
nodeCfg :: StoreConfig
-> DatabaseReader
-> Publisher NodeEvent
-> NodeConfig
nodeCfg :: StoreConfig -> DatabaseReader -> Publisher NodeEvent -> NodeConfig
nodeCfg StoreConfig
cfg DatabaseReader
db Publisher NodeEvent
pub =
NodeConfig :: Int
-> DB
-> Maybe ColumnFamily
-> [HostPort]
-> Bool
-> NetworkAddress
-> Network
-> Publisher NodeEvent
-> NominalDiffTime
-> NominalDiffTime
-> (SockAddr -> WithConnection)
-> NodeConfig
NodeConfig
{ nodeConfMaxPeers :: Int
nodeConfMaxPeers = StoreConfig -> Int
storeConfMaxPeers StoreConfig
cfg
, nodeConfDB :: DB
nodeConfDB = DatabaseReader -> DB
databaseHandle DatabaseReader
db
, nodeConfColumnFamily :: Maybe ColumnFamily
nodeConfColumnFamily = Maybe ColumnFamily
forall a. Maybe a
Nothing
, nodeConfPeers :: [HostPort]
nodeConfPeers = StoreConfig -> [HostPort]
storeConfInitPeers StoreConfig
cfg
, nodeConfDiscover :: Bool
nodeConfDiscover = StoreConfig -> Bool
storeConfDiscover StoreConfig
cfg
, nodeConfEvents :: Publisher NodeEvent
nodeConfEvents = Publisher NodeEvent
pub
, nodeConfNetAddr :: NetworkAddress
nodeConfNetAddr =
Word64 -> HostAddress -> NetworkAddress
NetworkAddress
Word64
0
(SockAddr -> HostAddress
sockToHostAddress (PortNumber -> Word32 -> SockAddr
SockAddrInet PortNumber
0 Word32
0))
, nodeConfNet :: Network
nodeConfNet = StoreConfig -> Network
storeConfNetwork StoreConfig
cfg
, nodeConfTimeout :: NominalDiffTime
nodeConfTimeout = StoreConfig -> NominalDiffTime
storeConfPeerTimeout StoreConfig
cfg
, nodeConfPeerMaxLife :: NominalDiffTime
nodeConfPeerMaxLife = StoreConfig -> NominalDiffTime
storeConfPeerMaxLife StoreConfig
cfg
, nodeConfConnect :: SockAddr -> WithConnection
nodeConfConnect = StoreConfig -> SockAddr -> WithConnection
storeConfConnect StoreConfig
cfg
}
withCache :: (MonadUnliftIO m, MonadLoggerIO m)
=> StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
withCache :: StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
withCache StoreConfig
cfg Chain
chain DatabaseReader
db Publisher StoreEvent
pub Maybe CacheConfig -> m a
action =
case StoreConfig -> Maybe FilePath
storeConfCache StoreConfig
cfg of
Maybe FilePath
Nothing ->
Maybe CacheConfig -> m a
action Maybe CacheConfig
forall a. Maybe a
Nothing
Just FilePath
redisurl ->
(Store -> m CacheMetrics) -> Maybe Store -> m (Maybe CacheMetrics)
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM Store -> m CacheMetrics
forall (m :: * -> *). MonadIO m => Store -> m CacheMetrics
newCacheMetrics (StoreConfig -> Maybe Store
storeConfStats StoreConfig
cfg) m (Maybe CacheMetrics) -> (Maybe CacheMetrics -> m a) -> m a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Maybe CacheMetrics
metrics ->
FilePath -> m Connection
forall (m :: * -> *). MonadIO m => FilePath -> m Connection
connectRedis FilePath
redisurl m Connection -> (Connection -> m a) -> m a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \Connection
conn ->
Publisher StoreEvent -> (Inbox StoreEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher StoreEvent
pub ((Inbox StoreEvent -> m a) -> m a)
-> (Inbox StoreEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Inbox StoreEvent
evts ->
let conf :: CacheConfig
conf = Connection -> Maybe CacheMetrics -> CacheConfig
c Connection
conn Maybe CacheMetrics
metrics
in (Inbox CacheWriterMessage -> m ())
-> (Process CacheWriterMessage -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Inbox msg -> m ()) -> (Process msg -> m a) -> m a
withProcess (CacheConfig -> Inbox CacheWriterMessage -> m ()
forall (m :: * -> *).
(MonadUnliftIO m, MonadLoggerIO m) =>
CacheConfig -> Inbox CacheWriterMessage -> m ()
f CacheConfig
conf) ((Process CacheWriterMessage -> m a) -> m a)
-> (Process CacheWriterMessage -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Process CacheWriterMessage
p ->
Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
cacheWriterProcesses Int
crefresh Inbox StoreEvent
evts (Process CacheWriterMessage -> CacheWriter
forall msg. Process msg -> Mailbox msg
getProcessMailbox Process CacheWriterMessage
p) (m a -> m a) -> m a -> m a
forall a b. (a -> b) -> a -> b
$ do
Maybe CacheConfig -> m a
action (CacheConfig -> Maybe CacheConfig
forall a. a -> Maybe a
Just CacheConfig
conf)
where
crefresh :: Int
crefresh = StoreConfig -> Int
storeConfCacheRefresh StoreConfig
cfg
f :: CacheConfig -> Inbox CacheWriterMessage -> m ()
f CacheConfig
conf Inbox CacheWriterMessage
cwinbox = ReaderT DatabaseReader m () -> DatabaseReader -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (CacheConfig
-> Inbox CacheWriterMessage -> ReaderT DatabaseReader m ()
forall (m :: * -> *).
(MonadUnliftIO m, MonadLoggerIO m, StoreReadExtra m) =>
CacheConfig -> Inbox CacheWriterMessage -> m ()
cacheWriter CacheConfig
conf Inbox CacheWriterMessage
cwinbox) DatabaseReader
db
c :: Connection -> Maybe CacheMetrics -> CacheConfig
c Connection
conn Maybe CacheMetrics
metrics =
CacheConfig :: Connection
-> Int
-> Integer
-> Chain
-> Int
-> Int
-> Maybe CacheMetrics
-> CacheConfig
CacheConfig
{ cacheConn :: Connection
cacheConn = Connection
conn
, cacheMin :: Int
cacheMin = StoreConfig -> Int
storeConfCacheMin StoreConfig
cfg
, cacheChain :: Chain
cacheChain = Chain
chain
, cacheMax :: Integer
cacheMax = StoreConfig -> Integer
storeConfMaxKeys StoreConfig
cfg
, cacheRefresh :: Int
cacheRefresh = StoreConfig -> Int
storeConfCacheRefresh StoreConfig
cfg
, cacheRetryDelay :: Int
cacheRetryDelay = StoreConfig -> Int
storeConfCacheRetryDelay StoreConfig
cfg
, cacheMetrics :: Maybe CacheMetrics
cacheMetrics = Maybe CacheMetrics
metrics
}
cacheWriterProcesses :: MonadUnliftIO m
=> Int
-> Inbox StoreEvent
-> CacheWriter
-> m a
-> m a
cacheWriterProcesses :: Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
cacheWriterProcesses Int
crefresh Inbox StoreEvent
evts CacheWriter
cwm m a
action =
m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync m ()
events ((Async () -> m a) -> m a) -> (Async () -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async ()
a1 -> Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
a1 m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m a
action
where
events :: m ()
events = Inbox StoreEvent -> CacheWriter -> m ()
forall (m :: * -> *).
MonadIO m =>
Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents Inbox StoreEvent
evts CacheWriter
cwm
cacheWriterEvents :: MonadIO m => Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents :: Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents Inbox StoreEvent
evts CacheWriter
cwm =
m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
Inbox StoreEvent -> m StoreEvent
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox StoreEvent
evts m StoreEvent -> (StoreEvent -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \StoreEvent
e ->
StoreEvent
e StoreEvent -> CacheWriter -> m ()
forall (m :: * -> *).
MonadIO m =>
StoreEvent -> CacheWriter -> m ()
`cacheWriterDispatch` CacheWriter
cwm
cacheWriterDispatch :: MonadIO m => StoreEvent -> CacheWriter -> m ()
cacheWriterDispatch :: StoreEvent -> CacheWriter -> m ()
cacheWriterDispatch (StoreBestBlock BlockHash
_) = CacheWriter -> m ()
forall (m :: * -> *). MonadIO m => CacheWriter -> m ()
cacheNewBlock
cacheWriterDispatch StoreEvent
_ = m () -> CacheWriter -> m ()
forall a b. a -> b -> a
const (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())
nodeForwarder :: MonadIO m
=> BlockStore
-> Publisher StoreEvent
-> Inbox NodeEvent
-> m ()
nodeForwarder :: BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
nodeForwarder BlockStore
b Publisher StoreEvent
pub Inbox NodeEvent
sub =
m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Inbox NodeEvent -> m NodeEvent
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox NodeEvent
sub m NodeEvent -> (NodeEvent -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> (NodeEvent -> STM ()) -> NodeEvent -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BlockStore -> Publisher StoreEvent -> NodeEvent -> STM ()
storeDispatch BlockStore
b Publisher StoreEvent
pub
storeDispatch :: BlockStore
-> Publisher StoreEvent
-> NodeEvent
-> STM ()
storeDispatch :: BlockStore -> Publisher StoreEvent -> NodeEvent -> STM ()
storeDispatch BlockStore
b Publisher StoreEvent
pub (PeerEvent (PeerConnected Peer
p)) = do
StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> StoreEvent
StorePeerConnected Peer
p) Publisher StoreEvent
pub
Peer -> BlockStore -> STM ()
blockStorePeerConnectSTM Peer
p BlockStore
b
storeDispatch BlockStore
b Publisher StoreEvent
pub (PeerEvent (PeerDisconnected Peer
p)) = do
StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> StoreEvent
StorePeerDisconnected Peer
p) Publisher StoreEvent
pub
Peer -> BlockStore -> STM ()
blockStorePeerDisconnectSTM Peer
p BlockStore
b
storeDispatch BlockStore
b Publisher StoreEvent
_ (ChainEvent (ChainBestBlock BlockNode
bn)) =
BlockNode -> BlockStore -> STM ()
blockStoreHeadSTM BlockNode
bn BlockStore
b
storeDispatch BlockStore
_ Publisher StoreEvent
_ (ChainEvent ChainEvent
_) =
() -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
storeDispatch BlockStore
_ Publisher StoreEvent
pub (PeerMessage Peer
p (MPong (Pong Word64
n))) =
StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> Word64 -> StoreEvent
StorePeerPong Peer
p Word64
n) Publisher StoreEvent
pub
storeDispatch BlockStore
b Publisher StoreEvent
_ (PeerMessage Peer
p (MBlock Block
block)) =
Peer -> Block -> BlockStore -> STM ()
blockStoreBlockSTM Peer
p Block
block BlockStore
b
storeDispatch BlockStore
b Publisher StoreEvent
_ (PeerMessage Peer
p (MTx Tx
tx)) =
Peer -> Tx -> BlockStore -> STM ()
blockStoreTxSTM Peer
p Tx
tx BlockStore
b
storeDispatch BlockStore
b Publisher StoreEvent
_ (PeerMessage Peer
p (MNotFound (NotFound [InvVector]
is))) = do
let blocks :: [BlockHash]
blocks =
[ Hash256 -> BlockHash
BlockHash Hash256
h
| InvVector InvType
t Hash256
h <- [InvVector]
is
, InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvBlock Bool -> Bool -> Bool
|| InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvWitnessBlock
]
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([BlockHash] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [BlockHash]
blocks) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ Peer -> [BlockHash] -> BlockStore -> STM ()
blockStoreNotFoundSTM Peer
p [BlockHash]
blocks BlockStore
b
storeDispatch BlockStore
b Publisher StoreEvent
pub (PeerMessage Peer
p (MInv (Inv [InvVector]
is))) = do
let txs :: [TxHash]
txs = [Hash256 -> TxHash
TxHash Hash256
h | InvVector InvType
t Hash256
h <- [InvVector]
is, InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvTx Bool -> Bool -> Bool
|| InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvWitnessTx]
StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> [TxHash] -> StoreEvent
StoreTxAvailable Peer
p [TxHash]
txs) Publisher StoreEvent
pub
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([TxHash] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [TxHash]
txs) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ Peer -> [TxHash] -> BlockStore -> STM ()
blockStoreTxHashSTM Peer
p [TxHash]
txs BlockStore
b
storeDispatch BlockStore
_ Publisher StoreEvent
pub (PeerMessage Peer
p (MReject Reject
r)) =
Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Reject -> MessageCommand
rejectMessage Reject
r MessageCommand -> MessageCommand -> Bool
forall a. Eq a => a -> a -> Bool
== MessageCommand
MCTx) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
case ByteString -> Either FilePath TxHash
forall a. Serialize a => ByteString -> Either FilePath a
decode (Reject -> ByteString
rejectData Reject
r) of
Left FilePath
_ -> () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Right TxHash
th ->
let reject :: StoreEvent
reject =
Peer -> TxHash -> RejectCode -> ByteString -> StoreEvent
StoreTxReject
Peer
p
TxHash
th
(Reject -> RejectCode
rejectCode Reject
r)
(VarString -> ByteString
getVarString (Reject -> VarString
rejectReason Reject
r))
in StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM StoreEvent
reject Publisher StoreEvent
pub
storeDispatch BlockStore
_ Publisher StoreEvent
_ NodeEvent
_ =
() -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()