{-# LANGUAGE FlexibleContexts #-}
module Haskoin.Store.Manager
( StoreConfig(..)
, Store(..)
, withStore
) where
import Control.Monad (forever, unless, when)
import Control.Monad.Logger (MonadLoggerIO)
import Control.Monad.Reader (ReaderT (ReaderT), runReaderT)
import Data.Serialize (decode)
import Data.Time.Clock (NominalDiffTime)
import Data.Word (Word32)
import Haskoin (BlockHash (..), Inv (..),
InvType (..), InvVector (..),
Message (..),
MessageCommand (..), Network,
NetworkAddress (..),
NotFound (..), Pong (..),
Reject (..), TxHash (..),
VarString (..),
sockToHostAddress)
import Haskoin.Node (Chain, ChainEvent (..),
HostPort, Node (..),
NodeConfig (..), NodeEvent (..),
PeerEvent (..), PeerManager,
WithConnection, withNode)
import Haskoin.Store.BlockStore (BlockStore,
BlockStoreConfig (..),
blockStoreBlockSTM,
blockStoreHeadSTM,
blockStoreNotFoundSTM,
blockStorePeerConnectSTM,
blockStorePeerDisconnectSTM,
blockStoreTxHashSTM,
blockStoreTxSTM, withBlockStore)
import Haskoin.Store.Cache (CacheConfig (..), CacheWriter,
cacheNewBlock, cachePing,
cacheWriter, connectRedis)
import Haskoin.Store.Common (StoreEvent (..))
import Haskoin.Store.Database.Reader (DatabaseReader (..),
DatabaseReaderT,
withDatabaseReader)
import Network.Socket (SockAddr (..))
import NQE (Inbox, Process (..), Publisher,
publishSTM, receive,
withProcess, withPublisher,
withSubscription)
import UnliftIO (MonadIO, MonadUnliftIO, STM,
atomically, link, withAsync)
import UnliftIO.Concurrent (threadDelay)
-- | Handles to the running components of a store. A value of this type is
-- built by 'withStore' and handed to the caller-supplied action.
data Store =
    Store
        { storeManager   :: !PeerManager
          -- ^ peer manager taken from the running node
        , storeChain     :: !Chain
          -- ^ chain handle taken from the running node
        , storeBlock     :: !BlockStore
          -- ^ block store started by 'withStore'
        , storeDB        :: !DatabaseReader
          -- ^ database reader opened from the store configuration
        , storeCache     :: !(Maybe CacheConfig)
          -- ^ cache configuration, or 'Nothing' when no cache was configured
        , storePublisher :: !(Publisher StoreEvent)
          -- ^ publisher on which 'StoreEvent's are emitted
        , storeNetwork   :: !Network
          -- ^ network copied from 'storeConfNetwork'
        }
-- | Configuration consumed by 'withStore'. Each field is forwarded to the
-- node, block store, database reader or cache configuration as noted.
data StoreConfig =
    StoreConfig
        { storeConfMaxPeers     :: !Int
          -- ^ forwarded to the node as 'nodeConfMaxPeers'
        , storeConfInitPeers    :: ![HostPort]
          -- ^ forwarded to the node as 'nodeConfPeers'
        , storeConfDiscover     :: !Bool
          -- ^ forwarded to the node as 'nodeConfDiscover'
        , storeConfDB           :: !FilePath
          -- ^ path handed to 'withDatabaseReader'
        , storeConfNetwork      :: !Network
          -- ^ network used by the node, block store and database reader
        , storeConfCache        :: !(Maybe String)
          -- ^ string handed to 'connectRedis'; 'Nothing' disables the cache
        , storeConfInitialGap   :: !Word32
          -- ^ initial gap handed to 'withDatabaseReader'
        , storeConfGap          :: !Word32
          -- ^ gap handed to 'withDatabaseReader'
        , storeConfCacheMin     :: !Int
          -- ^ forwarded to the cache as 'cacheMin'
        , storeConfMaxKeys      :: !Integer
          -- ^ forwarded to the cache as 'cacheMax'
        , storeConfNoMempool    :: !Bool
          -- ^ forwarded to the block store as 'blockConfNoMempool'
        , storeConfWipeMempool  :: !Bool
          -- ^ forwarded to the block store as 'blockConfWipeMempool'
        , storeConfPeerTimeout  :: !NominalDiffTime
          -- ^ forwarded as 'nodeConfTimeout' and 'blockConfPeerTimeout'
        , storeConfPeerMaxLife  :: !NominalDiffTime
          -- ^ forwarded to the node as 'nodeConfPeerMaxLife'
        , storeConfConnect      :: !(SockAddr -> WithConnection)
          -- ^ forwarded to the node as 'nodeConfConnect'
        , storeConfCacheRefresh :: !Int
          -- ^ forwarded to the cache as 'cacheRefresh'; also the interval
          -- between cache pings, multiplied by 1000 before 'threadDelay'
        }
-- | Start every component of the store and run an action against them.
--
-- Components are started inside nested bracket-style functions, in this
-- order: database reader, store event publisher, node event publisher and a
-- subscription to it, the node, the optional cache, the block store, and
-- finally an asynchronous 'nodeForwarder' that feeds node events to the
-- block store and the store publisher. The forwarder is linked to the
-- calling thread before the action runs.
withStore :: (MonadLoggerIO m, MonadUnliftIO m)
          => StoreConfig -> (Store -> m a) -> m a
withStore cfg action =
    connectDB cfg $ ReaderT $ \db ->
    withPublisher $ \pub ->
    withPublisher $ \node_pub ->
    withSubscription node_pub $ \node_sub ->
    withNode (nodeCfg cfg db node_pub) $ \node ->
    withCache cfg (nodeChain node) db pub $ \mcache ->
    withBlockStore (blockStoreCfg cfg node pub db) $ \b ->
    withAsync (nodeForwarder b pub node_sub) $ \a1 -> do
        -- Propagate a forwarder failure to this thread.
        link a1
        action
            Store
                { storeManager = nodeManager node
                , storeChain = nodeChain node
                , storeBlock = b
                , storeDB = db
                , storeCache = mcache
                , storePublisher = pub
                , storeNetwork = storeConfNetwork cfg
                }
-- | Run a 'DatabaseReaderT' computation against the database described by
-- the configuration. Passes the network, initial gap, gap and database path
-- from the 'StoreConfig' to 'withDatabaseReader', in that order.
connectDB :: MonadUnliftIO m => StoreConfig -> DatabaseReaderT m a -> m a
connectDB cfg =
    withDatabaseReader
        (storeConfNetwork cfg)
        (storeConfInitialGap cfg)
        (storeConfGap cfg)
        (storeConfDB cfg)
-- | Build the block store configuration from the store configuration, the
-- running node (for its chain and peer manager), the store event publisher
-- and the database reader.
blockStoreCfg :: StoreConfig
              -> Node
              -> Publisher StoreEvent
              -> DatabaseReader
              -> BlockStoreConfig
blockStoreCfg cfg node pub db =
    BlockStoreConfig
        { blockConfChain = nodeChain node
        , blockConfManager = nodeManager node
        , blockConfListener = pub
        , blockConfDB = db
        , blockConfNet = storeConfNetwork cfg
        , blockConfNoMempool = storeConfNoMempool cfg
        , blockConfWipeMempool = storeConfWipeMempool cfg
        , blockConfPeerTimeout = storeConfPeerTimeout cfg
        }
-- | Build the node configuration from the store configuration, the database
-- reader (whose database handle the node uses) and the node event publisher.
nodeCfg :: StoreConfig
        -> DatabaseReader
        -> Publisher NodeEvent
        -> NodeConfig
nodeCfg cfg db pub =
    NodeConfig
        { nodeConfMaxPeers = storeConfMaxPeers cfg
        , nodeConfDB = databaseHandle db
          -- No column family is selected for the node.
        , nodeConfColumnFamily = Nothing
        , nodeConfPeers = storeConfInitPeers cfg
        , nodeConfDiscover = storeConfDiscover cfg
        , nodeConfEvents = pub
          -- Placeholder address: first field 0, host address derived from
          -- a zero port and zero host.
        , nodeConfNetAddr =
              NetworkAddress
                  0
                  (sockToHostAddress (SockAddrInet 0 0))
        , nodeConfNet = storeConfNetwork cfg
        , nodeConfTimeout = storeConfPeerTimeout cfg
        , nodeConfPeerMaxLife = storeConfPeerMaxLife cfg
        , nodeConfConnect = storeConfConnect cfg
        }
-- | Run an action with the optional cache.
--
-- When 'storeConfCache' is 'Nothing' the action receives 'Nothing' and no
-- cache component is started. Otherwise a connection is opened with
-- 'connectRedis', a subscription to store events is created, the cache
-- writer process is started, and the helper threads from
-- 'cacheWriterProcesses' run around the action, which receives the
-- resulting 'CacheConfig'.
withCache :: (MonadUnliftIO m, MonadLoggerIO m)
          => StoreConfig
          -> Chain
          -> DatabaseReader
          -> Publisher StoreEvent
          -> (Maybe CacheConfig -> m a)
          -> m a
withCache cfg chain db pub action =
    case storeConfCache cfg of
        Nothing ->
            action Nothing
        Just redisurl ->
            connectRedis redisurl >>= \conn ->
            withSubscription pub $ \evts ->
            withProcess (f conn) $ \p ->
            cacheWriterProcesses crefresh evts (getProcessMailbox p) $
            action (Just (c conn))
  where
    -- Refresh interval shared by the cache and the ping thread.
    crefresh = storeConfCacheRefresh cfg
    -- Cache writer body: runs 'cacheWriter' with the database reader as
    -- its environment.
    f conn cwinbox = runReaderT (cacheWriter (c conn) cwinbox) db
    -- Cache configuration for a given connection.
    c conn =
        CacheConfig
            { cacheConn = conn
            , cacheMin = storeConfCacheMin cfg
            , cacheChain = chain
            , cacheMax = storeConfMaxKeys cfg
            , cacheRefresh = storeConfCacheRefresh cfg
            }
-- | Run an action alongside two helper threads for the cache writer: one
-- that forwards store events to it ('cacheWriterEvents') and one that pings
-- it periodically. Both threads are linked to the calling thread before the
-- action runs.
cacheWriterProcesses :: MonadUnliftIO m
                     => Int
                     -> Inbox StoreEvent
                     -> CacheWriter
                     -> m a
                     -> m a
cacheWriterProcesses crefresh evts cwm action =
    withAsync events $ \a1 ->
    withAsync ping $ \a2 ->
    link a1 >> link a2 >> action
  where
    events = cacheWriterEvents evts cwm
    ping = forever $ do
        -- 'threadDelay' counts microseconds; the interval is scaled by 1000.
        threadDelay (crefresh * 1000)
        cachePing cwm
-- | Loop forever, receiving store events from the inbox and passing each
-- one to 'cacheWriterDispatch' together with the cache writer.
cacheWriterEvents :: MonadIO m => Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents evts cwm =
    forever $
    receive evts >>= \e ->
    e `cacheWriterDispatch` cwm
-- | React to a single store event on behalf of the cache writer: a
-- 'StoreBestBlock' event triggers 'cacheNewBlock'; every other event is
-- ignored.
cacheWriterDispatch :: MonadIO m => StoreEvent -> CacheWriter -> m ()
cacheWriterDispatch (StoreBestBlock _) = cacheNewBlock
cacheWriterDispatch _                  = const (return ())
-- | Loop forever, receiving node events from the inbox and running
-- 'storeDispatch' on each one in its own STM transaction.
nodeForwarder :: MonadIO m
              => BlockStore
              -> Publisher StoreEvent
              -> Inbox NodeEvent
              -> m ()
nodeForwarder b pub sub =
    forever $ receive sub >>= atomically . storeDispatch b pub
-- | Translate one node event into block store notifications and/or
-- published store events. Events not matched by an earlier clause are
-- ignored.
storeDispatch :: BlockStore
              -> Publisher StoreEvent
              -> NodeEvent
              -> STM ()

-- Peer connected: publish the event, then tell the block store.
storeDispatch b pub (PeerEvent (PeerConnected p)) = do
    publishSTM (StorePeerConnected p) pub
    blockStorePeerConnectSTM p b

-- Peer disconnected: publish the event, then tell the block store.
storeDispatch b pub (PeerEvent (PeerDisconnected p)) = do
    publishSTM (StorePeerDisconnected p) pub
    blockStorePeerDisconnectSTM p b

-- New best block in the chain: pass it to the block store.
storeDispatch b _ (ChainEvent (ChainBestBlock bn)) =
    blockStoreHeadSTM bn b

-- Any other chain event is ignored.
storeDispatch _ _ (ChainEvent _) =
    return ()

-- Pong from a peer: publish only.
storeDispatch _ pub (PeerMessage p (MPong (Pong n))) =
    publishSTM (StorePeerPong p n) pub

-- Block from a peer: pass it to the block store.
storeDispatch b _ (PeerMessage p (MBlock block)) =
    blockStoreBlockSTM p block b

-- Transaction from a peer: pass it to the block store.
storeDispatch b _ (PeerMessage p (MTx tx)) =
    blockStoreTxSTM p tx b

-- Not-found from a peer: report block hashes only, and only when there is
-- at least one.
storeDispatch b _ (PeerMessage p (MNotFound (NotFound is))) = do
    let blocks =
            [ BlockHash h
            | InvVector t h <- is
            , t == InvBlock || t == InvWitnessBlock
            ]
    unless (null blocks) $ blockStoreNotFoundSTM p blocks b

-- Inventory from a peer: extract transaction hashes. The availability
-- event is published even when the list is empty; the block store is only
-- notified when it is not.
storeDispatch b pub (PeerMessage p (MInv (Inv is))) = do
    let txs = [TxHash h | InvVector t h <- is, t == InvTx || t == InvWitnessTx]
    publishSTM (StoreTxAvailable p txs) pub
    unless (null txs) $ blockStoreTxHashSTM p txs b

-- Reject from a peer: only rejections of transaction messages whose data
-- decodes as a transaction hash are published; anything else is dropped.
storeDispatch _ pub (PeerMessage p (MReject r)) =
    when (rejectMessage r == MCTx) $
    case decode (rejectData r) of
        Left _ -> return ()
        Right th ->
            let reject =
                    StoreTxReject
                        p
                        th
                        (rejectCode r)
                        (getVarString (rejectReason r))
             in publishSTM reject pub

-- Everything else is ignored.
storeDispatch _ _ _ =
    return ()