{-# LANGUAGE FlexibleContexts #-}
module Haskoin.Store.Manager (
StoreConfig (..),
Store (..),
withStore,
) where
import Control.Monad (forever, unless, when)
import Control.Monad.Logger (MonadLoggerIO)
import Control.Monad.Reader (ReaderT (ReaderT), runReaderT)
import Data.Serialize (decode)
import Data.Time.Clock (NominalDiffTime)
import Data.Word (Word32)
import Haskoin (
BlockHash (..),
Inv (..),
InvType (..),
InvVector (..),
Message (..),
MessageCommand (..),
Network,
NetworkAddress (..),
NotFound (..),
Pong (..),
Reject (..),
TxHash (..),
VarString (..),
sockToHostAddress,
)
import Haskoin.Node (
Chain,
ChainEvent (..),
HostPort,
Node (..),
NodeConfig (..),
NodeEvent (..),
PeerEvent (..),
PeerManager,
WithConnection,
withNode,
)
import Haskoin.Store.BlockStore (
BlockStore,
BlockStoreConfig (..),
blockStoreBlockSTM,
blockStoreHeadSTM,
blockStoreNotFoundSTM,
blockStorePeerConnectSTM,
blockStorePeerDisconnectSTM,
blockStoreTxHashSTM,
blockStoreTxSTM,
withBlockStore,
)
import Haskoin.Store.Cache (
CacheConfig (..),
CacheWriter,
cacheNewBlock,
cacheNewTx,
cacheWriter,
connectRedis,
newCacheMetrics,
)
import Haskoin.Store.Common (
StoreEvent (..),
createDataMetrics,
)
import Haskoin.Store.Database.Reader (
DatabaseReader (..),
DatabaseReaderT,
withDatabaseReader,
)
import NQE (
Inbox,
Process (..),
Publisher,
publishSTM,
receive,
withProcess,
withPublisher,
withSubscription,
)
import Network.Socket (SockAddr (..))
import qualified System.Metrics as Metrics (Store)
import UnliftIO (
MonadIO,
MonadUnliftIO,
STM,
atomically,
link,
withAsync,
)
import UnliftIO.Concurrent (threadDelay)
-- | Handles to the components of a running store. A value of this type is
-- assembled by 'withStore' and handed to its continuation.
data Store = Store
    { storeManager :: !PeerManager
    -- ^ peer manager of the underlying node
    , storeChain :: !Chain
    -- ^ chain handle of the underlying node
    , storeBlock :: !BlockStore
    -- ^ block store process handle
    , storeDB :: !DatabaseReader
    -- ^ database reader handle
    , storeCache :: !(Maybe CacheConfig)
    -- ^ cache configuration; 'Nothing' when no cache was configured
    , storePublisher :: !(Publisher StoreEvent)
    -- ^ publisher on which 'StoreEvent's are emitted
    , storeNetwork :: !Network
    -- ^ network this store was configured for
    }
-- | Configuration for a 'Store'. The field comments describe where each
-- value is forwarded within this module.
data StoreConfig = StoreConfig
    { storeConfMaxPeers :: !Int
    -- ^ forwarded to the node as @nodeConfMaxPeers@
    , storeConfInitPeers :: ![HostPort]
    -- ^ forwarded to the node as @nodeConfPeers@
    , storeConfDiscover :: !Bool
    -- ^ forwarded to the node as @nodeConfDiscover@
    , storeConfDB :: !FilePath
    -- ^ database path handed to 'withDatabaseReader'
    , storeConfNetwork :: !Network
    -- ^ network used by the node, the block store and the database reader
    , storeConfCache :: !(Maybe String)
    -- ^ when present, passed to 'connectRedis' to enable the cache;
    -- 'Nothing' disables the cache
    , storeConfInitialGap :: !Word32
    -- ^ initial gap handed to 'withDatabaseReader'
    -- (NOTE(review): presumably the extended-public-key gap used before any
    -- transactions are seen; confirm against the database reader)
    , storeConfGap :: !Word32
    -- ^ gap handed to 'withDatabaseReader'
    , storeConfCacheMin :: !Int
    -- ^ forwarded to the cache as @cacheMin@
    , storeConfMaxKeys :: !Integer
    -- ^ forwarded to the cache as @cacheMax@
    , storeConfNoMempool :: !Bool
    -- ^ forwarded to the block store as @blockConfNoMempool@
    , storeConfWipeMempool :: !Bool
    -- ^ forwarded to the block store as @blockConfWipeMempool@
    , storeConfSyncMempool :: !Bool
    -- ^ forwarded to the block store as @blockConfSyncMempool@
    , storeConfPeerTimeout :: !NominalDiffTime
    -- ^ forwarded to both the node (@nodeConfTimeout@) and the block store
    -- (@blockConfPeerTimeout@)
    , storeConfPeerMaxLife :: !NominalDiffTime
    -- ^ forwarded to the node as @nodeConfPeerMaxLife@
    , storeConfConnect :: !(SockAddr -> WithConnection)
    -- ^ connection function forwarded to the node as @nodeConfConnect@
    , storeConfCacheRetryDelay :: !Int
    -- ^ forwarded to the cache as @cacheRetryDelay@
    -- (NOTE(review): unit is not visible here; confirm against the cache)
    , storeConfStats :: !(Maybe Metrics.Store)
    -- ^ optional metrics store, used for database, cache and block store
    -- metrics
    }
-- | Run a store for the duration of an action.
--
-- In order, this opens the database reader, creates a publisher for store
-- events and one for node events, subscribes to the node events, starts the
-- node, sets up the optional cache, starts the block store, and launches
-- 'nodeForwarder' in a linked asynchronous thread. The supplied action then
-- receives a 'Store' with handles to those components.
withStore ::
    (MonadLoggerIO m, MonadUnliftIO m) =>
    StoreConfig ->
    (Store -> m a) ->
    m a
withStore cfg action =
    connectDB cfg $
        ReaderT $ \db ->
            withPublisher $ \pub ->
                withPublisher $ \node_pub ->
                    withSubscription node_pub $ \node_sub ->
                        withNode (nodeCfg cfg db node_pub) $ \node ->
                            withCache cfg (nodeChain node) db pub $ \mcache ->
                                withBlockStore (blockStoreCfg cfg node pub db) $ \b ->
                                    withAsync (nodeForwarder b pub node_sub) $ \a1 ->
                                        -- Link so that a failure in the
                                        -- forwarder thread is not silently
                                        -- lost while the action is running.
                                        link a1
                                            >> action
                                                Store
                                                    { storeManager = nodeManager node
                                                    , storeChain = nodeChain node
                                                    , storeBlock = b
                                                    , storeDB = db
                                                    , storeCache = mcache
                                                    , storePublisher = pub
                                                    , storeNetwork = storeConfNetwork cfg
                                                    }
-- | Open the database reader described by the configuration and run the
-- given reader action with it.
--
-- When a metrics store is configured ('storeConfStats'), data metrics are
-- created first and passed along to 'withDatabaseReader'.
connectDB :: MonadUnliftIO m => StoreConfig -> DatabaseReaderT m a -> m a
connectDB cfg f = do
    stats <- mapM createDataMetrics (storeConfStats cfg)
    withDatabaseReader
        (storeConfNetwork cfg)
        (storeConfInitialGap cfg)
        (storeConfGap cfg)
        (storeConfDB cfg)
        stats
        f
-- | Build the block store configuration from the store configuration, the
-- running node, the store event publisher and the database reader.
blockStoreCfg ::
    StoreConfig ->
    Node ->
    Publisher StoreEvent ->
    DatabaseReader ->
    BlockStoreConfig
blockStoreCfg cfg node pub db =
    BlockStoreConfig
        { blockConfChain = nodeChain node
        , blockConfManager = nodeManager node
        , blockConfListener = pub
        , blockConfDB = db
        , blockConfNet = storeConfNetwork cfg
        , blockConfNoMempool = storeConfNoMempool cfg
        , blockConfWipeMempool = storeConfWipeMempool cfg
        , blockConfSyncMempool = storeConfSyncMempool cfg
        , blockConfPeerTimeout = storeConfPeerTimeout cfg
        , blockConfStats = storeConfStats cfg
        }
-- | Build the node configuration from the store configuration, the database
-- reader (whose underlying database handle is shared with the node) and the
-- publisher on which the node emits its events.
nodeCfg ::
    StoreConfig ->
    DatabaseReader ->
    Publisher NodeEvent ->
    NodeConfig
nodeCfg cfg db pub =
    NodeConfig
        { nodeConfMaxPeers = storeConfMaxPeers cfg
        , nodeConfDB = databaseHandle db
        , nodeConfColumnFamily = Nothing
        , nodeConfPeers = storeConfInitPeers cfg
        , nodeConfDiscover = storeConfDiscover cfg
        , nodeConfEvents = pub
        , -- Local network address is left zeroed: first field 0, and a
          -- host address derived from an IPv4 socket address with port 0
          -- and host 0.
          nodeConfNetAddr =
            NetworkAddress
                0
                (sockToHostAddress (SockAddrInet 0 0))
        , nodeConfNet = storeConfNetwork cfg
        , nodeConfTimeout = storeConfPeerTimeout cfg
        , nodeConfPeerMaxLife = storeConfPeerMaxLife cfg
        , nodeConfConnect = storeConfConnect cfg
        }
-- | Set up the optional cache and run an action with its configuration.
--
-- When 'storeConfCache' is 'Nothing' the action is run with 'Nothing' and
-- nothing else happens. Otherwise cache metrics are created (if a metrics
-- store is configured), a Redis connection is opened, the store event
-- publisher is subscribed to, a cache writer process is started, and store
-- events are fed to it via 'cacheWriterProcesses' while the action runs
-- with @'Just' conf@.
withCache ::
    (MonadUnliftIO m, MonadLoggerIO m) =>
    StoreConfig ->
    Chain ->
    DatabaseReader ->
    Publisher StoreEvent ->
    (Maybe CacheConfig -> m a) ->
    m a
withCache cfg chain db pub action =
    case storeConfCache cfg of
        Nothing ->
            action Nothing
        Just redisurl ->
            mapM newCacheMetrics (storeConfStats cfg) >>= \metrics ->
                connectRedis redisurl >>= \conn ->
                    withSubscription pub $ \evts ->
                        let conf = c conn metrics
                         in withProcess (f conf) $ \p ->
                                cacheWriterProcesses evts (getProcessMailbox p) $ do
                                    action (Just conf)
  where
    -- Cache writer body: runs 'cacheWriter' with the database reader as
    -- its environment.
    f conf cwinbox = runReaderT (cacheWriter conf cwinbox) db
    -- Assemble the cache configuration from the connection, the optional
    -- metrics and the relevant store configuration fields.
    c conn metrics =
        CacheConfig
            { cacheConn = conn
            , cacheMin = storeConfCacheMin cfg
            , cacheChain = chain
            , cacheMax = storeConfMaxKeys cfg
            , cacheRetryDelay = storeConfCacheRetryDelay cfg
            , cacheMetrics = metrics
            }
-- | Run an action while a linked asynchronous thread relays store events
-- from the given inbox to the cache writer (see 'cacheWriterEvents').
cacheWriterProcesses ::
    MonadUnliftIO m =>
    Inbox StoreEvent ->
    CacheWriter ->
    m a ->
    m a
cacheWriterProcesses evts cwm action =
    withAsync events $ \a1 -> link a1 >> action
  where
    events = cacheWriterEvents evts cwm
-- | Loop forever: receive one store event from the inbox and hand it to
-- 'cacheWriterDispatch' together with the cache writer.
cacheWriterEvents :: MonadIO m => Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents evts cwm =
    forever $
        receive evts >>= \e ->
            e `cacheWriterDispatch` cwm
-- | Translate a store event into a cache writer notification.
--
-- A new best block triggers 'cacheNewBlock'. Both mempool additions and
-- mempool deletions trigger 'cacheNewTx' for the transaction hash involved.
-- Every other event is ignored.
cacheWriterDispatch :: MonadIO m => StoreEvent -> CacheWriter -> m ()
cacheWriterDispatch (StoreBestBlock _) = cacheNewBlock
cacheWriterDispatch (StoreMempoolNew t) = cacheNewTx t
cacheWriterDispatch (StoreMempoolDelete t) = cacheNewTx t
cacheWriterDispatch _ = const (return ())
-- | Loop forever: receive one node event from the inbox and process it
-- with 'storeDispatch' in a single STM transaction.
nodeForwarder ::
    MonadIO m =>
    BlockStore ->
    Publisher StoreEvent ->
    Inbox NodeEvent ->
    m ()
nodeForwarder b pub sub =
    forever $
        receive sub >>= atomically . storeDispatch b pub
-- | Route a node event to the block store and/or the store event publisher.
--
-- * Peer connected / disconnected: publish the matching store event, then
--   notify the block store.
-- * New best chain block: notify the block store; other chain events are
--   ignored.
-- * @pong@: publish 'StorePeerPong'.
-- * @block@ / @tx@: hand the payload to the block store.
-- * @notfound@: forward block hashes (plain or witness) to the block store,
--   if there are any.
-- * @inv@: publish 'StoreTxAnnounce' with the announced transaction hashes
--   (plain or witness), then forward them to the block store if non-empty.
-- * @reject@ for a transaction: publish 'StoreTxReject' when the reject
--   data decodes to a transaction hash; otherwise do nothing.
-- * Anything else is ignored.
storeDispatch ::
    BlockStore ->
    Publisher StoreEvent ->
    NodeEvent ->
    STM ()
storeDispatch b pub (PeerEvent (PeerConnected p)) = do
    publishSTM (StorePeerConnected p) pub
    blockStorePeerConnectSTM p b
storeDispatch b pub (PeerEvent (PeerDisconnected p)) = do
    publishSTM (StorePeerDisconnected p) pub
    blockStorePeerDisconnectSTM p b
storeDispatch b _ (ChainEvent (ChainBestBlock bn)) =
    blockStoreHeadSTM bn b
storeDispatch _ _ (ChainEvent _) =
    return ()
storeDispatch _ pub (PeerMessage p (MPong (Pong n))) =
    publishSTM (StorePeerPong p n) pub
storeDispatch b _ (PeerMessage p (MBlock block)) =
    blockStoreBlockSTM p block b
storeDispatch b _ (PeerMessage p (MTx tx)) =
    blockStoreTxSTM p tx b
storeDispatch b _ (PeerMessage p (MNotFound (NotFound is))) = do
    let blocks =
            [ BlockHash h
            | InvVector t h <- is
            , t == InvBlock || t == InvWitnessBlock
            ]
    unless (null blocks) $ blockStoreNotFoundSTM p blocks b
storeDispatch b pub (PeerMessage p (MInv (Inv is))) = do
    let txs = [TxHash h | InvVector t h <- is, t == InvTx || t == InvWitnessTx]
    -- The announcement is published even when no transaction hashes were
    -- found; only the block store notification is skipped in that case.
    publishSTM (StoreTxAnnounce p txs) pub
    unless (null txs) $ blockStoreTxHashSTM p txs b
storeDispatch _ pub (PeerMessage p (MReject r)) =
    when (rejectMessage r == MCTx) $
        case decode (rejectData r) of
            -- Reject data that is not a transaction hash is dropped.
            Left _ -> return ()
            Right th ->
                let reject =
                        StoreTxReject
                            p
                            th
                            (rejectCode r)
                            (getVarString (rejectReason r))
                 in publishSTM reject pub
storeDispatch _ _ _ =
    return ()