{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Haskoin.Node
( module Haskoin.Node.Peer
, module Haskoin.Node.Manager
, module Haskoin.Node.Chain
, NodeConfig (..)
, NodeEvent (..)
, Node (..)
, withNode
, withConnection
) where
import Control.Monad (forever)
import Control.Monad.Logger (MonadLoggerIO)
import Data.Conduit.Network (appSink, appSource, clientSettings,
runTCPClient)
import Data.String.Conversions (cs)
import Data.Time.Clock (NominalDiffTime)
import Database.RocksDB (ColumnFamily, DB)
import Haskoin (Addr (..), BlockNode (..),
Headers (..), Message (..), Network,
NetworkAddress, Ping (..), Pong (..))
import Haskoin.Node.Chain
import Haskoin.Node.Manager
import Haskoin.Node.Peer
import Network.Socket (NameInfoFlag (..), SockAddr,
getNameInfo)
import NQE (Inbox, Publisher, publish, receive,
withPublisher, withSubscription)
import Text.Read (readMaybe)
import UnliftIO (MonadUnliftIO, SomeException, catch,
liftIO, link, throwIO, withAsync)
-- | Settings consumed by 'withNode' to start a peer manager and a chain
-- process. Every field is strict.
data NodeConfig = NodeConfig
    { nodeConfMaxPeers :: !Int
      -- ^ forwarded to the peer manager as @peerManagerMaxPeers@
    , nodeConfDB :: !DB
      -- ^ RocksDB handle, forwarded to the chain as @chainConfDB@
    , nodeConfColumnFamily :: !(Maybe ColumnFamily)
      -- ^ optional column family, forwarded as @chainConfColumnFamily@
    , nodeConfPeers :: ![HostPort]
      -- ^ forwarded to the peer manager as @peerManagerPeers@
    , nodeConfDiscover :: !Bool
      -- ^ forwarded to the peer manager as @peerManagerDiscover@
    , nodeConfNetAddr :: !NetworkAddress
      -- ^ forwarded to the peer manager as @peerManagerNetAddr@
    , nodeConfNet :: !Network
      -- ^ network definition shared by the chain and the peer manager
    , nodeConfEvents :: !(Publisher NodeEvent)
      -- ^ publisher that receives every 'NodeEvent' produced by the node
    , nodeConfTimeout :: !NominalDiffTime
      -- ^ timeout handed to both the chain and the peer manager
    , nodeConfPeerMaxLife :: !NominalDiffTime
      -- ^ forwarded to the peer manager as @peerManagerMaxLife@
    , nodeConfConnect :: !(SockAddr -> WithConnection)
      -- ^ how to open a connection to a peer address; 'withConnection'
      -- is a value of this type
    }
-- | Handles to the two running components of a node, as passed by
-- 'withNode' to its continuation.
data Node = Node
    { nodeManager :: !PeerManager
      -- ^ the running peer manager
    , nodeChain :: !Chain
      -- ^ the running chain process
    }
-- | Everything a node publishes on 'nodeConfEvents': events relayed from
-- the chain, events relayed from the peer manager, and raw messages
-- received from peers.
data NodeEvent
    = ChainEvent !ChainEvent
      -- ^ event relayed from the chain process
    | PeerEvent !PeerEvent
      -- ^ event relayed from the peer manager
    | PeerMessage !Peer !Message
      -- ^ message received from the given peer
    deriving Eq
-- | Open a plain TCP connection to a socket address and run a callback
-- with the connection's byte streams.
--
-- The address is first rendered to a numeric host and port through
-- 'fromSockAddr'. When that yields 'Nothing', 'PeerAddressInvalid' is
-- thrown. Otherwise 'runTCPClient' opens the connection and the callback
-- receives a 'Conduits' built from the connection's source and sink.
withConnection :: SockAddr -> WithConnection
withConnection na f =
    fromSockAddr na >>= \case
        Nothing -> throwIO PeerAddressInvalid
        Just (host, port) -> do
            -- 'clientSettings' wants the host as a ByteString; 'cs' converts.
            let cset = clientSettings port (cs host)
            runTCPClient cset $ \ad ->
                f (Conduits (appSource ad) (appSink ad))
-- | Render a socket address as a numeric host string and an integer port.
--
-- Returns 'Nothing' when 'getNameInfo' yields no host or no service, when
-- the service string does not parse as an 'Int', or when the lookup throws
-- an exception that the 'catch' handler intercepts.
fromSockAddr ::
       (MonadUnliftIO m) => SockAddr -> m (Maybe HostPort)
fromSockAddr sa = go `catch` e
  where
    go = do
        (maybe_host, maybe_port) <- liftIO (getNameInfo flags True True sa)
        return $ (,) <$> maybe_host <*> (readMaybe =<< maybe_port)
    -- Ask for numeric forms only, so that the port is parseable as a number.
    flags = [NI_NUMERICHOST, NI_NUMERICSERV]
    -- Any intercepted failure is reported to the caller as 'Nothing'.
    e :: Monad m => SomeException -> m (Maybe a)
    e _ = return Nothing
-- | Relay chain events forever.
--
-- Each event read from the inbox is handled in two steps: a
-- 'ChainBestBlock' additionally reports the new best height to the peer
-- manager through 'managerBest'; then every event, whatever its
-- constructor, is republished wrapped in 'ChainEvent'.
chainForwarder ::
       MonadLoggerIO m
    => PeerManager
    -> Publisher NodeEvent
    -> Inbox ChainEvent
    -> m ()
chainForwarder mgr pub inbox =
    forever $
    receive inbox >>= \event -> do
        case event of
            ChainBestBlock bb -> managerBest (nodeHeight bb) mgr
            _ -> return ()
        publish (ChainEvent event) pub
-- | Relay peer-manager events forever.
--
-- Connection and disconnection events are first passed to the chain
-- through 'chainPeerConnected' and 'chainPeerDisconnected'; the event is
-- then republished wrapped in 'PeerEvent'.
managerForwarder ::
       MonadLoggerIO m
    => Chain
    -> Publisher NodeEvent
    -> Inbox PeerEvent
    -> m ()
managerForwarder ch pub inbox =
    forever $
    receive inbox >>= \event -> do
        case event of
            PeerConnected p -> chainPeerConnected p ch
            PeerDisconnected p -> chainPeerDisconnected p ch
        publish (PeerEvent event) pub
-- | Relay messages received from peers forever.
--
-- For each @(peer, message)@ pair read from the inbox:
--
-- 1. Protocol messages are dispatched: version, verack, ping, pong and
--    addr go to the peer manager; headers go to the chain. All other
--    messages are skipped at this step.
-- 2. 'managerTickle' is called for the peer, whatever the message was.
-- 3. The message is republished as a 'PeerMessage'.
peerForwarder ::
       MonadLoggerIO m
    => Chain
    -> PeerManager
    -> Publisher NodeEvent
    -> Inbox (Peer, Message)
    -> m ()
peerForwarder ch mgr pub inbox =
    forever $
    receive inbox >>= \(p, msg) -> do
        case msg of
            MVersion v -> managerVersion p v mgr
            MVerAck -> managerVerAck p mgr
            MPing (Ping n) -> managerPing p n mgr
            MPong (Pong n) -> managerPong p n mgr
            -- Addresses arrive paired with another value; keep the address.
            MAddr (Addr ns) -> managerAddrs p (map snd ns) mgr
            -- Headers arrive paired with a count; keep the header.
            MHeaders (Headers hs) -> chainHeaders p (map fst hs) ch
            _ -> return ()
        managerTickle p mgr
        publish (PeerMessage p msg) pub
-- | Start a node and run an action with handles to it.
--
-- Three internal publishers are created (peer messages, peer-manager
-- events, chain events) and each is subscribed to. The peer manager and
-- the chain are then started with configurations derived from the
-- 'NodeConfig'. Three forwarder threads ('peerForwarder',
-- 'managerForwarder', 'chainForwarder') are started with 'withAsync' and
-- linked to the calling thread before the action runs with the resulting
-- 'Node'. Everything is bracketed, so it is torn down when the action
-- returns or throws.
withNode ::
       ( MonadLoggerIO m
       , MonadUnliftIO m
       )
    => NodeConfig
    -> (Node -> m a)
    -> m a
withNode cfg action =
    withPublisher $ \peer_pub ->
    withPublisher $ \mgr_pub ->
    withPublisher $ \ch_pub ->
    withSubscription peer_pub $ \peer_inbox ->
    withSubscription mgr_pub $ \mgr_inbox ->
    withSubscription ch_pub $ \ch_inbox ->
    withPeerManager (mgr_config mgr_pub peer_pub) $ \mgr ->
    withChain (chain_config ch_pub) $ \ch ->
    withAsync (peerForwarder ch mgr pub peer_inbox) $ \a ->
    withAsync (managerForwarder ch pub mgr_inbox) $ \b ->
    withAsync (chainForwarder mgr pub ch_inbox) $ \c ->
    -- Link the forwarders so that a failure in any of them is not lost.
    link a >> link b >> link c >>
    action Node {nodeManager = mgr, nodeChain = ch}
  where
    -- External publisher on which all node events are emitted.
    pub = nodeConfEvents cfg
    -- Chain configuration: node settings plus the internal chain publisher.
    chain_config ch_pub =
        ChainConfig
            { chainConfDB = nodeConfDB cfg
            , chainConfColumnFamily = nodeConfColumnFamily cfg
            , chainConfNetwork = nodeConfNet cfg
            , chainConfEvents = ch_pub
            , chainConfTimeout = nodeConfTimeout cfg
            }
    -- Peer-manager configuration: node settings plus the two internal
    -- publishers for manager events and peer messages.
    mgr_config mgr_pub peer_pub =
        PeerManagerConfig
            { peerManagerMaxPeers = nodeConfMaxPeers cfg
            , peerManagerPeers = nodeConfPeers cfg
            , peerManagerDiscover = nodeConfDiscover cfg
            , peerManagerNetAddr = nodeConfNetAddr cfg
            , peerManagerNetwork = nodeConfNet cfg
            , peerManagerEvents = mgr_pub
            , peerManagerMaxLife = nodeConfPeerMaxLife cfg
            , peerManagerTimeout = nodeConfTimeout cfg
            , peerManagerConnect = nodeConfConnect cfg
            , peerManagerPub = peer_pub
            }