{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE UndecidableInstances #-}
module Network.Haskoin.Node.Chain
( chain
) where
import Control.Monad
import Control.Monad.Logger
import Control.Monad.Reader
import Data.String.Conversions
import Data.Text (Text)
import Data.Time.Clock
import Network.Haskoin.Block
import Network.Haskoin.Network
import Network.Haskoin.Node.Chain.Logic
import Network.Haskoin.Node.Common
import NQE
import System.Random
import UnliftIO
import UnliftIO.Concurrent
type MonadChain m
= (MonadLoggerIO m, MonadChainLogic ChainConfig Peer m)
chain ::
(MonadUnliftIO m, MonadLoggerIO m)
=> ChainConfig
-> Inbox ChainMessage
-> m ()
chain cfg inbox = do
st <-
newTVarIO
ChainState
{ syncingPeer = Nothing
, mySynced = False
, newPeers = []
}
let rd = ChainReader {myReader = cfg, myChainDB = db, chainState = st}
withSyncLoop ch $ run `runReaderT` rd
where
net = chainConfNetwork cfg
db = chainConfDB cfg
ch = inboxToMailbox inbox
run = do
$(logDebugS) "Chain" "Initializing..."
initChainDB net
getBestBlockHeader >>= chainEvent . ChainBestBlock
$(logInfoS) "Chain" "Initialization complete"
forever $ receive inbox >>= chainMessage
chainEvent :: MonadChain m => ChainEvent -> m ()
chainEvent e = do
l <- chainConfEvents <$> asks myReader
case e of
ChainBestBlock b ->
$(logInfoS) "Chain" $
"Best block header at height " <> cs (show (nodeHeight b))
ChainSynced b ->
$(logInfoS) "Chain" $
"Headers now synced at height " <> cs (show (nodeHeight b))
atomically $ l e
processHeaders ::
MonadChain m => Peer -> [BlockHeader] -> m ()
processHeaders p hs = do
s <- peerString p
net <- chainConfNetwork <$> asks myReader
mgr <- chainConfManager <$> asks myReader
$(logDebugS) "Chain" $
"Importing " <> cs (show (length hs)) <> " headers from peer " <> s
importHeaders net hs >>= \case
Left e -> do
$(logErrorS) "Chain" $
"Could not connect headers sent by peer " <> s <> ": " <>
cs (show e)
managerKill e p mgr
Right done -> do
setLastReceived
best <- getBestBlockHeader
chainEvent $ ChainBestBlock best
if done
then do
$(logDebugS) "Chain" $
"Finished importing headers from peer: " <> s
MSendHeaders `sendMessage` p
finishPeer p
syncNewPeer
syncNotif
else syncPeer p
syncNewPeer :: MonadChain m => m ()
syncNewPeer = do
$(logDebugS) "Chain" "Attempting to sync against a new peer"
getSyncingPeer >>= \case
Nothing -> do
$(logDebugS) "Chain" "Getting next peer to sync from"
nextPeer >>= \case
Nothing ->
$(logInfoS) "Chain" "Finished syncing against all peers"
Just p -> syncPeer p
Just p -> do
s <- peerString p
$(logDebugS) "Chain" $ "Already syncing against peer " <> s
syncNotif :: MonadChain m => m ()
syncNotif =
notifySynced >>= \x ->
when x $ getBestBlockHeader >>= chainEvent . ChainSynced
syncPeer :: MonadChain m => Peer -> m ()
syncPeer p = do
s <- peerString p
$(logInfoS) "Chain" $ "Syncing against peer " <> s
bb <- getBestBlockHeader
gh <- syncHeaders bb p
MGetHeaders gh `sendMessage` p
chainMessage :: MonadChain m => ChainMessage -> m ()
chainMessage (ChainGetBest reply) =
getBestBlockHeader >>= atomically . reply
chainMessage (ChainHeaders p hs) = do
s <- peerString p
$(logDebugS) "Chain" $
"Processing " <> cs (show (length hs)) <> " headers from peer " <> s
processHeaders p hs
chainMessage (ChainPeerConnected p a) = do
$(logDebugS) "Chain" $ "Adding new peer to sync queue: " <> cs (show a)
addPeer p
syncNewPeer
chainMessage (ChainPeerDisconnected p a) = do
$(logWarnS) "Chain" $ "Removing a peer from sync queue: " <> cs (show a)
finishPeer p
syncNewPeer
chainMessage (ChainGetAncestor h n reply) =
getAncestor h n >>= atomically . reply
chainMessage (ChainGetSplit r l reply) =
splitPoint r l >>= atomically . reply
chainMessage (ChainGetBlock h reply) =
getBlockHeader h >>= atomically . reply
chainMessage (ChainIsSynced reply) =
isSynced >>= atomically . reply
chainMessage ChainPing = do
ChainConfig {chainConfManager = mgr, chainConfTimeout = to} <- asks myReader
now <- liftIO getCurrentTime
lastMessage >>= \case
Nothing -> return ()
Just (p, t)
| diffUTCTime now t > fromIntegral to -> do
s <- peerString p
$(logErrorS) "Chain" $ "Syncing peer timed out: " <> s
managerKill PeerTimeout p mgr
| otherwise -> return ()
withSyncLoop :: (MonadUnliftIO m, MonadLoggerIO m) => Chain -> m a -> m a
withSyncLoop ch f = withAsync go $ \a -> link a >> f
where
go =
forever $ do
threadDelay =<<
liftIO (randomRIO (250 * 1000, 1000 * 1000))
ChainPing `send` ch
peerString :: MonadChain m => Peer -> m Text
peerString p = do
ChainConfig {chainConfManager = mgr} <- asks myReader
managerGetPeer p mgr >>= \case
Nothing -> return "[unknown]"
Just o -> return . cs . show $ onlinePeerAddress o