{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TemplateHaskell #-}
module Network.Legion.StateMachine(
NodeState,
newNodeState,
SM,
runSM,
userRequest,
partitionMerge,
clusterMerge,
migrate,
propagate,
rebalance,
heartbeat,
eject,
join,
minimumCompleteServiceSet,
search,
ClusterAction(..),
UserResponse(..),
getPeers,
) where
import Control.Monad (unless)
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Logger (MonadLogger, logWarn, logDebug, logError)
import Control.Monad.Trans.Class (lift, MonadTrans)
import Control.Monad.Trans.Reader (ReaderT, runReaderT, ask)
import Control.Monad.Trans.State (StateT, runStateT, get, put, modify)
import Data.Aeson (ToJSON, toJSON, object, (.=), encode)
import Data.ByteString.Lazy (toStrict)
import Data.Conduit (($=), ($$), Sink, transPipe, awaitForever)
import Data.Default.Class (Default)
import Data.Map (Map)
import Data.Maybe (fromMaybe)
import Data.Set (Set, (\\))
import Data.Text (pack, unpack)
import Data.Text.Encoding (decodeUtf8)
import Data.Time.Clock (getCurrentTime)
import Network.Legion.Application (getState, saveState, list, Persistence)
import Network.Legion.BSockAddr (BSockAddr)
import Network.Legion.ClusterState (ClusterPropState, ClusterPowerState)
import Network.Legion.Distribution (Peer, rebalanceAction, newPeer,
RebalanceAction(Invite))
import Network.Legion.Index (IndexRecord(IndexRecord), stTag, stKey,
irTag, irKey, SearchTag(SearchTag), indexEntries, Indexable)
import Network.Legion.KeySet (KeySet, union)
import Network.Legion.LIO (LIO)
import Network.Legion.PartitionKey (PartitionKey)
import Network.Legion.PartitionState (PartitionPowerState, PartitionPropState)
import Network.Legion.PowerState (Event, apply)
import qualified Data.Conduit.List as CL
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Network.Legion.ClusterState as C
import qualified Network.Legion.Distribution as D
import qualified Network.Legion.KeySet as KS
import qualified Network.Legion.PartitionState as P
-- | The in-memory state threaded through every 'SM' action.
data NodeState e o s = NodeState {
    -- | The peer this state belongs to. 'userRequest' only handles a
    -- request locally when this peer is among the owners of the key.
    self :: Peer,
    -- | Cluster propagation state. It is consulted for partition ownership
    -- ('C.findPartition'), the peer table ('C.getPeers') and the key
    -- distribution ('C.getDistribution').
    cluster :: ClusterPropState,
    -- | Partitions currently held in memory. 'savePartition' and
    -- 'propagate' drop entries for which 'P.idle' holds; 'getPartition'
    -- falls back to persistence for keys that are missing here.
    partitions :: Map PartitionKey (PartitionPropState e o s),
    -- | Keys awaiting migration. Extended by 'clusterMerge'; consumed and
    -- reset to empty by 'migrate'.
    migration :: KeySet,
    -- | Index records maintained by 'savePartition' and queried by
    -- 'search'.
    nsIndex :: Set IndexRecord
  }
-- | Renders the node state as the text of its JSON encoding (see the
-- 'ToJSON' instance for 'NodeState').
instance (Show e, Show s) => Show (NodeState e o s) where
  show nodeState = unpack (decodeUtf8 (toStrict (encode nodeState)))
-- | JSON rendering of a node's state, intended for diagnostics.
--
-- Only @cluster@ and the partition values go through their own 'ToJSON'
-- encodings. The peer, the partition keys, the migration key set and the
-- index are embedded as their 'show' strings.
instance (Show e, Show s) => ToJSON (NodeState e o s) where
  toJSON NodeState {self, cluster, partitions, migration, nsIndex} =
    object
      [ "self" .= show self
      , "cluster" .= cluster
      , "partitions" .= Map.mapKeys show partitions
      , "migration" .= show migration
      , "nsIndex" .= show nsIndex
      ]
-- | Builds the state of a node that holds no partitions in memory yet, has
-- nothing queued for migration and has an empty index.
newNodeState :: Peer -> ClusterPropState -> NodeState e o s
newNodeState me initialCluster =
  NodeState
    { self = me
    , cluster = initialCluster
    , partitions = Map.empty
    , migration = KS.empty
    , nsIndex = Set.empty
    }
-- | The state machine monad: a reader over the 'Persistence' layer stacked
-- on a state monad carrying the 'NodeState', all running in 'LIO'.
-- Use 'runSM' to execute an action.
newtype SM e o s a = SM {
    -- | Unwraps the action into the underlying transformer stack.
    unSM :: ReaderT (Persistence e o s) (StateT (NodeState e o s) LIO) a
  }
  deriving (Functor, Applicative, Monad, MonadLogger, MonadIO)
-- | Runs a state machine action against the given persistence layer,
-- starting from the given node state. Yields the action's result paired
-- with the node state as it stands after the action.
runSM
  :: Persistence e o s
  -> NodeState e o s
  -> SM e o s a
  -> LIO (a, NodeState e o s)
runSM persistence initialState (SM action) =
  runStateT (runReaderT action persistence) initialState
-- | Handles a user request addressed to a partition key.
--
-- When this node is one of the key's owners (per 'C.findPartition'), the
-- request is applied to the partition, the updated partition is saved, and
-- the output is returned as 'Respond'. Otherwise the request is answered
-- with 'Forward' naming the first owner in 'Set.toList' order.
--
-- If the key has no owners at all, the problem is logged and the action
-- fails by calling 'error'.
userRequest :: (Event e o s, Default s, Indexable s)
  => PartitionKey
  -> e
  -> SM e o s (UserResponse o)
userRequest key request = do
  NodeState {self, cluster} <- SM (lift get)
  let owners = C.findPartition key cluster
  if Set.member self owners
    then handleLocally
    else forwardTo (Set.toList owners)
  where
    -- Apply the request to our copy of the partition and record the event.
    handleLocally = do
      current <- getPartition key
      let response = fst (apply request (P.ask current))
      savePartition key (P.event request current)
      return (Respond response)

    -- Point the caller at an owner, or fail when there is none.
    forwardTo = \case
      peer : _ -> return (Forward peer)
      [] -> do
        let msg = "No owners for key: " ++ show key
        $(logError) (pack msg)
        error msg
-- | Merges a partition state received from another peer into our own copy
-- of that partition.
--
-- The local copy is obtained with 'getPartition'. If 'P.mergeEither'
-- accepts the foreign state, the merged partition is stored with
-- 'savePartition'. If it rejects the merge, a warning is logged and
-- nothing is saved.
partitionMerge :: (Show e, Show s, Event e o s, Default s, Indexable s)
  => Peer
  -> PartitionKey
  -> PartitionPowerState e o s
  -> SM e o s ()
partitionMerge source key foreignPartition = do
  partition <- getPartition key
  case P.mergeEither source foreignPartition partition of
    Left err ->
      $(logWarn) . pack $
        "Can't apply incoming partition merge from "
          ++ show source ++ ": " ++ show foreignPartition
          ++ ". because of: " ++ show err
    Right newPartition -> savePartition key newPartition
-- | Merges a cluster state received from another peer into our own cluster
-- state.
--
-- On success, 'C.mergeEither' yields the merged cluster together with a
-- key set, which is added to the pending 'migration' set. On failure, a
-- warning is logged and the node state is left unchanged.
clusterMerge
  :: Peer
  -> ClusterPowerState
  -> SM e o s ()
clusterMerge source foreignCluster = SM . lift $ do
  nodeState@NodeState {migration, cluster} <- get
  case C.mergeEither source foreignCluster cluster of
    Left err ->
      $(logWarn) . pack $
        "Can't apply incoming cluster merge from "
          ++ show source ++ ": " ++ show foreignCluster
          ++ ". because of: " ++ show err
    Right (newCluster, newMigration) ->
      put nodeState {
          migration = migration `union` newMigration,
          cluster = newCluster
        }
-- | Processes the pending 'migration' key set.
--
-- Unless the set is empty, every persisted partition whose key is in it
-- is visited. Each peer that 'C.findPartition' reports for the key, but
-- that is not yet among the partition's projected participants, is added
-- with 'P.participate', and the result is saved. Afterwards the
-- migration set is reset to empty.
migrate :: (Default s, Event e o s, Indexable s) => SM e o s ()
migrate = do
  NodeState {migration = pending} <- SM (lift get)
  persistence <- SM ask
  unless (KS.null pending) $
    transPipe (SM . lift3) (list persistence)
      $= CL.filter (\(key, _) -> key `KS.member` pending)
      $$ migrateEach
  SM (lift (modify (\ns -> ns {migration = KS.empty})))
  where
    -- Consumes stored partitions one at a time. The node state is re-read
    -- for every element because 'savePartition' changes it.
    migrateEach :: (Default s, Event e o s, Indexable s)
      => Sink (PartitionKey, PartitionPowerState e o s) (SM e o s) ()
    migrateEach = awaitForever $ \(key, stored) -> do
      NodeState {self, cluster, partitions} <- lift (SM (lift get))
      let
        -- Prefer the in-memory copy over the one read from storage.
        current = fromMaybe (P.initProp self stored) (Map.lookup key partitions)
        missing = C.findPartition key cluster \\ P.projParticipants current
        widened = foldr P.participate current (Set.toList missing)
      $(logDebug) . pack $ "Migrating: " ++ show key
      lift (savePartition key widened)
-- | Collects the merge actions that are currently due and advances the
-- propagation state accordingly.
--
-- The result lists the cluster actions first, followed by the partition
-- actions in ascending key order. As a side effect, partitions whose
-- post-propagation state is 'P.idle' are dropped from the in-memory map.
propagate :: SM e o s [ClusterAction e o s]
propagate = do
  partitionActions <- partitionStep
  clusterActions <- clusterStep
  return (clusterActions ++ partitionActions)
  where
    -- One 'PartitionMerge' per (partition, target peer) pair reported by
    -- 'P.actions'. Only the non-idle successor states are kept in memory.
    partitionStep :: SM e o s [ClusterAction e o s]
    partitionStep = SM . lift $ do
      ns@NodeState {partitions} <- get
      let
        stepped = Map.map P.actions partitions
        outgoing = [
            PartitionMerge peer key ps
            | (key, (peers, ps, _)) <- Map.toAscList stepped
            , peer <- Set.toList peers
          ]
        remaining = Map.filter (not . P.idle) (Map.map successor stepped)
      put ns {partitions = remaining}
      return outgoing

    -- One 'ClusterMerge' per target peer reported by 'C.actions'.
    clusterStep :: SM e o s [ClusterAction e o s]
    clusterStep = SM . lift $ do
      ns@NodeState {cluster} <- get
      let (peers, cs, newCluster) = C.actions cluster
      put ns {cluster = newCluster}
      return (map (\peer -> ClusterMerge peer cs) (Set.toList peers))

    -- The successor state is the third component of an actions triple.
    successor (_, _, next) = next
-- | Asks 'rebalanceAction' what this node should do given the known peers
-- and the current key distribution, logs the answer, and applies it.
-- An 'Invite' makes this node claim participation in the given keys;
-- 'Nothing' leaves the cluster state as it is.
rebalance :: SM e o s ()
rebalance = SM $ do
  ns@NodeState {self, cluster} <- lift get
  let
    knownPeers = Map.keysSet (C.getPeers cluster)
    plan = rebalanceAction self knownPeers (C.getDistribution cluster)
    rebalanced = case plan of
      Just (Invite ks) -> C.claimParticipation self ks cluster
      Nothing -> cluster
  $(logDebug) (pack ("The rebalance action is: " ++ show plan))
  lift (put ns {cluster = rebalanced})
-- | Reads the current time and passes it to 'C.heartbeat' for the cluster
-- state and to 'P.heartbeat' for every in-memory partition.
heartbeat :: SM e o s ()
heartbeat = SM $ do
  now <- lift3 getCurrentTime
  ns@NodeState {cluster, partitions} <- lift get
  lift . put $ ns {
      cluster = C.heartbeat now cluster,
      partitions = Map.map (P.heartbeat now) partitions
    }
-- | Replaces the cluster state with the result of 'C.eject' for the given
-- peer.
eject :: Peer -> SM e o s ()
eject peer = SM $ lift $ do
  before@NodeState {cluster = oldCluster} <- get
  put before {cluster = C.eject peer oldCluster}
-- | Admits a new node to the cluster.
--
-- A fresh peer is generated with 'newPeer' and added, together with the
-- supplied address, to the cluster state via 'C.joinCluster'. Returns the
-- new peer along with the power state of the updated cluster.
join :: BSockAddr -> SM e o s (Peer, ClusterPowerState)
join peerAddr = SM $ do
  newcomer <- lift2 newPeer
  before@NodeState {cluster} <- lift get
  let grown = C.joinCluster newcomer peerAddr cluster
  lift (put before {cluster = grown})
  return (newcomer, C.getPowerState grown)
-- | The result of 'D.minimumCompleteServiceSet' for the cluster's current
-- key distribution. The node state is not modified.
minimumCompleteServiceSet :: SM e o s (Set Peer)
minimumCompleteServiceSet = do
  NodeState {cluster} <- SM (lift get)
  (return . D.minimumCompleteServiceSet . C.getDistribution) cluster
-- | Finds the next index record for a search position.
--
-- Without a key, the lookup is 'Set.lookupGE' starting at the record made
-- of the tag and 'minBound'. With a key, it is 'Set.lookupGT' from the
-- record made of the tag and that key, so that record itself is excluded.
--
-- NOTE(review): nothing here restricts the result to records carrying the
-- requested tag; presumably callers check the tag of the returned record
-- - confirm.
search :: SearchTag -> SM e o s (Maybe IndexRecord)
search SearchTag {stTag, stKey} = SM $ do
  NodeState {nsIndex} <- lift get
  return $ case stKey of
    Nothing ->
      Set.lookupGE IndexRecord {irTag = stTag, irKey = minBound} nsIndex
    Just key ->
      Set.lookupGT IndexRecord {irTag = stTag, irKey = key} nsIndex
-- | Actions produced by 'propagate'. Each names a target peer and the
-- state to merge there (presumably to be delivered by the caller -
-- confirm against the runtime).
data ClusterAction e o s
  = -- | Merge the given cluster power state at the given peer.
    ClusterMerge Peer ClusterPowerState
  | -- | Merge the given partition power state, for the given key, at the
    -- given peer.
    PartitionMerge Peer PartitionKey (PartitionPowerState e o s)
-- | The outcome of 'userRequest'.
data UserResponse o
  = -- | This node is not an owner of the key; the named peer is.
    Forward Peer
  | -- | The request was applied locally; this is its output.
    Respond o
-- | The peer table ('C.getPeers') of the current cluster state.
getPeers :: SM e o s (Map Peer BSockAddr)
getPeers = SM (fmap peersOf (lift get))
  where
    peersOf = C.getPeers . cluster
-- | Obtains the propagation state of a partition without changing the node
-- state.
--
-- The in-memory map is tried first. On a miss, the persistence layer is
-- queried. A stored power state is wrapped with 'P.initProp'. If nothing
-- is stored either, a new partition is created with 'P.new', using the
-- owners that 'C.findPartition' reports for the key.
getPartition :: (Default s, Event e o s)
  => PartitionKey
  -> SM e o s (PartitionPropState e o s)
getPartition key = SM $ do
  persistence <- ask
  NodeState {self, partitions, cluster} <- lift get
  case Map.lookup key partitions of
    Just cached -> return cached
    Nothing ->
      lift3 (getState persistence key)
        <&> maybe (P.new key self (C.findPartition key cluster)) (P.initProp self)
-- | Stores a partition and brings the node state in line with it.
--
-- * The index is refreshed: records for tags the previous version of the
--   partition had but the new one lacks are removed, and records for all
--   current tags are added.
-- * The persistence layer receives the partition's power state when
--   'P.participating' holds, and 'Nothing' otherwise (presumably clearing
--   the stored entry - confirm against the 'Persistence' implementation).
-- * The in-memory map keeps the partition unless it is 'P.idle', in which
--   case the entry is removed.
savePartition :: (Default s, Event e o s, Indexable s)
  => PartitionKey
  -> PartitionPropState e o s
  -> SM e o s ()
savePartition key partition = SM $ do
  persistence <- ask
  previous <- unSM (getPartition key)
  let
    toRecord tag = IndexRecord tag key
    oldTags = indexEntries (P.ask previous)
    currentTags = indexEntries (P.ask partition)
    obsoleteRecords = Set.map toRecord (oldTags \\ currentTags)
    newRecords = Set.map toRecord currentTags
    persisted
      | P.participating partition = Just (P.getPowerState partition)
      | otherwise = Nothing
  $(logDebug) . pack
    $ "Tagging " ++ show key ++ " with: "
    ++ show (currentTags, obsoleteRecords, newRecords)
  ns@NodeState {partitions, nsIndex} <- lift get
  lift3 (saveState persistence key persisted)
  let
    cached
      | P.idle partition = Map.delete key partitions
      | otherwise = Map.insert key partition partitions
    reindexed = Set.union (nsIndex \\ obsoleteRecords) newRecords
  lift (put ns {partitions = cached, nsIndex = reindexed})
-- | 'fmap' with its arguments flipped: the container comes first and the
-- function second.
(<&>) :: (Functor f) => f a -> (a -> b) -> f b
container <&> f = fmap f container
-- | Lifts an action through two monad transformer layers.
lift2
  :: (
    MonadTrans a,
    MonadTrans b,
    Monad m,
    Monad (b m)
  )
  => m r
  -> a (b m) r
lift2 action = lift (lift action)
-- | Lifts an action through three monad transformer layers.
lift3
  :: (
    MonadTrans a,
    MonadTrans b,
    MonadTrans c,
    Monad m,
    Monad (c m),
    Monad (b (c m))
  )
  => m r
  -> a (b (c m)) r
lift3 action = lift (lift (lift action))