module Network.Legion.Distribution (
ParticipationDefaults,
Peer,
empty,
modify,
findPartition,
rebalanceAction,
RebalanceAction(..),
newPeer,
minimumCompleteServiceSet,
) where
import Prelude hiding (null)
import Control.Monad.IO.Class (MonadIO)
import Data.Aeson (ToJSON, toJSON, object, (.=))
import Data.Binary (Binary)
import Data.Function (on)
import Data.List (sort, sortBy)
import Data.Map (Map)
import Data.Monoid ((<>))
import Data.Set (Set)
import Data.Text (pack)
import Data.UUID (UUID)
import GHC.Generics (Generic)
import Network.Legion.KeySet (KeySet, member, (\\), null)
import Network.Legion.PartitionKey (PartitionKey)
import Network.Legion.UUID (getUUID)
import Text.Read (readPrec)
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Network.Legion.KeySet as KS
newtype Peer = Peer UUID deriving (Binary, Eq, Ord)
instance Read Peer where
readPrec = Peer <$> readPrec
instance Show Peer where
showsPrec n (Peer uuid) = showsPrec n uuid
newtype ParticipationDefaults = D {
unD :: [(KeySet, Set Peer)]
} deriving (Show, Binary)
instance ToJSON ParticipationDefaults where
toJSON (D dist) = object [
pack (show ks) .= Set.map show peers
| (ks, peers) <- dist
]
empty :: ParticipationDefaults
empty = D []
findPartition :: PartitionKey -> ParticipationDefaults -> Set Peer
findPartition k d =
case [ps | (ks, ps) <- unD d, k `member` ks] of
[ps] -> ps
_ -> error
$ "No exact mach for key in distribution. This means there is a bug in "
++ "the module `Network.Legion.Distribution`. Please report this bug "
++ "via github: " ++ show (k, d)
minimumCompleteServiceSet :: ParticipationDefaults -> Set Peer
minimumCompleteServiceSet defs = Set.fromList [
p
| (_, peers) <- unD defs
, Just (p, _) <- [Set.minView peers]
]
modify
:: (Set Peer -> Set Peer)
-> KeySet
-> ParticipationDefaults
-> ParticipationDefaults
modify fun keyset =
D . filter (not . null . fst) . doModify keyset . unD
where
doModify ks [] = [(ks, fun Set.empty)]
doModify ks ((r, ps):dist) =
let {
unaffected = r \\ ks;
affected = r \\ unaffected;
remaining = ks \\ affected;
} in
(unaffected, ps):(affected, fun ps):doModify remaining dist
rebalanceAction
:: Set Peer
-> ParticipationDefaults
-> (RebalanceAction, ParticipationDefaults)
rebalanceAction allPeers distribution =
let
action = underServed <> overServed <> underUtilized
newDist = case action of
NoAction -> D dist
Invite peer ks -> modify (Set.insert peer) ks (D dist)
Drop peer ks -> modify (Set.delete peer) ks (D dist)
in (action, newDist)
where
dist =
let
distPeers = Set.unions (snd <$> unD distribution)
defunct = distPeers Set.\\ allPeers
in
unD (modify (Set.\\ defunct) KS.full distribution)
underServed :: RebalanceAction
underServed =
let
underserved = [
(ks, ps)
| (ks, ps) <- dist
, Set.size ps < 3
]
mostUnderserved = sortBy (compare `on` Set.size . snd) underserved
in case mostUnderserved of
[] -> NoAction
(ks, ps):_ ->
let
candidateHosts = Set.toAscList (allPeers Set.\\ ps)
bestHosts = sort [(load p, p) | p <- candidateHosts]
in case bestHosts of
(currentLoad, candidate):_ ->
let
additionalLoad :: KeySet
additionalLoad = KS.take (idealLoad currentLoad) ks
in Invite candidate additionalLoad
_ -> NoAction
overServed :: RebalanceAction
overServed =
let
over :: Map Peer KeySet
over = Map.filter (not . KS.null) . Map.fromList $ [
(candidate, foldr KS.union KS.empty [
ks
| (ks, ps) <- dist
, Set.size ps > 3
, best:_ <- [sortBy (flip compare `on` load) (Set.toList ps)]
, best == candidate
])
| candidate <- Set.toList allPeers
]
in case Map.toAscList over of
[] -> NoAction
(peer, ks):_ -> Drop peer ks
underUtilized :: RebalanceAction
underUtilized =
let
under = sortBy (compare `on` load) [
p
| p <- Set.toList allPeers
, load p + 1 < idealLoad
]
over = sortBy (flip compare `on` load) [
p
| p <- Set.toList allPeers
, load p > idealLoad
]
in case (under, over) of
(u:_, o:_) | u /= o ->
let
difference = (servicedBy o \\ servicedBy u)
keys = KS.take (idealLoad load u) difference
in Invite u keys
_ -> NoAction
load :: Peer -> Integer
load = KS.size . servicedBy
idealLoad :: Integer
idealLoad =
let
total = KS.size KS.full * 3
numPeers = toInteger (Set.size allPeers)
in (total `div` numPeers) + 1
servicedBy :: Peer -> KeySet
servicedBy p = foldr KS.union KS.empty [
ks
| (ks, ps) <- dist
, p `Set.member` ps
]
data RebalanceAction
= Invite Peer KeySet
| Drop Peer KeySet
| NoAction
deriving (Show, Generic)
instance Binary RebalanceAction
instance Monoid RebalanceAction where
mempty = NoAction
mappend NoAction a = a
mappend a _ = a
newPeer :: (MonadIO m) => m Peer
newPeer = Peer <$> getUUID