{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
-- | Operations on a node's state, expressed in the 'SM' monad: serving
-- user requests, merging partition and cluster power states received
-- from peers, ejecting and joining peers, searching the local index,
-- and driving the join/drop steps of a rebalance.
module Network.Legion.StateMachine(
newNodeState,
userRequest,
partitionMerge,
clusterMerge,
eject,
join,
minimumCompleteServiceSet,
search,
joinNext,
joinNextResponse,
UserResponse(..),
getPeers,
getPartition,
) where
import Control.Monad (void, unless)
import Control.Monad.Catch (throwM, MonadThrow)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Logger (MonadLogger, logDebug, logError,
MonadLoggerIO, logWarn)
import Control.Monad.Trans.Class (lift)
import Data.Bool (bool)
import Data.Conduit ((=$=), runConduit, transPipe, awaitForever)
import Data.Default.Class (Default)
import Data.Map (Map)
import Data.Maybe (fromMaybe)
import Data.Set (Set, (\\), member)
import Data.Text (pack)
import Network.Legion.Application (getState, saveState, list)
import Network.Legion.BSockAddr (BSockAddr)
import Network.Legion.ClusterState (ClusterPowerState, ClusterPowerStateT)
import Network.Legion.Distribution (Peer, newPeer, RebalanceAction(Invite,
Drop))
import Network.Legion.Index (IndexRecord(IndexRecord), stTag, stKey,
irTag, irKey, SearchTag(SearchTag), indexEntries, Indexable)
import Network.Legion.KeySet (KeySet)
import Network.Legion.PartitionKey (PartitionKey)
import Network.Legion.PartitionState (PartitionPowerState, PartitionPowerStateT)
import Network.Legion.PowerState (Event)
import Network.Legion.PowerState.Monad (PropAction(Send, DoNothing))
import Network.Legion.StateMachine.Monad (SM, NodeState(NodeState),
ClusterAction(PartitionMerge, ClusterMerge, PartitionJoin),
self, cluster, partitions, nsIndex, getPersistence, getNodeState,
modifyNodeState, pushActions, joins, lastRebalance)
import qualified Data.Conduit.List as CL
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Network.Legion.ClusterState as C
import qualified Network.Legion.Distribution as D
import qualified Network.Legion.KeySet as KS
import qualified Network.Legion.PowerState as PS
import qualified Network.Legion.PowerState.Monad as PM
-- | Build the initial 'NodeState' for a node.
--
-- The node starts out knowing only who it is and the cluster state it
-- was given: no partitions are cached, the index is empty, no joins are
-- outstanding, and 'lastRebalance' is at its minimum bound.
newNodeState :: Peer -> ClusterPowerState -> NodeState e o s
newNodeState me initialCluster =
  NodeState {
      self = me,
      cluster = initialCluster,
      lastRebalance = minBound,
      joins = Map.empty,
      partitions = Map.empty,
      nsIndex = Set.empty
    }
-- | Handle a user request against the partition identified by a key.
--
-- The routes for the key are looked up in the cluster state:
--
-- * If this node is one of the routes, the request is applied to the
--   partition as an event and the result is returned as 'Respond'.
-- * Otherwise the first peer of the route set (in 'Set.toList' order)
--   is returned as 'Forward'.
-- * If there are no routes at all, the problem is logged at error level
--   and the function fails by calling 'error'.
userRequest :: (
    Default s,
    Eq e,
    Event e o s,
    Indexable s,
    MonadLoggerIO m,
    MonadThrow m,
    Show e,
    Show s
  )
  => PartitionKey
  -> e
  -> SM e o s m (UserResponse o)
userRequest key request = do
  NodeState {self, cluster} <- getNodeState
  let candidates = C.findRoute key cluster
  case Set.toList candidates of
    -- Being one of the routes ourselves takes priority over forwarding.
    _ | self `Set.member` candidates ->
          Respond . fst <$> runPartitionPowerStateT key (PM.event request)
    [] -> do
      let noRouteMsg = "No routes for key: " ++ show key
      $(logError) (pack noRouteMsg)
      error noRouteMsg
    target : _ ->
      return (Forward target)
-- | Merge a partition power state received from elsewhere into the
-- local state of the partition with the given key. The result of the
-- underlying 'runPartitionPowerStateT' call is discarded.
partitionMerge :: (
    Default s,
    Eq e,
    Event e o s,
    Indexable s,
    MonadLoggerIO m,
    MonadThrow m,
    Show e,
    Show s
  )
  => PartitionKey
  -> PartitionPowerState e o s
  -> SM e o s m ()
partitionMerge key foreignPartition = do
  _ <- runPartitionPowerStateT key (PM.merge foreignPartition)
  return ()
-- | Merge a foreign cluster power state into the local one, then act on
-- the next rebalance action if it is addressed to this node and is
-- newer than the last one we handled ('lastRebalance').
--
-- * 'Invite': a 'PartitionJoin' for the invited keys is queued for
--   every other peer in the cluster, and those requests are recorded
--   in 'joins'.
-- * 'Drop': every locally listed partition whose key is in the dropped
--   key set has this node disassociated from it, after which the
--   rebalance is marked finished.
--
-- Any other situation leaves the node state as the merge left it.
clusterMerge :: (
    Default s,
    Eq e,
    Event e o s,
    Indexable s,
    MonadLoggerIO m,
    MonadThrow m,
    Show e,
    Show s
  )
  => ClusterPowerState
  -> SM e o s m ()
clusterMerge foreignCluster = do
  runClusterPowerStateT (PM.merge foreignCluster)
  nodeState@NodeState {lastRebalance, cluster, self} <- getNodeState
  let upcoming = C.nextAction cluster
  $(logDebug) . pack $
    "Next Rebalance: " ++ show (lastRebalance, upcoming, nodeState)
  case upcoming of
    (ord, Invite peer keys) | ord > lastRebalance && peer == self -> do
      let
        otherPeers =
          Set.toList (Set.delete self (Map.keysSet (C.getPeers cluster)))
      pushActions (map (\p -> PartitionJoin p keys) otherPeers)
      modifyNodeState (\ns -> ns {
          joins = Map.fromList (map (\p -> (p, keys)) otherPeers),
          lastRebalance = ord
        })
    (ord, Drop peer keys) | ord > lastRebalance && peer == self -> do
      persistence <- getPersistence
      -- Walk every stored partition and leave the ones being dropped.
      runConduit $
        transPipe liftIO (list persistence)
        =$= CL.map fst
        =$= CL.filter (`KS.member` keys)
        =$= awaitForever (\key ->
              lift (runPartitionPowerStateT key (PM.disassociate self))
            )
      modifyNodeState (\ns -> ns {lastRebalance = ord})
      runClusterPowerStateT C.finishRebalance
    _ -> return ()
-- | Eject a peer from the cluster.
--
-- The ejection itself is performed as this node. Afterwards an empty
-- cluster action is run /as the ejected peer/; since
-- 'runClusterPowerStateTAs' always appends an acknowledgement, the only
-- effect of that second step is to acknowledge on the ejected peer's
-- behalf.
eject :: (MonadLogger m, MonadThrow m) => Peer -> SM e o s m ()
eject peer =
  runClusterPowerStateT (C.eject peer)
    >> runClusterPowerStateTAs peer (return ())
-- | Admit a new peer, reachable at the given address, into the cluster.
--
-- A fresh 'Peer' identity is generated, the peer is joined to the
-- cluster state, and the new identity is returned together with the
-- cluster state as it stands after the join.
join :: (MonadIO m, MonadThrow m)
  => BSockAddr
  -> SM e o s m (Peer, ClusterPowerState)
join peerAddr = do
  joiner <- newPeer
  _ <- runClusterPowerStateT (C.joinCluster joiner peerAddr)
  updatedCluster <- cluster <$> getNodeState
  return (joiner, updatedCluster)
-- | The result of 'D.minimumCompleteServiceSet' applied to the
-- distribution of the current cluster state.
minimumCompleteServiceSet :: (Monad m) => SM e o s m (Set Peer)
minimumCompleteServiceSet =
  D.minimumCompleteServiceSet . C.getDistribution . cluster <$> getNodeState
-- | Look up the next index record for a search tag in the node's index.
--
-- Without a key, the search starts at the smallest possible key for the
-- tag and is inclusive ('Set.lookupGE'). With a key, the search is
-- strictly after that key ('Set.lookupGT').
search :: (Monad m) => SearchTag -> SM e o s m (Maybe IndexRecord)
search SearchTag {stTag, stKey} =
    probe . nsIndex <$> getNodeState
  where
    probe = case stKey of
      Nothing ->
        Set.lookupGE (IndexRecord {irTag = stTag, irKey = minBound})
      Just key ->
        Set.lookupGT (IndexRecord {irTag = stTag, irKey = key})
-- | Serve one step of a peer's join request.
--
-- Finds the first locally listed partition whose key is in the
-- requested key set. If there is none, 'Nothing' is returned.
-- Otherwise the requesting peer is added as a participant of that
-- partition and the change is acknowledged as this node; if the power
-- state asks for propagation, a 'PartitionMerge' is queued for every
-- participant other than the requester and this node. The updated
-- partition is saved and returned together with its key.
--
-- Errors reported by the power state are rethrown with 'throwM'.
joinNext :: (
    Default s,
    Eq e,
    Event e o s,
    Indexable s,
    MonadLoggerIO m,
    MonadThrow m
  )
  => Peer
  -> KeySet
  -> SM e o s m (Maybe (PartitionKey, PartitionPowerState e o s))
joinNext peer askKeys = do
  persistence <- getPersistence
  firstMatch <- runConduit $
    transPipe liftIO (list persistence)
    =$= CL.filter ((`KS.member` askKeys) . fst)
    =$= CL.head
  case firstMatch of
    Nothing -> return Nothing
    Just (gotKey, partition) -> do
      NodeState {self} <- getNodeState
      outcome <- PM.runPowerStateT self partition
        (PM.participate peer >> PM.acknowledge)
      case outcome of
        Left err -> throwM err
        Right ((), action, partition2, _infOutputs) -> do
          case action of
            DoNothing -> return ()
            Send ->
              -- The requester gets the partition in our reply, and we
              -- already have it, so neither needs a merge message.
              pushActions
                . map (\p -> PartitionMerge p gotKey partition2)
                . filter (\p -> p /= peer && p /= self)
                . Set.toList
                $ PS.allParticipants partition2
          savePartition gotKey partition2
          return (Just (gotKey, partition2))
-- | Handle a peer's reply to one of our 'PartitionJoin' requests.
--
-- If 'lastRebalance' is greater than the ordinal of the cluster's next
-- action, the response is logged as old and otherwise ignored.
--
-- Otherwise:
--
-- * @Just (key, partition)@: the partition is merged locally. If we
--   are tracking a join for this peer, the keys from 'minBound' up to
--   the received key are removed from what we still need from it; a
--   follow-up 'PartitionJoin' is queued when anything remains, and
--   peers with nothing left are removed from 'joins'.
-- * @Nothing@: the peer has nothing more for us and is removed from
--   'joins'.
--
-- Finally, if no joins remain outstanding, the rebalance is finished.
joinNextResponse :: (
    Default s,
    Eq e,
    Event e o s,
    Indexable s,
    MonadLoggerIO m,
    MonadThrow m,
    Show e,
    Show s
  )
  => Peer
  -> Maybe (PartitionKey, PartitionPowerState e o s)
  -> SM e o s m ()
joinNextResponse peer response = do
  NodeState {cluster, lastRebalance} <- getNodeState
  if lastRebalance > fst (C.nextAction cluster)
    then
      $(logWarn) . pack
        $ "Received an old join response: "
        ++ show (peer, response, cluster, lastRebalance)
    else do
      case response of
        Just (key, partition) -> do
          partitionMerge key partition
          NodeState {joins} <- getNodeState
          case (KS.\\ KS.fromRange minBound key) <$> Map.lookup peer joins of
            Nothing ->
              return ()
            Just needsJoinSet -> do
              unless (KS.null needsJoinSet)
                (pushActions [PartitionJoin peer needsJoinSet])
              -- Update the map held by the state at modification time,
              -- not the snapshot read above, so this branch agrees with
              -- the 'Nothing' branch below.
              modifyNodeState (\ns@NodeState {joins = current} -> ns {
                  joins = Map.filter
                    (not . KS.null)
                    (Map.insert peer needsJoinSet current)
                })
        Nothing ->
          modifyNodeState (\ns@NodeState {joins} -> ns {
              joins = Map.delete peer joins
            })
      allJoined <- Map.null . joins <$> getNodeState
      bool
        (return ())
        (runClusterPowerStateT C.finishRebalance)
        allJoined
-- | The outcome of 'userRequest'.
data UserResponse o
  = Forward Peer
    -- ^ This node is not a route for the key; the request should go to
    -- the given peer instead.
  | Respond o
    -- ^ The request was applied locally and produced this response.
-- | The peers of the current cluster state, mapped to their addresses.
getPeers :: (Monad m) => SM e o s m (Map Peer BSockAddr)
getPeers = do
  NodeState {cluster} <- getNodeState
  return (C.getPeers cluster)
-- | Fetch the power state of a partition.
--
-- The in-memory 'partitions' map is consulted first. On a miss the
-- partition is read from the persistence layer, and if it is not there
-- either, a new power state is created for the key with the owners the
-- cluster state assigns to it.
getPartition :: (Default s, MonadIO m)
  => PartitionKey
  -> SM e o s m (PartitionPowerState e o s)
getPartition key = do
  persistence <- getPersistence
  NodeState {partitions, cluster} <- getNodeState
  let
    fresh = PS.new key (C.findOwners key cluster)
    fromStorage = fromMaybe fresh <$> liftIO (getState persistence key)
  maybe fromStorage return (Map.lookup key partitions)
-- | Store a new power state for a partition and bring the node state in
-- line with it.
--
-- * The index is updated: records for tags the partition no longer has
--   are removed and records for its current tags are added.
-- * The partition is handed to the persistence layer, or 'Nothing' is
--   handed over instead when this node is no longer a participant
--   (presumably removing the stored copy; confirm against 'saveState').
-- * The partition is kept in the in-memory 'partitions' map only while
--   its set of divergent peers is non-empty.
savePartition :: (Default s, Event e o s, Indexable s, MonadLoggerIO m)
  => PartitionKey
  -> PartitionPowerState e o s
  -> SM e o s m ()
savePartition key partition = do
  persistence <- getPersistence
  oldTags <- indexEntries . PS.projectedValue <$> getPartition key
  let
    toRecords = Set.map (\tag -> IndexRecord tag key)
    currentTags = indexEntries (PS.projectedValue partition)
    obsoleteRecords = toRecords (oldTags \\ currentTags)
    newRecords = toRecords currentTags
  $(logDebug) . pack $
    "Tagging " ++ show key ++ " with: "
    ++ show (currentTags, obsoleteRecords, newRecords)
  NodeState {self} <- getNodeState
  let
    toPersist
      | self `member` PS.allParticipants partition = Just partition
      | otherwise = Nothing
  liftIO (saveState persistence key toPersist)
  modifyNodeState $ \ns@NodeState {partitions, nsIndex} ->
    let
      cached
        | Set.null (PS.divergent partition) = Map.delete key partitions
        | otherwise = Map.insert key partition partitions
      reindexed = (nsIndex \\ obsoleteRecords) `Set.union` newRecords
    in
      -- Force the previous index before building the new state.
      nsIndex `seq` ns {partitions = cached, nsIndex = reindexed}
-- | Run a power state action against the partition with the given key,
-- acting as this node.
--
-- After the supplied action, participants that the cluster state no
-- longer lists as owners of the key are disassociated (and acknowledged
-- on their own behalf), and the result is acknowledged as this node.
-- If the power state asks for propagation, a 'PartitionMerge' is queued
-- for every participant except this node. The transition is logged at
-- debug level and the new partition state is saved.
--
-- Returns the action's result together with the new partition state.
-- Errors reported by the power state are rethrown with 'throwM'.
runPartitionPowerStateT :: (
    Default s,
    Eq e,
    Event e o s,
    Indexable s,
    MonadLoggerIO m,
    MonadThrow m,
    Show e,
    Show s
  )
  => PartitionKey
  -> PartitionPowerStateT e o s (SM e o s m) a
  -> SM e o s m (a, PartitionPowerState e o s)
runPartitionPowerStateT key m = do
    NodeState {self} <- getNodeState
    partition <- getPartition key
    outcome <- PM.runPowerStateT self partition
      (m <* (removeObsolete >> PM.acknowledge))
    case outcome of
      Left err -> throwM err
      Right (a, action, partition2, _infOutputs) -> do
        case action of
          DoNothing -> return ()
          Send ->
            pushActions
              . map (\p -> PartitionMerge p key partition2)
              . filter (/= self)
              . Set.toList
              $ PS.allParticipants partition2
        $(logDebug) (pack (
            "Partition update: " ++ show partition
            ++ " --> " ++ show partition2 ++ " :: " ++ show action
          ))
        savePartition key partition2
        return (a, partition2)
  where
    -- Drop participants that the cluster no longer considers owners of
    -- this key, acknowledging each removal as the removed peer.
    removeObsolete :: (Eq e, Event e o s, Monad m)
      => PartitionPowerStateT e o s (SM e o s m) ()
    removeObsolete = do
      owners <- C.findOwners key . cluster <$> lift getNodeState
      participants <- PS.projParticipants <$> PM.getPowerState
      mapM_
        (\stale -> PM.disassociate stale >> PM.acknowledgeAs stale)
        (Set.toList (participants \\ owners))
-- | Run a cluster power state action as this node. See
-- 'runClusterPowerStateTAs' for what happens around the action.
runClusterPowerStateT :: (MonadThrow m)
  => ClusterPowerStateT (SM e o s m) a
  -> SM e o s m a
runClusterPowerStateT m =
  getNodeState >>= \NodeState {self} -> runClusterPowerStateTAs self m
-- | Run a cluster power state action on behalf of the given peer.
--
-- The action is always followed by an acknowledgement. If the power
-- state asks for propagation, a 'ClusterMerge' is queued for every
-- participant except this node (note: this node, not the peer being
-- acted for). The resulting cluster state replaces the one in the node
-- state, and the action's result is returned.
--
-- Errors reported by the power state are rethrown with 'throwM'.
runClusterPowerStateTAs :: (MonadThrow m)
  => Peer
  -> ClusterPowerStateT (SM e o s m) a
  -> SM e o s m a
runClusterPowerStateTAs actingPeer m = do
  NodeState {cluster, self} <- getNodeState
  outcome <- PM.runPowerStateT actingPeer cluster (m <* PM.acknowledge)
  case outcome of
    Left err -> throwM err
    Right (a, action, cluster2, _outputs) -> do
      case action of
        DoNothing -> return ()
        Send ->
          pushActions
            . map (\p -> ClusterMerge p cluster2)
            . filter (/= self)
            . Set.toList
            $ PS.allParticipants cluster2
      modifyNodeState (\ns -> ns {cluster = cluster2})
      return a