module Network.Haskoin.Node.STM where
import Control.Concurrent (ThreadId)
import Control.Concurrent.STM (STM, TMVar, TVar, atomically,
isEmptyTMVar, modifyTVar',
newEmptyTMVarIO, newTVar,
newTVarIO, orElse, putTMVar,
readTMVar, readTVar,
takeTMVar, tryPutTMVar,
tryReadTMVar, writeTVar)
import Control.Concurrent.STM.Lock (Lock)
import qualified Control.Concurrent.STM.Lock as Lock (new)
import Control.Concurrent.STM.TBMChan (TBMChan, closeTBMChan,
newTBMChan)
import Control.DeepSeq (NFData (..))
import Control.Exception.Lifted (Exception, SomeException,
catch, fromException, throw)
import Control.Monad ((<=<))
import Control.Monad.Logger (MonadLoggerIO, logDebug)
import Control.Monad.Reader (ReaderT, ask, asks,
runReaderT)
import Control.Monad.Trans (MonadIO, lift, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Aeson.TH (deriveJSON)
import qualified Data.Map.Strict as M (Map, delete, empty,
insert, lookup)
import Data.Maybe (isJust)
import Data.Time.Clock (NominalDiffTime)
import Data.Typeable (Typeable)
import Data.Unique (Unique, hashUnique)
import Data.Word (Word32, Word64)
import Database.Persist.Sql (ConnectionPool, SqlBackend,
SqlPersistT, runSqlConn,
runSqlPool)
import Network.Haskoin.Block
import Network.Haskoin.Node
import Network.Haskoin.Node.HeaderTree
import Network.Haskoin.Transaction
import Network.Haskoin.Util
-- | Transaction hashes that travel together with a 'MerkleBlock' (see
-- 'peerSessionMerkleChan').
type MerkleTxs = [TxHash]
-- | Reader transformer giving node actions access to the 'SharedNodeState'.
type NodeT = ReaderT SharedNodeState
-- | Peers are identified by a 'Unique' value.
type PeerId = Unique
-- | Numeric score kept per peer host; compared against 'bannedScore' by
-- 'isHostScoreBanned'.
type PeerHostScore = Word32
-- | Wrapper around 'PeerId' that provides a 'Show' instance, so that a peer
-- id can be embedded in showable values such as 'NodeException'.
newtype ShowPeerId = ShowPeerId { getShowPeerId :: PeerId }
    deriving (Eq)

-- | Renders the integer hash of the wrapped 'Unique'.
instance Show ShowPeerId where
    show (ShowPeerId pid) = show (hashUnique pid)
-- | Run a SQL action against whichever backend is supplied: a single
-- connection ('Left', via 'runSqlConn') or a connection pool ('Right', via
-- 'runSqlPool').
runSql :: (MonadBaseControl IO m)
       => SqlPersistT m a
       -> Either SqlBackend ConnectionPool
       -> m a
runSql action = either (runSqlConn action) (runSqlPool action)
-- | Run a SQL action inside 'NodeT', using the backend stored in
-- 'sharedSqlBackend'.
runSqlNodeT :: (MonadBaseControl IO m) => SqlPersistT m a -> NodeT m a
runSqlNodeT action = do
    backend <- asks sharedSqlBackend
    lift (runSql action backend)
-- | Build a fresh 'SharedNodeState'.
--
-- The header tree is initialised through the given SQL backend and its best
-- block becomes the initial best header. All other shared variables start
-- empty or zeroed; the best block starts at 'genesisBlock'.
getNodeState :: (MonadLoggerIO m, MonadBaseControl IO m)
             => Either SqlBackend ConnectionPool
             -> m SharedNodeState
getNodeState sql = do
    $(logDebug) "Initializing the HeaderTree and NodeState"
    best <- runSql (initHeaderTree >> getBestBlock) sql
    liftIO $ do
        peerMap       <- newTVarIO M.empty
        hostMap       <- newTVarIO M.empty
        networkHeight <- newTVarIO 0
        headers       <- newEmptyTMVarIO
        headerPeer    <- newTVarIO Nothing
        merklePeer    <- newTVarIO Nothing
        syncLock      <- atomically Lock.new
        -- Both channels are bounded to 1024 pending elements.
        tickleChan    <- atomically $ newTBMChan 1024
        txChan        <- atomically $ newTBMChan 1024
        txGetData     <- newTVarIO M.empty
        rescan        <- newEmptyTMVarIO
        mempool       <- newTVarIO False
        bloomFilter   <- newTVarIO Nothing
        bestHeader    <- newTVarIO best
        bestBlock     <- newTVarIO genesisBlock
        return SharedNodeState
            { sharedPeerMap       = peerMap
            , sharedHostMap       = hostMap
            , sharedNetworkHeight = networkHeight
            , sharedHeaders       = headers
            , sharedHeaderPeer    = headerPeer
            , sharedMerklePeer    = merklePeer
            , sharedSyncLock      = syncLock
            , sharedBestHeader    = bestHeader
            , sharedBestBlock     = bestBlock
            , sharedTxGetData     = txGetData
            , sharedBloomFilter   = bloomFilter
            , sharedTickleChan    = tickleChan
            , sharedTxChan        = txChan
            , sharedRescan        = rescan
            , sharedMempool       = mempool
            , sharedSqlBackend    = sql
            }
-- | Run a 'NodeT' action against the given shared node state.
runNodeT :: Monad m => NodeT m a -> SharedNodeState -> m a
runNodeT action env = runReaderT action env
-- | Create a fresh node state with 'getNodeState' for the given SQL backend
-- and run the action against it.
withNodeT :: (MonadLoggerIO m, MonadBaseControl IO m)
          => NodeT m a
          -> Either SqlBackend ConnectionPool
          -> m a
withNodeT action sql = getNodeState sql >>= runNodeT action
-- | Run an STM-based 'NodeT' action as a single atomic transaction, sharing
-- the node state of the surrounding 'NodeT' context.
atomicallyNodeT :: MonadIO m => NodeT STM a -> NodeT m a
atomicallyNodeT action = do
    env <- ask
    liftIO (atomically (runReaderT action env))
-- | Per-host bookkeeping stored in 'sharedHostMap'.
data PeerHostSession = PeerHostSession
    { peerHostSessionScore :: !PeerHostScore
      -- ^ Score of the host; 'modifyHostSession' starts new sessions at 0.
    , peerHostSessionReconnect :: !Int
      -- ^ Reconnect value; 'modifyHostSession' starts new sessions at 1.
      -- NOTE(review): presumably a reconnect delay — confirm the unit with
      -- the code that consumes it.
    , peerHostSessionLog :: ![String]
      -- ^ Log lines kept for the host; new sessions start with none.
    }
-- | Fully evaluates every field of the session.
instance NFData PeerHostSession where
    rnf (PeerHostSession score reconnect entries) =
        rnf score `seq` rnf reconnect `seq` rnf entries
-- | State shared by all node threads; it is the environment of 'NodeT'.
-- A fresh value is built by 'getNodeState'.
data SharedNodeState = SharedNodeState
    { sharedPeerMap :: !(TVar (M.Map PeerId (TVar PeerSession)))
      -- ^ Session of every registered peer, keyed by peer id.
    , sharedHostMap :: !(TVar (M.Map PeerHost (TVar PeerHostSession)))
      -- ^ Host session of every known peer host.
    , sharedNetworkHeight :: !(TVar BlockHeight)
      -- ^ A block height, initialised to 0.
      -- NOTE(review): presumably the best height reported by peers — confirm.
    , sharedHeaders :: !(TMVar (PeerId, Headers))
      -- ^ A 'Headers' value tagged with a peer id; starts empty.
    , sharedHeaderPeer :: !(TVar (Maybe PeerId))
      -- ^ Optional peer id, initially 'Nothing'.
      -- NOTE(review): presumably the peer used for header sync — confirm.
    , sharedMerklePeer :: !(TVar (Maybe PeerId))
      -- ^ Optional peer id, initially 'Nothing'.
      -- NOTE(review): presumably the peer used for merkle block sync — confirm.
    , sharedSyncLock :: !Lock
      -- ^ STM lock created at start-up.
    , sharedBestHeader :: !(TVar NodeBlock)
      -- ^ Initialised with the best block of the header tree.
    , sharedBestBlock :: !(TVar NodeBlock)
      -- ^ Initialised with 'genesisBlock'.
    , sharedTxGetData :: !(TVar (M.Map TxHash [(PeerId, PeerHost)]))
      -- ^ Peers (with their hosts) associated with each transaction hash.
    , sharedBloomFilter :: !(TVar (Maybe (BloomFilter, Int)))
      -- ^ Optional bloom filter paired with an 'Int'.
      -- NOTE(review): the 'Int' is presumably the filter's element count —
      -- confirm.
    , sharedTickleChan :: !(TBMChan (PeerId, PeerHost, BlockHash))
      -- ^ Bounded channel of block hashes tagged with their peer and host.
    , sharedTxChan :: !(TBMChan (PeerId, PeerHost, Tx))
      -- ^ Bounded channel of transactions tagged with their peer and host.
    , sharedRescan :: !(TMVar (Either Timestamp BlockHeight))
      -- ^ Rescan request given as a timestamp or a block height; starts empty.
    , sharedMempool :: !(TVar Bool)
      -- ^ Flag initialised to 'False'.
    , sharedSqlBackend :: !(Either SqlBackend ConnectionPool)
      -- ^ Database backend used by 'runSqlNodeT'.
    }
-- | Nonce carried by ping messages.
type PingNonce = Word64
-- | State kept for one peer in 'sharedPeerMap'.
data PeerSession = PeerSession
    { peerSessionConnected :: !Bool
      -- ^ Connection flag.
      -- NOTE(review): presumably set once the handshake completes — confirm.
    , peerSessionVersion :: !(Maybe Version)
      -- ^ 'Version' value for the peer, when one is available.
    , peerSessionHeight :: !BlockHeight
      -- ^ Block height recorded for the peer.
    , peerSessionChan :: !(TBMChan Message)
      -- ^ Message channel of the peer; closed by 'removePeerSession'.
    , peerSessionHost :: !PeerHost
      -- ^ Host and port of the peer.
    , peerSessionThreadId :: !ThreadId
      -- ^ A thread id.
      -- NOTE(review): presumably the thread running the peer — confirm.
    , peerSessionMerkleChan :: !(TBMChan (Either (MerkleBlock, MerkleTxs) Tx))
      -- ^ Channel carrying either a merkle block with its transaction hashes
      -- or a transaction.
    , peerSessionPings :: !(TVar [PingNonce])
      -- ^ Mutable list of ping nonces.
    , peerSessionScore :: !(Maybe NominalDiffTime)
      -- ^ Optional time interval.
      -- NOTE(review): presumably derived from ping round-trips — confirm.
    }
-- | Evaluates a 'PeerSession' as far as is meaningful.
--
-- Plain data fields are forced to normal form with 'rnf'. The channels, the
-- ping 'TVar' and the 'ThreadId' are opaque handles, so they are only forced
-- to weak head normal form with 'seq'. Every field is covered, including
-- 'peerSessionScore', whose 'Maybe' payload could otherwise remain an
-- unevaluated thunk.
instance NFData PeerSession where
    rnf PeerSession{..} =
        rnf peerSessionConnected `seq`
        rnf peerSessionVersion `seq`
        rnf peerSessionHeight `seq`
        peerSessionChan `seq`
        rnf peerSessionHost `seq`
        peerSessionThreadId `seq`
        peerSessionMerkleChan `seq`
        peerSessionPings `seq`
        rnf peerSessionScore
-- | Host name and port identifying a peer endpoint; used as the key of
-- 'sharedHostMap'.
data PeerHost = PeerHost
    { peerHost :: !String
      -- ^ Host part, rendered before the colon by 'peerHostString'.
    , peerPort :: !Int
      -- ^ Port number.
    }
    deriving (Eq, Ord)
-- JSON instances are generated with Template Haskell.
-- NOTE(review): dropFieldLabel 4 presumably strips the 4-character "peer"
-- prefix from the field names — confirm in Network.Haskoin.Util.
$(deriveJSON (dropFieldLabel 4) ''PeerHost)
-- | Render a 'PeerHost' as @host:port@.
peerHostString :: PeerHost -> String
peerHostString (PeerHost host port) = host ++ ":" ++ show port
-- | Fully evaluates both the host string and the port.
instance NFData PeerHost where
    rnf (PeerHost host port) = rnf host `seq` rnf port
-- | JSON-serialisable snapshot describing a single peer. Most fields mirror
-- data held in 'PeerSession' and 'PeerHostSession'.
data PeerStatus = PeerStatus
    { peerStatusPeerId :: !Int
      -- ^ Integer form of the peer id.
      -- NOTE(review): presumably 'hashUnique' of the 'PeerId' — confirm.
    , peerStatusHost :: !PeerHost
      -- ^ Host and port of the peer.
    , peerStatusConnected :: !Bool
    , peerStatusHeight :: !BlockHeight
    , peerStatusProtocol :: !(Maybe Word32)
    , peerStatusUserAgent :: !(Maybe String)
    , peerStatusPing :: !(Maybe String)
      -- ^ Optional ping information, already rendered as text.
    , peerStatusDoSScore :: !(Maybe PeerHostScore)
      -- ^ Score of the peer's host, when available.
    , peerStatusHaveMerkles :: !Bool
    , peerStatusHaveMessage :: !Bool
    , peerStatusPingNonces :: ![PingNonce]
    , peerStatusReconnectTimer :: !(Maybe Int)
    , peerStatusLog :: !(Maybe [String])
    }
-- NOTE(review): dropFieldLabel 10 presumably strips the 10-character
-- "peerStatus" prefix from the JSON keys — confirm in Network.Haskoin.Util.
$(deriveJSON (dropFieldLabel 10) ''PeerStatus)
-- | JSON-serialisable snapshot describing the whole node. Most fields
-- mirror data held in 'SharedNodeState'.
data NodeStatus = NodeStatus
    { nodeStatusPeers :: ![PeerStatus]
      -- ^ One status entry per peer.
    , nodeStatusNetworkHeight :: !BlockHeight
    , nodeStatusBestHeader :: !BlockHash
    , nodeStatusBestHeaderHeight :: !BlockHeight
    , nodeStatusBestBlock :: !BlockHash
    , nodeStatusBestBlockHeight :: !BlockHeight
    , nodeStatusBloomSize :: !Int
    , nodeStatusHeaderPeer :: !(Maybe Int)
      -- ^ Integer form of a peer id, if any.
      -- NOTE(review): presumably 'hashUnique' of 'sharedHeaderPeer' — confirm.
    , nodeStatusMerklePeer :: !(Maybe Int)
      -- ^ Integer form of a peer id, if any.
      -- NOTE(review): presumably 'hashUnique' of 'sharedMerklePeer' — confirm.
    , nodeStatusHaveHeaders :: !Bool
    , nodeStatusHaveTickles :: !Bool
    , nodeStatusHaveTxs :: !Bool
    , nodeStatusGetData :: ![TxHash]
    , nodeStatusRescan :: !(Maybe (Either Timestamp BlockHeight))
      -- ^ Rescan request as a timestamp or block height, if any.
    , nodeStatusMempool :: !Bool
    , nodeStatusSyncLock :: !Bool
    }
-- NOTE(review): dropFieldLabel 10 presumably strips the 10-character
-- "nodeStatus" prefix from the JSON keys — confirm in Network.Haskoin.Util.
$(deriveJSON (dropFieldLabel 10) ''NodeStatus)
-- | Look up the session of a peer. Returns 'Nothing' when the peer is not
-- present in 'sharedPeerMap'.
tryGetPeerSession :: PeerId -> NodeT STM (Maybe PeerSession)
tryGetPeerSession pid = do
    sessions <- readTVarS sharedPeerMap
    lift $ maybe (return Nothing) (fmap Just . readTVar) (M.lookup pid sessions)
-- | Look up the session of a peer, throwing 'NodeExceptionInvalidPeer' when
-- the peer is unknown.
getPeerSession :: PeerId -> NodeT STM PeerSession
getPeerSession pid =
    tryGetPeerSession pid >>= maybe invalidPeer return
  where
    invalidPeer = throw $ NodeExceptionInvalidPeer $ ShowPeerId pid
-- | Register a session for a peer. An already registered peer is left
-- untouched.
newPeerSession :: PeerId -> PeerSession -> NodeT STM ()
newPeerSession pid sess = do
    mapVar <- asks sharedPeerMap
    lift $ do
        peers <- readTVar mapVar
        if isJust (M.lookup pid peers)
            then return ()
            else do
                var <- newTVar sess
                -- Force the updated map before storing it in the TVar.
                writeTVar mapVar $! M.insert pid var peers
-- | Apply a function to the session of a peer (strictly, via 'modifyTVar'').
-- Does nothing when the peer is unknown.
modifyPeerSession :: PeerId -> (PeerSession -> PeerSession) -> NodeT STM ()
modifyPeerSession pid f = do
    sessions <- readTVarS sharedPeerMap
    lift $ maybe (return ()) (`modifyTVar'` f) (M.lookup pid sessions)
-- | Remove a peer from 'sharedPeerMap'.
--
-- When the peer exists, its message channel ('peerSessionChan') is closed and
-- the removed session is returned; otherwise the result is 'Nothing'.
removePeerSession :: PeerId -> NodeT STM (Maybe PeerSession)
removePeerSession pid = do
    mapVar <- asks sharedPeerMap
    lift $ do
        peers <- readTVar mapVar
        removed <- case M.lookup pid peers of
            Nothing  -> return Nothing
            Just var -> do
                sess <- readTVar var
                closeTBMChan (peerSessionChan sess)
                return (Just sess)
        -- The map is written back whether or not the peer was present.
        writeTVar mapVar $! M.delete pid peers
        return removed
-- | Look up the session of a peer host. Returns 'Nothing' when the host is
-- not present in 'sharedHostMap'.
getHostSession :: PeerHost
               -> NodeT STM (Maybe PeerHostSession)
getHostSession ph = do
    hosts <- readTVarS sharedHostMap
    lift $ maybe (return Nothing) (fmap Just . readTVar) (M.lookup ph hosts)
-- | Apply a function to the session of a peer host.
--
-- When the host has no session yet, the function is applied to a default
-- session (score 0, reconnect 1, empty log) and the result is registered
-- with 'newHostSession'.
modifyHostSession :: PeerHost
                  -> (PeerHostSession -> PeerHostSession)
                  -> NodeT STM ()
modifyHostSession ph f = do
    hosts <- readTVarS sharedHostMap
    maybe createSession updateSession (M.lookup ph hosts)
  where
    updateSession var = lift $ modifyTVar' var f
    -- The modified default session is forced before it is stored.
    createSession = newHostSession ph $! f freshSession
    freshSession = PeerHostSession
        { peerHostSessionScore     = 0
        , peerHostSessionReconnect = 1
        , peerHostSessionLog       = []
        }
-- | Register a session for a peer host. An already registered host is left
-- untouched.
newHostSession :: PeerHost -> PeerHostSession -> NodeT STM ()
newHostSession ph session = do
    mapVar <- asks sharedHostMap
    lift $ do
        hosts <- readTVar mapVar
        case M.lookup ph hosts of
            Nothing -> do
                var <- newTVar session
                -- Force the updated map before storing it in the TVar.
                writeTVar mapVar $! M.insert ph var hosts
            Just _ -> return ()
-- | Score at or above which a host counts as banned (see
-- 'isHostScoreBanned').
bannedScore :: PeerHostScore
bannedScore = 100

-- | Penalty for a minor offence: adds 1 to the score.
minorDoS :: PeerHostScore -> PeerHostScore
minorDoS score = score + 1

-- | Penalty for a moderate offence: adds 10 to the score.
moderateDoS :: PeerHostScore -> PeerHostScore
moderateDoS score = score + 10

-- | Penalty for a severe offence: adds 'bannedScore', so the resulting score
-- is immediately in the banned range.
severeDoS :: PeerHostScore -> PeerHostScore
severeDoS score = score + bannedScore

-- | Whether a score has reached 'bannedScore'.
isHostScoreBanned :: PeerHostScore -> Bool
isHostScoreBanned score = score >= bannedScore
-- | 'orElse' lifted to 'NodeT': run the first action and, if it retries, run
-- the second one instead. Both see the same node state.
orElseNodeT :: NodeT STM a -> NodeT STM a -> NodeT STM a
orElseNodeT primary fallback = do
    env <- ask
    lift (runReaderT primary env `orElse` runReaderT fallback env)
-- | Read the 'TVar' selected from the shared state by the given accessor.
readTVarS :: (SharedNodeState -> TVar a) -> NodeT STM a
readTVarS field = asks field >>= lift . readTVar
-- | Write a value to the 'TVar' selected from the shared state by the given
-- accessor.
writeTVarS :: (SharedNodeState -> TVar a) -> a -> NodeT STM ()
writeTVarS field val = do
    var <- asks field
    lift (writeTVar var val)
-- | 'takeTMVar' on the 'TMVar' selected from the shared state; retries while
-- it is empty.
takeTMVarS :: (SharedNodeState -> TMVar a) -> NodeT STM a
takeTMVarS field = asks field >>= lift . takeTMVar
-- | 'readTMVar' on the 'TMVar' selected from the shared state; retries while
-- it is empty and leaves the value in place.
readTMVarS :: (SharedNodeState -> TMVar a) -> NodeT STM a
readTMVarS field = asks field >>= lift . readTMVar
-- | 'tryReadTMVar' on the 'TMVar' selected from the shared state: returns
-- 'Nothing' instead of retrying when it is empty.
tryReadTMVarS :: (SharedNodeState -> TMVar a) -> NodeT STM (Maybe a)
tryReadTMVarS field = asks field >>= lift . tryReadTMVar
-- | 'putTMVar' on the 'TMVar' selected from the shared state; retries while
-- it is full.
putTMVarS :: (SharedNodeState -> TMVar a) -> a -> NodeT STM ()
putTMVarS field val = do
    var <- asks field
    lift (putTMVar var val)
-- | 'tryPutTMVar' on the 'TMVar' selected from the shared state: returns
-- 'False' instead of retrying when it is already full.
tryPutTMVarS :: (SharedNodeState -> TMVar a) -> a -> NodeT STM Bool
tryPutTMVarS field val = do
    var <- asks field
    lift (tryPutTMVar var val)
-- | Store a value in the 'TMVar' selected from the shared state, replacing
-- whatever it currently holds.
--
-- The previous body was identical to 'putTMVarS' and therefore retried
-- forever on a full 'TMVar' instead of swapping its content. This version
-- first discards the current value (if any) and then stores the new one, so
-- it never blocks. On an empty 'TMVar' it behaves exactly as before.
swapTMVarS :: (SharedNodeState -> TMVar a) -> a -> NodeT STM ()
swapTMVarS f val = do
    var <- asks f
    lift $ do
        -- Non-blocking take: fall through when the TMVar is already empty.
        (takeTMVar var >> return ()) `orElse` return ()
        putTMVar var val
-- | Whether the 'TMVar' selected from the shared state is currently empty.
isEmptyTMVarS :: (SharedNodeState -> TMVar a) -> NodeT STM Bool
isEmptyTMVarS = lift . isEmptyTMVar <=< asks
-- | Exceptions raised by the node. 'isNodeException' recognises them inside
-- a 'SomeException'.
data NodeException
    = NodeExceptionBanned
    | NodeExceptionConnected
    | NodeExceptionInvalidPeer !ShowPeerId
      -- ^ Thrown by 'getPeerSession' when the peer id has no session.
    | NodeExceptionPeerNotConnected !ShowPeerId
    | NodeException !String
      -- ^ Generic node error carrying a message.
    deriving (Show, Typeable)
instance Exception NodeException
-- | Whether the given exception is a 'NodeException'.
isNodeException :: SomeException -> Bool
isNodeException se =
    case (fromException se :: Maybe NodeException) of
        Just _  -> True
        Nothing -> False
-- | 'catch' specialised to 'SomeException': the handler receives every
-- exception raised by the action.
catchAny :: MonadBaseControl IO m
         => m a -> (SomeException -> m a) -> m a
catchAny action handler = action `catch` handler
-- | Run an action and silently discard any exception it raises.
catchAny_ :: MonadBaseControl IO m
          => m () -> m ()
catchAny_ action = catchAny action ignore
  where
    ignore _ = return ()