{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE OverloadedStrings #-}
module Network.Legion.ClusterState (
ClusterState,
ClusterPowerState,
ClusterPropState,
claimParticipation,
new,
initProp,
getPowerState,
getPeers,
findPartition,
getDistribution,
joinCluster,
eject,
mergeEither,
actions,
allParticipants,
heartbeat,
) where
import Data.Aeson (ToJSON, toJSON, object, (.=))
import Data.Binary (Binary)
import Data.Default.Class (Default(def))
import Data.Map (Map)
import Data.Set (Set)
import Data.Time.Clock (UTCTime)
import Data.UUID (UUID)
import GHC.Generics (Generic)
import Network.Legion.BSockAddr (BSockAddr(BSockAddr))
import Network.Legion.Distribution (ParticipationDefaults, modify, Peer)
import Network.Legion.KeySet (KeySet, full, unions)
import Network.Legion.PartitionKey (PartitionKey)
import Network.Legion.PowerState (Event(apply))
import Network.Legion.Propagation (PropState, PropPowerState)
import Network.Socket (SockAddr)
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Network.Legion.Distribution as D
import qualified Network.Legion.Propagation as P
data ClusterState = ClusterState {
distribution :: ParticipationDefaults,
peers :: Map Peer BSockAddr
}
deriving (Show, Generic)
instance Binary ClusterState
instance Default ClusterState where
def = ClusterState {
distribution = D.empty,
peers = Map.empty
}
instance ToJSON ClusterState where
toJSON ClusterState {distribution, peers} = object [
"distribution" .= distribution,
"peers" .= Map.fromList [
(show p, show a)
| (p, a) <- Map.toList peers
]
]
newtype ClusterPowerState = ClusterPowerState {
unPowerState :: PropPowerState UUID ClusterState Peer Update ()
} deriving (Show, Binary)
newtype ClusterPropState = ClusterPropState {
unPropState :: PropState UUID ClusterState Peer Update ()
} deriving (Show, ToJSON)
data Update
= PeerJoined Peer BSockAddr
| Participating Peer KeySet
| PeerEjected Peer
deriving (Show, Generic)
instance Binary Update
instance Event Update () ClusterState where
apply (PeerJoined peer addr) cs@ClusterState {peers} =
((), cs {peers = Map.insert peer addr peers})
apply (Participating peer ks) cs@ClusterState {distribution} =
((), cs {distribution = modify (Set.insert peer) ks distribution})
apply (PeerEjected peer) cs@ClusterState {distribution, peers} =
((), cs {
distribution = modify (Set.delete peer) full distribution,
peers = Map.delete peer peers
})
claimParticipation
:: Peer
-> KeySet
-> ClusterPropState
-> ClusterPropState
claimParticipation peer ks =
ClusterPropState
. P.event (Participating peer ks)
. unPropState
new :: UUID -> Peer -> SockAddr -> ClusterPropState
new clusterId self addy =
claimParticipation self full
. ClusterPropState
. P.event (PeerJoined self (BSockAddr addy))
$ P.new clusterId self (Set.singleton self)
initProp :: Peer -> ClusterPowerState -> ClusterPropState
initProp self = ClusterPropState . P.initProp self . unPowerState
getPowerState :: ClusterPropState -> ClusterPowerState
getPowerState = ClusterPowerState . P.getPowerState . unPropState
getPeers :: ClusterPropState -> Map Peer BSockAddr
getPeers = peers . P.ask . unPropState
getDistribution :: ClusterPropState -> ParticipationDefaults
getDistribution = distribution . P.ask . unPropState
findPartition :: PartitionKey -> ClusterPropState -> Set Peer
findPartition key =
D.findPartition key . distribution . P.ask . unPropState
joinCluster
:: Peer
-> BSockAddr
-> ClusterPropState
-> ClusterPropState
joinCluster peer addy =
ClusterPropState
. P.event (PeerJoined peer addy)
. P.participate peer
. unPropState
eject :: Peer -> ClusterPropState -> ClusterPropState
eject peer =
ClusterPropState
. P.event (PeerEjected peer)
. P.disassociate peer
. unPropState
mergeEither
:: Peer
-> ClusterPowerState
-> ClusterPropState
-> Either String (ClusterPropState, KeySet)
mergeEither otherPeer (ClusterPowerState otherPS) (ClusterPropState prop) =
let
self = P.getSelf prop
divergences = P.divergences self (P.initProp otherPeer otherPS)
migrating = unions [
ks
| (_, Participating _ ks) <- Map.toList divergences
]
in case P.mergeEither otherPeer otherPS prop of
Left err -> Left err
Right newProp -> Right (ClusterPropState newProp, migrating)
actions :: ClusterPropState -> (Set Peer, ClusterPowerState, ClusterPropState)
actions prop =
let (peers, ps, newProp) = P.actions (unPropState prop)
in (peers, ClusterPowerState ps, ClusterPropState newProp)
allParticipants :: ClusterPropState -> Set Peer
allParticipants = P.allParticipants . unPropState
heartbeat :: UTCTime -> ClusterPropState -> ClusterPropState
heartbeat now = ClusterPropState . P.heartbeat now . unPropState