{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Haskoin.Node
( module Haskoin.Node.Peer
, module Haskoin.Node.Manager
, module Haskoin.Node.Chain
, NodeConfig (..)
, NodeEvent (..)
, Node (..)
, withNode
, withConnection
) where
import Control.Monad (forever)
import Control.Monad.Logger (MonadLoggerIO)
import Data.Conduit.Network (appSink, appSource, clientSettings,
runTCPClient, ClientSettings)
import Data.String.Conversions (cs)
import Data.Time.Clock (NominalDiffTime)
import Database.RocksDB (ColumnFamily, DB)
import Haskoin (Addr (..), BlockNode (..),
Headers (..), Message (..), Network,
NetworkAddress, Ping (..), Pong (..))
import Haskoin.Node.Chain
import Haskoin.Node.Manager
import Haskoin.Node.Peer
import Network.Socket (NameInfoFlag (..), SockAddr,
getNameInfo)
import NQE (Inbox, Publisher, publish, receive,
withPublisher, withSubscription)
import Text.Read (readMaybe)
import UnliftIO (MonadUnliftIO, SomeException, catch,
liftIO, link, throwIO, withAsync)
data NodeConfig = NodeConfig
{ NodeConfig -> Int
nodeConfMaxPeers :: !Int
, NodeConfig -> DB
nodeConfDB :: !DB
, NodeConfig -> Maybe ColumnFamily
nodeConfColumnFamily :: !(Maybe ColumnFamily)
, NodeConfig -> [String]
nodeConfPeers :: ![String]
, NodeConfig -> Bool
nodeConfDiscover :: !Bool
, NodeConfig -> NetworkAddress
nodeConfNetAddr :: !NetworkAddress
, NodeConfig -> Network
nodeConfNet :: !Network
, NodeConfig -> Publisher NodeEvent
nodeConfEvents :: !(Publisher NodeEvent)
, NodeConfig -> NominalDiffTime
nodeConfTimeout :: !NominalDiffTime
, NodeConfig -> NominalDiffTime
nodeConfPeerMaxLife :: !NominalDiffTime
, NodeConfig -> SockAddr -> WithConnection
nodeConfConnect :: !(SockAddr -> WithConnection)
}
data Node = Node { Node -> PeerManager
nodeManager :: !PeerManager
, Node -> Chain
nodeChain :: !Chain
}
data NodeEvent
= ChainEvent !ChainEvent
| PeerEvent !PeerEvent
| PeerMessage !Peer !Message
deriving NodeEvent -> NodeEvent -> Bool
(NodeEvent -> NodeEvent -> Bool)
-> (NodeEvent -> NodeEvent -> Bool) -> Eq NodeEvent
forall a. (a -> a -> Bool) -> (a -> a -> Bool) -> Eq a
/= :: NodeEvent -> NodeEvent -> Bool
$c/= :: NodeEvent -> NodeEvent -> Bool
== :: NodeEvent -> NodeEvent -> Bool
$c== :: NodeEvent -> NodeEvent -> Bool
Eq
withConnection :: SockAddr -> WithConnection
withConnection :: SockAddr -> WithConnection
withConnection SockAddr
na Conduits -> IO ()
f =
SockAddr -> IO (Maybe ClientSettings)
forall (m :: * -> *).
MonadUnliftIO m =>
SockAddr -> m (Maybe ClientSettings)
fromSockAddr SockAddr
na IO (Maybe ClientSettings)
-> (Maybe ClientSettings -> IO ()) -> IO ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \case
Maybe ClientSettings
Nothing -> PeerException -> IO ()
forall (m :: * -> *) e a. (MonadIO m, Exception e) => e -> m a
throwIO PeerException
PeerAddressInvalid
Just ClientSettings
cset ->
ClientSettings -> (AppData -> IO ()) -> IO ()
forall a. ClientSettings -> (AppData -> IO a) -> IO a
runTCPClient ClientSettings
cset ((AppData -> IO ()) -> IO ()) -> (AppData -> IO ()) -> IO ()
forall a b. (a -> b) -> a -> b
$ \AppData
ad ->
Conduits -> IO ()
f (ConduitT () ByteString IO ()
-> ConduitT ByteString Void IO () -> Conduits
Conduits (AppData -> ConduitT () ByteString IO ()
forall ad (m :: * -> *) i.
(HasReadWrite ad, MonadIO m) =>
ad -> ConduitT i ByteString m ()
appSource AppData
ad) (AppData -> ConduitT ByteString Void IO ()
forall ad (m :: * -> *) o.
(HasReadWrite ad, MonadIO m) =>
ad -> ConduitT ByteString o m ()
appSink AppData
ad))
fromSockAddr ::
(MonadUnliftIO m) => SockAddr -> m (Maybe ClientSettings)
fromSockAddr :: forall (m :: * -> *).
MonadUnliftIO m =>
SockAddr -> m (Maybe ClientSettings)
fromSockAddr SockAddr
sa = m (Maybe ClientSettings)
go m (Maybe ClientSettings)
-> (SomeException -> m (Maybe ClientSettings))
-> m (Maybe ClientSettings)
forall (m :: * -> *) e a.
(MonadUnliftIO m, Exception e) =>
m a -> (e -> m a) -> m a
`catch` SomeException -> m (Maybe ClientSettings)
forall (m :: * -> *) a. Monad m => SomeException -> m (Maybe a)
e
where
go :: m (Maybe ClientSettings)
go = do
(Maybe String
maybe_host, Maybe String
maybe_port) <- IO (Maybe String, Maybe String) -> m (Maybe String, Maybe String)
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO ([NameInfoFlag]
-> Bool -> Bool -> SockAddr -> IO (Maybe String, Maybe String)
getNameInfo [NameInfoFlag]
flags Bool
True Bool
True SockAddr
sa)
Maybe ClientSettings -> m (Maybe ClientSettings)
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe ClientSettings -> m (Maybe ClientSettings))
-> Maybe ClientSettings -> m (Maybe ClientSettings)
forall a b. (a -> b) -> a -> b
$
Int -> ByteString -> ClientSettings
clientSettings
(Int -> ByteString -> ClientSettings)
-> Maybe Int -> Maybe (ByteString -> ClientSettings)
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> (String -> Maybe Int
forall a. Read a => String -> Maybe a
readMaybe (String -> Maybe Int) -> Maybe String -> Maybe Int
forall (m :: * -> *) a b. Monad m => (a -> m b) -> m a -> m b
=<< Maybe String
maybe_port)
Maybe (ByteString -> ClientSettings)
-> Maybe ByteString -> Maybe ClientSettings
forall (f :: * -> *) a b. Applicative f => f (a -> b) -> f a -> f b
<*> (String -> ByteString
forall a b. ConvertibleStrings a b => a -> b
cs (String -> ByteString) -> Maybe String -> Maybe ByteString
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
<$> Maybe String
maybe_host)
flags :: [NameInfoFlag]
flags = [NameInfoFlag
NI_NUMERICHOST, NameInfoFlag
NI_NUMERICSERV]
e :: Monad m => SomeException -> m (Maybe a)
e :: forall (m :: * -> *) a. Monad m => SomeException -> m (Maybe a)
e SomeException
_ = Maybe a -> m (Maybe a)
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
chainForwarder :: MonadLoggerIO m
=> PeerManager
-> Publisher NodeEvent
-> Inbox ChainEvent
-> m ()
chainForwarder :: forall (m :: * -> *).
MonadLoggerIO m =>
PeerManager -> Publisher NodeEvent -> Inbox ChainEvent -> m ()
chainForwarder PeerManager
mgr Publisher NodeEvent
pub Inbox ChainEvent
inbox =
m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Inbox ChainEvent -> m ChainEvent
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox ChainEvent
inbox m ChainEvent -> (ChainEvent -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \ChainEvent
event -> do
case ChainEvent
event of
ChainBestBlock BlockNode
bb ->
BlockHeight -> PeerManager -> m ()
forall (m :: * -> *).
MonadIO m =>
BlockHeight -> PeerManager -> m ()
managerBest (BlockNode -> BlockHeight
nodeHeight BlockNode
bb) PeerManager
mgr
ChainEvent
_ -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
NodeEvent -> Publisher NodeEvent -> m ()
forall (m :: * -> *) msg. MonadIO m => msg -> Publisher msg -> m ()
publish (ChainEvent -> NodeEvent
ChainEvent ChainEvent
event) Publisher NodeEvent
pub
managerForwarder :: MonadLoggerIO m
=> Chain
-> Publisher NodeEvent
-> Inbox PeerEvent
-> m ()
managerForwarder :: forall (m :: * -> *).
MonadLoggerIO m =>
Chain -> Publisher NodeEvent -> Inbox PeerEvent -> m ()
managerForwarder Chain
ch Publisher NodeEvent
pub Inbox PeerEvent
inbox =
m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Inbox PeerEvent -> m PeerEvent
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox PeerEvent
inbox m PeerEvent -> (PeerEvent -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \PeerEvent
event -> do
case PeerEvent
event of
PeerConnected Peer
p ->
Peer -> Chain -> m ()
forall (m :: * -> *). MonadIO m => Peer -> Chain -> m ()
chainPeerConnected Peer
p Chain
ch
PeerDisconnected Peer
p ->
Peer -> Chain -> m ()
forall (m :: * -> *). MonadIO m => Peer -> Chain -> m ()
chainPeerDisconnected Peer
p Chain
ch
NodeEvent -> Publisher NodeEvent -> m ()
forall (m :: * -> *) msg. MonadIO m => msg -> Publisher msg -> m ()
publish (PeerEvent -> NodeEvent
PeerEvent PeerEvent
event) Publisher NodeEvent
pub
peerForwarder :: MonadLoggerIO m
=> Chain
-> PeerManager
-> Publisher NodeEvent
-> Inbox (Peer, Message)
-> m ()
peerForwarder :: forall (m :: * -> *).
MonadLoggerIO m =>
Chain
-> PeerManager
-> Publisher NodeEvent
-> Inbox (Peer, Message)
-> m ()
peerForwarder Chain
ch PeerManager
mgr Publisher NodeEvent
pub Inbox (Peer, Message)
inbox =
m () -> m ()
forall (f :: * -> *) a b. Applicative f => f a -> f b
forever (m () -> m ()) -> m () -> m ()
forall a b. (a -> b) -> a -> b
$ Inbox (Peer, Message) -> m (Peer, Message)
forall (mbox :: * -> *) (m :: * -> *) msg.
(InChan mbox, MonadIO m) =>
mbox msg -> m msg
receive Inbox (Peer, Message)
inbox m (Peer, Message) -> ((Peer, Message) -> m ()) -> m ()
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= \(Peer
p, Message
msg) -> do
case Message
msg of
MVersion Version
v ->
Peer -> Version -> PeerManager -> m ()
forall (m :: * -> *).
MonadIO m =>
Peer -> Version -> PeerManager -> m ()
managerVersion Peer
p Version
v PeerManager
mgr
Message
MVerAck ->
Peer -> PeerManager -> m ()
forall (m :: * -> *). MonadIO m => Peer -> PeerManager -> m ()
managerVerAck Peer
p PeerManager
mgr
MPing (Ping Word64
n) ->
Peer -> Word64 -> PeerManager -> m ()
forall (m :: * -> *).
MonadIO m =>
Peer -> Word64 -> PeerManager -> m ()
managerPing Peer
p Word64
n PeerManager
mgr
MPong (Pong Word64
n) ->
Peer -> Word64 -> PeerManager -> m ()
forall (m :: * -> *).
MonadIO m =>
Peer -> Word64 -> PeerManager -> m ()
managerPong Peer
p Word64
n PeerManager
mgr
MAddr (Addr [NetworkAddressTime]
ns) ->
Peer -> [NetworkAddress] -> PeerManager -> m ()
forall (m :: * -> *).
MonadIO m =>
Peer -> [NetworkAddress] -> PeerManager -> m ()
managerAddrs Peer
p ((NetworkAddressTime -> NetworkAddress)
-> [NetworkAddressTime] -> [NetworkAddress]
forall a b. (a -> b) -> [a] -> [b]
map NetworkAddressTime -> NetworkAddress
forall a b. (a, b) -> b
snd [NetworkAddressTime]
ns) PeerManager
mgr
MHeaders (Headers [BlockHeaderCount]
hs) ->
Peer -> [BlockHeader] -> Chain -> m ()
forall (m :: * -> *).
MonadIO m =>
Peer -> [BlockHeader] -> Chain -> m ()
chainHeaders Peer
p ((BlockHeaderCount -> BlockHeader)
-> [BlockHeaderCount] -> [BlockHeader]
forall a b. (a -> b) -> [a] -> [b]
map BlockHeaderCount -> BlockHeader
forall a b. (a, b) -> a
fst [BlockHeaderCount]
hs) Chain
ch
Message
_ -> () -> m ()
forall (m :: * -> *) a. Monad m => a -> m a
return ()
Peer -> PeerManager -> m ()
forall (m :: * -> *). MonadIO m => Peer -> PeerManager -> m ()
managerTickle Peer
p PeerManager
mgr
NodeEvent -> Publisher NodeEvent -> m ()
forall (m :: * -> *) msg. MonadIO m => msg -> Publisher msg -> m ()
publish (Peer -> Message -> NodeEvent
PeerMessage Peer
p Message
msg) Publisher NodeEvent
pub
withNode ::
( MonadLoggerIO m
, MonadUnliftIO m
)
=> NodeConfig
-> (Node -> m a)
-> m a
withNode :: forall (m :: * -> *) a.
(MonadLoggerIO m, MonadUnliftIO m) =>
NodeConfig -> (Node -> m a) -> m a
withNode NodeConfig
cfg Node -> m a
action =
(Publisher (Peer, Message) -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher ((Publisher (Peer, Message) -> m a) -> m a)
-> (Publisher (Peer, Message) -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Publisher (Peer, Message)
peer_pub ->
(Publisher PeerEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher ((Publisher PeerEvent -> m a) -> m a)
-> (Publisher PeerEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Publisher PeerEvent
mgr_pub ->
(Publisher ChainEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
(Publisher msg -> m a) -> m a
withPublisher ((Publisher ChainEvent -> m a) -> m a)
-> (Publisher ChainEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Publisher ChainEvent
ch_pub ->
Publisher (Peer, Message) -> (Inbox (Peer, Message) -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher (Peer, Message)
peer_pub ((Inbox (Peer, Message) -> m a) -> m a)
-> (Inbox (Peer, Message) -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Inbox (Peer, Message)
peer_inbox ->
Publisher PeerEvent -> (Inbox PeerEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher PeerEvent
mgr_pub ((Inbox PeerEvent -> m a) -> m a)
-> (Inbox PeerEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Inbox PeerEvent
mgr_inbox ->
Publisher ChainEvent -> (Inbox ChainEvent -> m a) -> m a
forall (m :: * -> *) msg a.
MonadUnliftIO m =>
Publisher msg -> (Inbox msg -> m a) -> m a
withSubscription Publisher ChainEvent
ch_pub ((Inbox ChainEvent -> m a) -> m a)
-> (Inbox ChainEvent -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Inbox ChainEvent
ch_inbox ->
PeerManagerConfig -> (PeerManager -> m a) -> m a
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadLoggerIO m) =>
PeerManagerConfig -> (PeerManager -> m a) -> m a
withPeerManager (Publisher PeerEvent
-> Publisher (Peer, Message) -> PeerManagerConfig
mgr_config Publisher PeerEvent
mgr_pub Publisher (Peer, Message)
peer_pub) ((PeerManager -> m a) -> m a) -> (PeerManager -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \PeerManager
mgr ->
ChainConfig -> (Chain -> m a) -> m a
forall (m :: * -> *) a.
(MonadUnliftIO m, MonadLoggerIO m) =>
ChainConfig -> (Chain -> m a) -> m a
withChain (Publisher ChainEvent -> ChainConfig
chain_config Publisher ChainEvent
ch_pub) ((Chain -> m a) -> m a) -> (Chain -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Chain
ch ->
m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync (Chain
-> PeerManager
-> Publisher NodeEvent
-> Inbox (Peer, Message)
-> m ()
forall (m :: * -> *).
MonadLoggerIO m =>
Chain
-> PeerManager
-> Publisher NodeEvent
-> Inbox (Peer, Message)
-> m ()
peerForwarder Chain
ch PeerManager
mgr Publisher NodeEvent
pub Inbox (Peer, Message)
peer_inbox) ((Async () -> m a) -> m a) -> (Async () -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async ()
a ->
m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync (Chain -> Publisher NodeEvent -> Inbox PeerEvent -> m ()
forall (m :: * -> *).
MonadLoggerIO m =>
Chain -> Publisher NodeEvent -> Inbox PeerEvent -> m ()
managerForwarder Chain
ch Publisher NodeEvent
pub Inbox PeerEvent
mgr_inbox) ((Async () -> m a) -> m a) -> (Async () -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async ()
b ->
m () -> (Async () -> m a) -> m a
forall (m :: * -> *) a b.
MonadUnliftIO m =>
m a -> (Async a -> m b) -> m b
withAsync (PeerManager -> Publisher NodeEvent -> Inbox ChainEvent -> m ()
forall (m :: * -> *).
MonadLoggerIO m =>
PeerManager -> Publisher NodeEvent -> Inbox ChainEvent -> m ()
chainForwarder PeerManager
mgr Publisher NodeEvent
pub Inbox ChainEvent
ch_inbox) ((Async () -> m a) -> m a) -> (Async () -> m a) -> m a
forall a b. (a -> b) -> a -> b
$ \Async ()
c ->
Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
a m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
b m () -> m () -> m ()
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>> Async () -> m ()
forall (m :: * -> *) a. MonadIO m => Async a -> m ()
link Async ()
c m () -> m a -> m a
forall (m :: * -> *) a b. Monad m => m a -> m b -> m b
>>
Node -> m a
action Node :: PeerManager -> Chain -> Node
Node { nodeManager :: PeerManager
nodeManager = PeerManager
mgr, nodeChain :: Chain
nodeChain = Chain
ch }
where
pub :: Publisher NodeEvent
pub = NodeConfig -> Publisher NodeEvent
nodeConfEvents NodeConfig
cfg
chain_config :: Publisher ChainEvent -> ChainConfig
chain_config Publisher ChainEvent
ch_pub =
ChainConfig :: DB
-> Maybe ColumnFamily
-> Network
-> Publisher ChainEvent
-> NominalDiffTime
-> ChainConfig
ChainConfig
{ chainConfDB :: DB
chainConfDB = NodeConfig -> DB
nodeConfDB NodeConfig
cfg
, chainConfColumnFamily :: Maybe ColumnFamily
chainConfColumnFamily = NodeConfig -> Maybe ColumnFamily
nodeConfColumnFamily NodeConfig
cfg
, chainConfNetwork :: Network
chainConfNetwork = NodeConfig -> Network
nodeConfNet NodeConfig
cfg
, chainConfEvents :: Publisher ChainEvent
chainConfEvents = Publisher ChainEvent
ch_pub
, chainConfTimeout :: NominalDiffTime
chainConfTimeout = NodeConfig -> NominalDiffTime
nodeConfTimeout NodeConfig
cfg
}
mgr_config :: Publisher PeerEvent
-> Publisher (Peer, Message) -> PeerManagerConfig
mgr_config Publisher PeerEvent
mgr_pub Publisher (Peer, Message)
peer_pub =
PeerManagerConfig :: Int
-> [String]
-> Bool
-> NetworkAddress
-> Network
-> Publisher PeerEvent
-> NominalDiffTime
-> NominalDiffTime
-> (SockAddr -> WithConnection)
-> Publisher (Peer, Message)
-> PeerManagerConfig
PeerManagerConfig
{ peerManagerMaxPeers :: Int
peerManagerMaxPeers = NodeConfig -> Int
nodeConfMaxPeers NodeConfig
cfg
, peerManagerPeers :: [String]
peerManagerPeers = NodeConfig -> [String]
nodeConfPeers NodeConfig
cfg
, peerManagerDiscover :: Bool
peerManagerDiscover = NodeConfig -> Bool
nodeConfDiscover NodeConfig
cfg
, peerManagerNetAddr :: NetworkAddress
peerManagerNetAddr = NodeConfig -> NetworkAddress
nodeConfNetAddr NodeConfig
cfg
, peerManagerNetwork :: Network
peerManagerNetwork = NodeConfig -> Network
nodeConfNet NodeConfig
cfg
, peerManagerEvents :: Publisher PeerEvent
peerManagerEvents = Publisher PeerEvent
mgr_pub
, peerManagerMaxLife :: NominalDiffTime
peerManagerMaxLife = NodeConfig -> NominalDiffTime
nodeConfPeerMaxLife NodeConfig
cfg
, peerManagerTimeout :: NominalDiffTime
peerManagerTimeout = NodeConfig -> NominalDiffTime
nodeConfTimeout NodeConfig
cfg
, peerManagerConnect :: SockAddr -> WithConnection
peerManagerConnect = NodeConfig -> SockAddr -> WithConnection
nodeConfConnect NodeConfig
cfg
, peerManagerPub :: Publisher (Peer, Message)
peerManagerPub = Publisher (Peer, Message)
peer_pub
}