{-# LANGUAGE FlexibleContexts #-}
module Haskoin.Store.Manager
    ( StoreConfig(..)
    , Store(..)
    , withStore
    ) where

import           Control.Monad                 (forever, unless, when)
import           Control.Monad.Logger          (MonadLoggerIO)
import           Control.Monad.Reader          (ReaderT (ReaderT), runReaderT)
import           Data.Serialize                (decode)
import           Data.Time.Clock               (NominalDiffTime)
import           Data.Word                     (Word32)
import           Haskoin                       (BlockHash (..), Inv (..),
                                                InvType (..), InvVector (..),
                                                Message (..),
                                                MessageCommand (..), Network,
                                                NetworkAddress (..),
                                                NotFound (..), Pong (..),
                                                Reject (..), TxHash (..),
                                                VarString (..),
                                                sockToHostAddress)
import           Haskoin.Node                  (Chain, ChainEvent (..),
                                                HostPort, Node (..),
                                                NodeConfig (..), NodeEvent (..),
                                                PeerEvent (..), PeerManager,
                                                WithConnection, withNode)
import           Haskoin.Store.BlockStore      (BlockStore,
                                                BlockStoreConfig (..),
                                                blockStoreBlockSTM,
                                                blockStoreHeadSTM,
                                                blockStoreNotFoundSTM,
                                                blockStorePeerConnectSTM,
                                                blockStorePeerDisconnectSTM,
                                                blockStoreTxHashSTM,
                                                blockStoreTxSTM, withBlockStore)
import           Haskoin.Store.Cache           (CacheConfig (..), CacheWriter,
                                                cacheNewBlock, cacheWriter,
                                                connectRedis, newCacheMetrics)
import           Haskoin.Store.Common          (StoreEvent (..))
import           Haskoin.Store.Database.Reader (DatabaseReader (..),
                                                DatabaseReaderT,
                                                withDatabaseReader)
import           NQE                           (Inbox, Process (..), Publisher,
                                                publishSTM, receive,
                                                withProcess, withPublisher,
                                                withSubscription)
import           Network.Socket                (SockAddr (..))
import qualified System.Metrics                as Metrics (Store)
import           UnliftIO                      (MonadIO, MonadUnliftIO, STM,
                                                atomically, link, withAsync)
import           UnliftIO.Concurrent           (threadDelay)

-- | Store mailboxes.
data Store =
    Store
        { Store -> PeerManager
storeManager   :: !PeerManager
        , Store -> Chain
storeChain     :: !Chain
        , Store -> BlockStore
storeBlock     :: !BlockStore
        , Store -> DatabaseReader
storeDB        :: !DatabaseReader
        , Store -> Maybe CacheConfig
storeCache     :: !(Maybe CacheConfig)
        , Store -> Publisher StoreEvent
storePublisher :: !(Publisher StoreEvent)
        , Store -> Network
storeNetwork   :: !Network
        }

-- | Configuration for a 'Store'.
data StoreConfig =
    StoreConfig
        { StoreConfig -> Int
storeConfMaxPeers        :: !Int
      -- ^ max peers to connect to
        , StoreConfig -> [HostPort]
storeConfInitPeers       :: ![HostPort]
      -- ^ static set of peers to connect to
        , StoreConfig -> Bool
storeConfDiscover        :: !Bool
      -- ^ discover new peers
        , StoreConfig -> FilePath
storeConfDB              :: !FilePath
      -- ^ RocksDB database path
        , StoreConfig -> Network
storeConfNetwork         :: !Network
      -- ^ network constants
        , StoreConfig -> Maybe FilePath
storeConfCache           :: !(Maybe String)
      -- ^ Redis cache configuration
        , StoreConfig -> Word32
storeConfInitialGap      :: !Word32
      -- ^ gap on extended public key with no transactions
        , StoreConfig -> Word32
storeConfGap             :: !Word32
      -- ^ gap for extended public keys
        , StoreConfig -> Int
storeConfCacheMin        :: !Int
      -- ^ cache xpubs with more than this many used addresses
        , StoreConfig -> Integer
storeConfMaxKeys         :: !Integer
      -- ^ maximum number of keys in Redis cache
        , StoreConfig -> Bool
storeConfNoMempool       :: !Bool
      -- ^ do not index new mempool transactions
        , StoreConfig -> Bool
storeConfWipeMempool     :: !Bool
      -- ^ wipe mempool when starting
        , StoreConfig -> NominalDiffTime
storeConfPeerTimeout     :: !NominalDiffTime
      -- ^ disconnect peer if message not received for this many seconds
        , StoreConfig -> NominalDiffTime
storeConfPeerMaxLife     :: !NominalDiffTime
      -- ^ disconnect peer if it has been connected this long
        , StoreConfig -> SockAddr -> WithConnection
storeConfConnect         :: !(SockAddr -> WithConnection)
      -- ^ connect to peers using the function 'withConnection'
        , StoreConfig -> Int
storeConfCacheRefresh    :: !Int
      -- ^ refresh the cache this often (milliseconds)
        , StoreConfig -> Int
storeConfCacheRetryDelay :: !Int
      -- ^ delay in microseconds to retry getting cache lock
        , StoreConfig -> Maybe Store
storeConfStats           :: !(Maybe Metrics.Store)
      -- ^ stats store
        }

withStore :: (MonadLoggerIO m, MonadUnliftIO m)
          => StoreConfig -> (Store -> m a) -> m a
withStore :: StoreConfig -> (Store -> m a) -> m a
withStore cfg :: StoreConfig
cfg action :: Store -> m a
action =
    StoreConfig -> DatabaseReaderT m a -> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
StoreConfig -> DatabaseReaderT m a -> m a
connectDB StoreConfig
cfg (DatabaseReaderT m a -> m a) -> DatabaseReaderT m a -> m a
forall a b. (a -> b) -> a -> b
$ (DatabaseReader -> m a) -> DatabaseReaderT m a
forall r (m :: * -> *) a. (r -> m a) -> ReaderT r m a
ReaderT ((DatabaseReader -> m a) -> DatabaseReaderT m a)
-> (DatabaseReader -> m a) -> DatabaseReaderT m a
forall a b. (a -> b) -> a -> b
$ \db :: DatabaseReader
db ->
    (Publisher StoreEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher ((Publisher StoreEvent -> m a) -> m a)
-> (Publisher StoreEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \pub :: Publisher StoreEvent
pub ->
    (Publisher NodeEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher ((Publisher NodeEvent -> m a) -> m a)
-> (Publisher NodeEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \node_pub :: Publisher NodeEvent
node_pub ->
    Publisher NodeEvent -> (Inbox NodeEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher NodeEvent
node_pub ((Inbox NodeEvent -> m a) -> m a)
-> (Inbox NodeEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \node_sub :: Inbox NodeEvent
node_sub ->
    NodeConfig -> (Node -> m a) -> m a
forall (m :: * -> *) a.
(MonadLoggerIO m, MonadUnliftIO m) =>
NodeConfig -> (Node -> m a) -> m a
withNode (StoreConfig -> DatabaseReader -> Publisher NodeEvent -> NodeConfig
nodeCfg StoreConfig
cfg DatabaseReader
db Publisher NodeEvent
node_pub) ((Node -> m a) -> m a) -> (Node -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \node :: Node
node ->
    StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadLoggerIO m) =>
StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
withCache StoreConfig
cfg (Node -> Chain
nodeChain Node
node) DatabaseReader
db Publisher StoreEvent
pub ((Maybe CacheConfig -> m a) -> m a)
-> (Maybe CacheConfig -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \mcache :: Maybe CacheConfig
mcache ->
    BlockStoreConfig -> (BlockStore -> m a) -> m a
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadLoggerIO m) =>
BlockStoreConfig -> (BlockStore -> m a) -> m a
withBlockStore (StoreConfig
-> Node
-> Publisher StoreEvent
-> DatabaseReader
-> BlockStoreConfig
blockStoreCfg StoreConfig
cfg Node
node Publisher StoreEvent
pub DatabaseReader
db) ((BlockStore -> m a) -> m a) -> (BlockStore -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \b :: BlockStore
b ->
    m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync (BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
forall (m :: * -> *).
MonadIO m =>
BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
nodeForwarder BlockStore
b Publisher StoreEvent
pub Inbox NodeEvent
node_sub) ((Async () -> m a) -> m a) -> (Async () -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \a1 :: Async ()
a1 -> Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
a1 m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
    Store -> m a
action $WStore :: PeerManager
-> Chain
-> BlockStore
-> DatabaseReader
-> Maybe CacheConfig
-> Publisher StoreEvent
-> Network
-> Store
Store { storeManager :: PeerManager
storeManager = Node -> PeerManager
nodeManager Node
node
                 , storeChain :: Chain
storeChain = Node -> Chain
nodeChain Node
node
                 , storeBlock :: BlockStore
storeBlock = BlockStore
b
                 , storeDB :: DatabaseReader
storeDB = DatabaseReader
db
                 , storeCache :: Maybe CacheConfig
storeCache = Maybe CacheConfig
mcache
                 , storePublisher :: Publisher StoreEvent
storePublisher = Publisher StoreEvent
pub
                 , storeNetwork :: Network
storeNetwork = StoreConfig -> Network
storeConfNetwork StoreConfig
cfg
                 }

connectDB :: MonadUnliftIO m => StoreConfig -> DatabaseReaderT m a -> m a
connectDB :: StoreConfig -> DatabaseReaderT m a -> m a
connectDB cfg :: StoreConfig
cfg =
    Network
-> Word32 -> Word32 -> FilePath -> DatabaseReaderT m a -> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
Network
-> Word32 -> Word32 -> FilePath -> DatabaseReaderT m a -> m a
withDatabaseReader
          (StoreConfig -> Network
storeConfNetwork StoreConfig
cfg)
          (StoreConfig -> Word32
storeConfInitialGap StoreConfig
cfg)
          (StoreConfig -> Word32
storeConfGap StoreConfig
cfg)
          (StoreConfig -> FilePath
storeConfDB StoreConfig
cfg)

blockStoreCfg :: StoreConfig
              -> Node
              -> Publisher StoreEvent
              -> DatabaseReader
              -> BlockStoreConfig
blockStoreCfg :: StoreConfig
-> Node
-> Publisher StoreEvent
-> DatabaseReader
-> BlockStoreConfig
blockStoreCfg cfg :: StoreConfig
cfg node :: Node
node pub :: Publisher StoreEvent
pub db :: DatabaseReader
db =
    $WBlockStoreConfig :: PeerManager
-> Chain
-> Publisher StoreEvent
-> DatabaseReader
-> Network
-> Bool
-> Bool
-> NominalDiffTime
-> Maybe Store
-> BlockStoreConfig
BlockStoreConfig
        { blockConfChain :: Chain
blockConfChain = Node -> Chain
nodeChain Node
node
        , blockConfManager :: PeerManager
blockConfManager = Node -> PeerManager
nodeManager Node
node
        , blockConfListener :: Publisher StoreEvent
blockConfListener = Publisher StoreEvent
pub
        , blockConfDB :: DatabaseReader
blockConfDB = DatabaseReader
db
        , blockConfNet :: Network
blockConfNet = StoreConfig -> Network
storeConfNetwork StoreConfig
cfg
        , blockConfNoMempool :: Bool
blockConfNoMempool = StoreConfig -> Bool
storeConfNoMempool StoreConfig
cfg
        , blockConfWipeMempool :: Bool
blockConfWipeMempool = StoreConfig -> Bool
storeConfWipeMempool StoreConfig
cfg
        , blockConfPeerTimeout :: NominalDiffTime
blockConfPeerTimeout = StoreConfig -> NominalDiffTime
storeConfPeerTimeout StoreConfig
cfg
        , blockConfStats :: Maybe Store
blockConfStats = StoreConfig -> Maybe Store
storeConfStats StoreConfig
cfg
        }

nodeCfg :: StoreConfig
        -> DatabaseReader
        -> Publisher NodeEvent
        -> NodeConfig
nodeCfg :: StoreConfig -> DatabaseReader -> Publisher NodeEvent -> NodeConfig
nodeCfg cfg :: StoreConfig
cfg db :: DatabaseReader
db pub :: Publisher NodeEvent
pub =
    $WNodeConfig :: Int
-> DB
-> Maybe ColumnFamily
-> [HostPort]
-> Bool
-> NetworkAddress
-> Network
-> Publisher NodeEvent
-> NominalDiffTime
-> NominalDiffTime
-> (SockAddr -> WithConnection)
-> NodeConfig
NodeConfig
        { nodeConfMaxPeers :: Int
nodeConfMaxPeers = StoreConfig -> Int
storeConfMaxPeers StoreConfig
cfg
        , nodeConfDB :: DB
nodeConfDB = DatabaseReader -> DB
databaseHandle DatabaseReader
db
        , nodeConfColumnFamily :: Maybe ColumnFamily
nodeConfColumnFamily = Maybe ColumnFamily
forall a. Maybe a
Nothing
        , nodeConfPeers :: [HostPort]
nodeConfPeers = StoreConfig -> [HostPort]
storeConfInitPeers StoreConfig
cfg
        , nodeConfDiscover :: Bool
nodeConfDiscover = StoreConfig -> Bool
storeConfDiscover StoreConfig
cfg
        , nodeConfEvents :: Publisher NodeEvent
nodeConfEvents = Publisher NodeEvent
pub
        , nodeConfNetAddr :: NetworkAddress
nodeConfNetAddr =
                Word64 -> HostAddress -> NetworkAddress
NetworkAddress
                0
                (SockAddr -> HostAddress
sockToHostAddress (PortNumber -> Word32 -> SockAddr
SockAddrInet 0 0))
        , nodeConfNet :: Network
nodeConfNet = StoreConfig -> Network
storeConfNetwork StoreConfig
cfg
        , nodeConfTimeout :: NominalDiffTime
nodeConfTimeout = StoreConfig -> NominalDiffTime
storeConfPeerTimeout StoreConfig
cfg
        , nodeConfPeerMaxLife :: NominalDiffTime
nodeConfPeerMaxLife = StoreConfig -> NominalDiffTime
storeConfPeerMaxLife StoreConfig
cfg
        , nodeConfConnect :: SockAddr -> WithConnection
nodeConfConnect = StoreConfig -> SockAddr -> WithConnection
storeConfConnect StoreConfig
cfg
        }

withCache :: (MonadUnliftIO m, MonadLoggerIO m)
          => StoreConfig
          -> Chain
          -> DatabaseReader
          -> Publisher StoreEvent
          -> (Maybe CacheConfig -> m a)
          -> m a
withCache :: StoreConfig
-> Chain
-> DatabaseReader
-> Publisher StoreEvent
-> (Maybe CacheConfig -> m a)
-> m a
withCache cfg :: StoreConfig
cfg chain :: Chain
chain db :: DatabaseReader
db pub :: Publisher StoreEvent
pub action :: Maybe CacheConfig -> m a
action =
    case StoreConfig -> Maybe FilePath
storeConfCache StoreConfig
cfg of
        Nothing ->
            Maybe CacheConfig -> m a
action Maybe CacheConfig
forall a. Maybe a
Nothing
        Just redisurl :: FilePath
redisurl ->
            (Store -> m CacheMetrics) -> Maybe Store -> m (Maybe CacheMetrics)
forall (t :: * -> *) (m :: * -> *) a b.
(Traversable t, Monad m) =>
(a -> m b) -> t a -> m (t b)
mapM Store -> m CacheMetrics
forall (m :: * -> *). MonadIO m => Store -> m CacheMetrics
newCacheMetrics (StoreConfig -> Maybe Store
storeConfStats StoreConfig
cfg) m (Maybe CacheMetrics) -> (Maybe CacheMetrics -> m a) -> m a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \metrics :: Maybe CacheMetrics
metrics ->
            FilePath -> m Connection
forall (m :: * -> *). MonadIO m => FilePath -> m Connection
connectRedis FilePath
redisurl m Connection -> (Connection -> m a) -> m a
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \conn :: Connection
conn ->
            Publisher StoreEvent -> (Inbox StoreEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher StoreEvent
pub ((Inbox StoreEvent -> m a) -> m a)
-> (Inbox StoreEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \evts :: Inbox StoreEvent
evts ->
            let conf :: CacheConfig
conf = Connection -> Maybe CacheMetrics -> CacheConfig
c Connection
conn Maybe CacheMetrics
metrics
            in  (Inbox CacheWriterMessage -> m ())
-> (Process CacheWriterMessage -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Inbox msg -> m ()) -> (Process msg -> m a) -> m a
withProcess (CacheConfig -> Inbox CacheWriterMessage -> m ()
forall (m :: * -> *).
(MonadUnliftIO m, MonadLoggerIO m) =>
CacheConfig -> Inbox CacheWriterMessage -> m ()
f CacheConfig
conf) ((Process CacheWriterMessage -> m a) -> m a)
-> (Process CacheWriterMessage -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \p :: Process CacheWriterMessage
p ->
                Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
forall (m :: * -> *) a.
MonadUnliftIO m =>
Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
cacheWriterProcesses Int
crefresh Inbox StoreEvent
evts (Process CacheWriterMessage -> CacheWriter
forall msg. Process msg -> Mailbox msg
getProcessMailbox Process CacheWriterMessage
p) (m a -> m a) -> m a -> m a
forall a b. (a -> b) -> a -> b
$ do
                Maybe CacheConfig -> m a
action (CacheConfig -> Maybe CacheConfig
forall a. a -> Maybe a
Just CacheConfig
conf)
  where
    crefresh :: Int
crefresh = StoreConfig -> Int
storeConfCacheRefresh StoreConfig
cfg
    f :: CacheConfig -> Inbox CacheWriterMessage -> m ()
f conf :: CacheConfig
conf cwinbox :: Inbox CacheWriterMessage
cwinbox = ReaderT DatabaseReader m () -> DatabaseReader -> m ()
forall r (m :: * -> *) a. ReaderT r m a -> r -> m a
runReaderT (CacheConfig
-> Inbox CacheWriterMessage -> ReaderT DatabaseReader m ()
forall (m :: * -> *).
(MonadUnliftIO m, MonadLoggerIO m, StoreReadExtra m) =>
CacheConfig -> Inbox CacheWriterMessage -> m ()
cacheWriter CacheConfig
conf Inbox CacheWriterMessage
cwinbox) DatabaseReader
db
    c :: Connection -> Maybe CacheMetrics -> CacheConfig
c conn :: Connection
conn metrics :: Maybe CacheMetrics
metrics =
        $WCacheConfig :: Connection
-> Int
-> Integer
-> Chain
-> Int
-> Int
-> Maybe CacheMetrics
-> CacheConfig
CacheConfig
           { cacheConn :: Connection
cacheConn = Connection
conn
           , cacheMin :: Int
cacheMin = StoreConfig -> Int
storeConfCacheMin StoreConfig
cfg
           , cacheChain :: Chain
cacheChain = Chain
chain
           , cacheMax :: Integer
cacheMax = StoreConfig -> Integer
storeConfMaxKeys StoreConfig
cfg
           , cacheRefresh :: Int
cacheRefresh = StoreConfig -> Int
storeConfCacheRefresh StoreConfig
cfg
           , cacheRetryDelay :: Int
cacheRetryDelay = StoreConfig -> Int
storeConfCacheRetryDelay StoreConfig
cfg
           , cacheMetrics :: Maybe CacheMetrics
cacheMetrics = Maybe CacheMetrics
metrics
           }

cacheWriterProcesses :: MonadUnliftIO m
                     => Int
                     -> Inbox StoreEvent
                     -> CacheWriter
                     -> m a
                     -> m a
cacheWriterProcesses :: Int -> Inbox StoreEvent -> CacheWriter -> m a -> m a
cacheWriterProcesses crefresh :: Int
crefresh evts :: Inbox StoreEvent
evts cwm :: CacheWriter
cwm action :: m a
action =
    m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync m ()
events ((Async () -> m a) -> m a) -> (Async () -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \a1 :: Async ()
a1 -> Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
a1 m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> m a
action
  where
    events :: m ()
events = Inbox StoreEvent -> CacheWriter -> m ()
forall (m :: * -> *).
MonadIO m =>
Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents Inbox StoreEvent
evts CacheWriter
cwm

cacheWriterEvents :: MonadIO m => Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents :: Inbox StoreEvent -> CacheWriter -> m ()
cacheWriterEvents evts :: Inbox StoreEvent
evts cwm :: CacheWriter
cwm =
    m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$
    Inbox StoreEvent -> m StoreEvent
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox StoreEvent
evts m StoreEvent -> (StoreEvent -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \e :: StoreEvent
e ->
    StoreEvent
e StoreEvent -> CacheWriter -> m ()
forall (m :: * -> *).
MonadIO m =>
StoreEvent -> CacheWriter -> m ()
`cacheWriterDispatch` CacheWriter
cwm

cacheWriterDispatch :: MonadIO m => StoreEvent -> CacheWriter -> m ()
cacheWriterDispatch :: StoreEvent -> CacheWriter -> m ()
cacheWriterDispatch (StoreBestBlock _) = CacheWriter -> m ()
forall (m :: * -> *). MonadIO m => CacheWriter -> m ()
cacheNewBlock
cacheWriterDispatch _                  = m () -> CacheWriter -> m ()
forall a b. a -> b -> a
const (() -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ())

nodeForwarder :: MonadIO m
              => BlockStore
              -> Publisher StoreEvent
              -> Inbox NodeEvent
              -> m ()
nodeForwarder :: BlockStore -> Publisher StoreEvent -> Inbox NodeEvent -> m ()
nodeForwarder b :: BlockStore
b pub :: Publisher StoreEvent
pub sub :: Inbox NodeEvent
sub =
    m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Inbox NodeEvent -> m NodeEvent
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox NodeEvent
sub m NodeEvent -> (NodeEvent -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= STM () -> m ()
forall (m :: * -> *) a. MonadIO m => STM a -> m a
atomically (STM () -> m ()) -> (NodeEvent -> STM ()) -> NodeEvent -> m ()
forall b c a. (b -> c) -> (a -> b) -> a -> c
. BlockStore -> Publisher StoreEvent -> NodeEvent -> STM ()
storeDispatch BlockStore
b Publisher StoreEvent
pub

-- | Dispatcher of node events.
storeDispatch :: BlockStore
              -> Publisher StoreEvent
              -> NodeEvent
              -> STM ()

storeDispatch :: BlockStore -> Publisher StoreEvent -> NodeEvent -> STM ()
storeDispatch b :: BlockStore
b pub :: Publisher StoreEvent
pub (PeerEvent (PeerConnected p :: Peer
p)) = do
    StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> StoreEvent
StorePeerConnected Peer
p) Publisher StoreEvent
pub
    Peer -> BlockStore -> STM ()
blockStorePeerConnectSTM Peer
p BlockStore
b

storeDispatch b :: BlockStore
b pub :: Publisher StoreEvent
pub (PeerEvent (PeerDisconnected p :: Peer
p)) = do
    StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> StoreEvent
StorePeerDisconnected Peer
p) Publisher StoreEvent
pub
    Peer -> BlockStore -> STM ()
blockStorePeerDisconnectSTM Peer
p BlockStore
b

storeDispatch b :: BlockStore
b _ (ChainEvent (ChainBestBlock bn :: BlockNode
bn)) =
    BlockNode -> BlockStore -> STM ()
blockStoreHeadSTM BlockNode
bn BlockStore
b

storeDispatch _ _ (ChainEvent _) =
    () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()

storeDispatch _ pub :: Publisher StoreEvent
pub (PeerMessage p :: Peer
p (MPong (Pong n :: Word64
n))) =
    StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> Word64 -> StoreEvent
StorePeerPong Peer
p Word64
n) Publisher StoreEvent
pub

storeDispatch b :: BlockStore
b _ (PeerMessage p :: Peer
p (MBlock block :: Block
block)) =
    Peer -> Block -> BlockStore -> STM ()
blockStoreBlockSTM Peer
p Block
block BlockStore
b

storeDispatch b :: BlockStore
b _ (PeerMessage p :: Peer
p (MTx tx :: Tx
tx)) =
    Peer -> Tx -> BlockStore -> STM ()
blockStoreTxSTM Peer
p Tx
tx BlockStore
b

storeDispatch b :: BlockStore
b _ (PeerMessage p :: Peer
p (MNotFound (NotFound is :: [InvVector]
is))) = do
    let blocks :: [BlockHash]
blocks =
            [ Hash256 -> BlockHash
BlockHash Hash256
h
            | InvVector t :: InvType
t h :: Hash256
h <- [InvVector]
is
            , InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvBlock Bool -> Bool -> Bool
|| InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvWitnessBlock
            ]
    Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([BlockHash] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [BlockHash]
blocks) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ Peer -> [BlockHash] -> BlockStore -> STM ()
blockStoreNotFoundSTM Peer
p [BlockHash]
blocks BlockStore
b

storeDispatch b :: BlockStore
b pub :: Publisher StoreEvent
pub (PeerMessage p :: Peer
p (MInv (Inv is :: [InvVector]
is))) = do
    let txs :: [TxHash]
txs = [Hash256 -> TxHash
TxHash Hash256
h | InvVector t :: InvType
t h :: Hash256
h <- [InvVector]
is, InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvTx Bool -> Bool -> Bool
|| InvType
t InvType -> InvType -> Bool
forall a. Eq a => a -> a -> Bool
== InvType
InvWitnessTx]
    StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM (Peer -> [TxHash] -> StoreEvent
StoreTxAvailable Peer
p [TxHash]
txs) Publisher StoreEvent
pub
    Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
unless ([TxHash] -> Bool
forall (t :: * -> *) a. Foldable t => t a -> Bool
null [TxHash]
txs) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$ Peer -> [TxHash] -> BlockStore -> STM ()
blockStoreTxHashSTM Peer
p [TxHash]
txs BlockStore
b

storeDispatch _ pub :: Publisher StoreEvent
pub (PeerMessage p :: Peer
p (MReject r :: Reject
r)) =
    Bool -> STM () -> STM ()
forall (f :: * -> *). Applicative f => Bool -> f () -> f ()
when (Reject -> MessageCommand
rejectMessage Reject
r MessageCommand -> MessageCommand -> Bool
forall a. Eq a => a -> a -> Bool
== MessageCommand
MCTx) (STM () -> STM ()) -> STM () -> STM ()
forall a b. (a -> b) -> a -> b
$
    case ByteString -> Either FilePath TxHash
forall a. Serialize a => ByteString -> Either FilePath a
decode (Reject -> ByteString
rejectData Reject
r) of
        Left _ -> () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
        Right th :: TxHash
th ->
            let reject :: StoreEvent
reject =
                    Peer -> TxHash -> RejectCode -> ByteString -> StoreEvent
StoreTxReject
                    Peer
p
                    TxHash
th
                    (Reject -> RejectCode
rejectCode Reject
r)
                    (VarString -> ByteString
getVarString (Reject -> VarString
rejectReason Reject
r))
            in StoreEvent -> Publisher StoreEvent -> STM ()
forall msg. msg -> Publisher msg -> STM ()
publishSTM StoreEvent
reject Publisher StoreEvent
pub

storeDispatch _ _ _ =
    () -> STM ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()