{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
module Network.Legion.ClusterState (
ClusterState,
ClusterPowerState,
ClusterPowerStateT,
RebalanceOrd,
new,
getPeers,
findRoute,
findOwners,
getDistribution,
joinCluster,
finishRebalance,
eject,
nextAction,
) where
import Control.Exception (throw)
import Data.Aeson (ToJSON, toJSON, object, (.=), encode)
import Data.Binary (Binary)
import Data.Default.Class (Default(def))
import Data.Functor.Identity (runIdentity)
import Data.Map (Map)
import Data.Set (Set)
import Data.Text.Encoding (decodeUtf8)
import Data.UUID (UUID)
import Data.Word (Word64)
import GHC.Generics (Generic)
import Network.Legion.BSockAddr (BSockAddr(BSockAddr))
import Network.Legion.Distribution (ParticipationDefaults,
Peer, rebalanceAction, RebalanceAction(NoAction))
import Network.Legion.PartitionKey (PartitionKey)
import Network.Legion.PowerState (Event, apply, PowerState)
import Network.Legion.PowerState.Monad (PowerStateT, runPowerStateT)
import Network.Socket (SockAddr)
import qualified Data.ByteString.Lazy as LBS
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Data.Text as T
import qualified Network.Legion.Distribution as D
import qualified Network.Legion.KeySet as KS
import qualified Network.Legion.PowerState as PS
import qualified Network.Legion.PowerState.Monad as PM
data ClusterState = ClusterState {
distribution :: ParticipationDefaults,
peers :: Map Peer BSockAddr,
updates :: [ClusterChange],
rebalanceOrd :: RebalanceOrd
}
deriving (Generic)
instance Binary ClusterState
instance Default ClusterState where
def = ClusterState {
distribution = D.empty,
peers = Map.empty,
updates = [],
rebalanceOrd = minBound
}
instance ToJSON ClusterState where
toJSON (ClusterState distribution_ peers_ updates_ rebalanceOrd_) = object [
"distribution" .= distribution_,
"peers" .= Map.fromList [
(show p, show a)
| (p, a) <- Map.toList peers_
],
"updates" .= (show <$> updates_),
"rebalanceOrd" .= show rebalanceOrd_
]
instance Show ClusterState where
show = T.unpack . decodeUtf8 . LBS.toStrict . encode
type ClusterPowerState =
PowerState UUID ClusterState Peer Update ()
type ClusterPowerStateT =
PowerStateT UUID ClusterState Peer Update ()
newtype RebalanceOrd = RebalanceOrd Word64
deriving (Generic, Show, Enum, Bounded, Eq, Ord)
instance Binary RebalanceOrd
data Update
= Change ClusterChange
| Complete
deriving (Show, Generic, Eq)
instance Binary Update
instance Event Update () ClusterState where
apply update cs@ClusterState {peers, updates, distribution, rebalanceOrd} =
((),) . popUpdate $ case update of
Change change -> cs {updates = updates ++ [change]}
Complete -> cs {
distribution =
snd (rebalanceAction (Map.keysSet peers) distribution),
rebalanceOrd =
succ rebalanceOrd
}
popUpdate :: ClusterState -> ClusterState
popUpdate cs@ClusterState {updates, distribution, peers} =
case (updates, rebalanceAction (Map.keysSet peers) distribution) of
(u:moreUpdates, (NoAction, _)) -> popUpdate $
case u of
PeerJoined peer addr -> cs {
peers = Map.insert peer addr peers,
updates = moreUpdates
}
PeerEjected peer -> cs {
distribution = D.modify (Set.delete peer) KS.full distribution,
peers = Map.delete peer peers,
updates = moreUpdates
}
_ -> cs
data ClusterChange
= PeerJoined Peer BSockAddr
| PeerEjected Peer
deriving (Show, Generic, Eq)
instance Binary ClusterChange
new :: UUID -> Peer -> SockAddr -> ClusterPowerState
new clusterId self addy =
runIdentity $ runPowerStateT self (PS.new clusterId (Set.singleton self)) (do
PM.event (Change (PeerJoined self (BSockAddr addy)))
PM.event Complete
PM.acknowledge
) >>= \case
Left err -> throw err
Right ((), _, cluster, _) -> return cluster
getPeers :: ClusterPowerState -> Map Peer BSockAddr
getPeers = peers . PS.projectedValue
getDistribution :: ClusterPowerState -> ParticipationDefaults
getDistribution = distribution . PS.projectedValue
findRoute :: PartitionKey -> ClusterPowerState -> Set Peer
findRoute key =
D.findPartition key . distribution . PS.projectedValue
findOwners :: PartitionKey -> ClusterPowerState -> Set Peer
findOwners key cluster =
let ClusterState {distribution, peers} = PS.projectedValue cluster
in
D.findPartition
key
(snd (rebalanceAction (Map.keysSet peers) distribution))
joinCluster :: (Monad m)
=> Peer
-> BSockAddr
-> ClusterPowerStateT m ()
joinCluster peer addy = do
PM.participate peer
PM.event (Change (PeerJoined peer addy))
finishRebalance :: (Monad m) => ClusterPowerStateT m ()
finishRebalance = PM.event Complete
eject :: (Monad m) => Peer -> ClusterPowerStateT m ()
eject peer = do
PM.event (Change (PeerEjected peer))
PM.disassociate peer
nextAction :: ClusterPowerState -> (RebalanceOrd, RebalanceAction)
nextAction cluster =
let ClusterState {peers, distribution, rebalanceOrd} = PS.infimumValue cluster
in (rebalanceOrd, fst (rebalanceAction (Map.keysSet peers) distribution))