{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{- |
  This module defines the data structures and functions used for handling the
  key space distribution.
-}
module Network.Legion.Distribution (
  ParticipationDefaults,
  Peer,
  empty,
  modify,
  findPartition,
  rebalanceAction,
  RebalanceAction(..),
  newPeer,
  minimumCompleteServiceSet,
) where

import Prelude hiding (null)

import Control.Monad.IO.Class (MonadIO)
import Data.Aeson (ToJSON, toJSON, object, (.=))
import Data.Binary (Binary)
import Data.Function (on)
import Data.List (sort, sortBy)
import Data.Map (Map)
import Data.Monoid ((<>))
import Data.Set (Set)
import Data.Text (pack)
import Data.UUID (UUID)
import GHC.Generics (Generic)
import Network.Legion.KeySet (KeySet, member, (\\), null)
import Network.Legion.PartitionKey (PartitionKey)
import Network.Legion.UUID (getUUID)
import Text.Read (readPrec)
import qualified Data.Map as Map
import qualified Data.Set as Set
import qualified Network.Legion.KeySet as KS


{- |
  The way to identify a peer.
-}
newtype Peer = Peer UUID deriving (Binary, Eq, Ord)
-- Read and Show delegate to the underlying 'UUID', so a shown 'Peer'
-- parses back to an equal 'Peer' ('read . show' round-trips).
instance Read Peer where
  readPrec = Peer <$> readPrec
instance Show Peer where
  showsPrec n (Peer uuid) = showsPrec n uuid


{- |
  The distribution of partitions and partition replicas among the
  cluster, represented as a list of key ranges paired with the peers
  that service each range.
-}
newtype ParticipationDefaults = D {
    unD :: [(KeySet, Set Peer)]
  } deriving (Show, Binary)
instance ToJSON ParticipationDefaults where
  -- Render each (key range, peer set) pair as an object field keyed by
  -- the shown key range.
  toJSON = object . fmap render . unD
    where
      render (ks, peers) = pack (show ks) .= Set.map show peers


{- | Construct a distribution that contains no partitions. -}
empty :: ParticipationDefaults

empty = D []


{- |
  Find the peers that own the specified partition.

  Every key is expected to fall in exactly one range of the
  distribution; anything else indicates a corrupted distribution and is
  reported as a bug via 'error'.
-}
findPartition :: PartitionKey -> ParticipationDefaults -> Set Peer

findPartition k d =
  case [ps | (ks, ps) <- unD d, k `member` ks] of
    [ps] -> ps
    _ -> error
      $ "No exact match for key in distribution. This means there is a bug in "
      ++ "the module `Network.Legion.Distribution`. Please report this bug "
      ++ "via github: " ++ show (k, d)


{- |
  Find a solution to the minimum complete service set: one peer (the
  least, by 'Ord') chosen from each key range's replica set.
-}
minimumCompleteServiceSet :: ParticipationDefaults -> Set Peer
minimumCompleteServiceSet defs = Set.fromList [
    Set.findMin peers
    | (_, peers) <- unD defs
    , not (Set.null peers)
  ]


{- |
  Modify the default participation for the key set.

  The function is applied to the peer set of every portion of the
  keyspace that overlaps @keyset@; keys outside of @keyset@ keep their
  original peers.
-}
modify
  :: (Set Peer -> Set Peer)
  -> KeySet
  -> ParticipationDefaults
  -> ParticipationDefaults

modify fun keyset =
    {-
      walk can produce key ranges that contain zero keys, which is why
      we filter them out before re-wrapping the distribution.
    -}
    D . filter (not . null . fst) . walk keyset . unD
  where
    {- Any keys not covered by the existing ranges get a fresh entry. -}
    walk ks [] = [(ks, fun Set.empty)]
    {- Split each range into the part outside the target key set (kept
       as-is) and the part inside it (peers updated via 'fun'). -}
    walk ks ((range, peers):rest) =
      let
        untouched = range \\ ks
        touched = range \\ untouched
        leftover = ks \\ touched
      in
        (untouched, peers) : (touched, fun peers) : walk leftover rest


{- |
  Return the best action, if any, that should be taken to rebalance an
  unbalanced distribution, along with the resulting distribution.

  The three strategies are tried in priority order -- under-served
  ranges first, then over-served ranges, then under-utilized peers --
  and the '<>' on 'RebalanceAction' keeps the first one that is not
  'NoAction'.
-}
rebalanceAction
  :: Set Peer {- ^ The set of all peers in the cluster. -}
  -> ParticipationDefaults
  -> (RebalanceAction, ParticipationDefaults)
rebalanceAction allPeers distribution =
    let
      {- First non-'NoAction' strategy wins (see the Monoid instance). -}
      action = underServed <> overServed <> underUtilized
      {- Apply the chosen action to get the resulting distribution. -}
      newDist = case action of
        NoAction -> D dist
        Invite peer ks -> modify (Set.insert peer) ks (D dist)
        Drop peer ks -> modify (Set.delete peer) ks (D dist)
    in (action, newDist)
  where

    {- | Remove any defunct peers (present in the distribution but not
      in 'allPeers') from the distribution. -}
    dist = 
      let
        distPeers = Set.unions (snd <$> unD distribution)
        defunct = distPeers Set.\\ allPeers
      in
        unD (modify (Set.\\ defunct) KS.full distribution)

    {- |
      Figure out if there are any under-served partitions and also figure
      out if this peer is the best candidate to service them. "Under
      served" means that the partition isn't replicated enough times,
      where "enough" is the magic number 3.
    -}
    underServed :: RebalanceAction
    underServed =
      let
        underserved = [
            (ks, ps)
            | (ks, ps) <- dist
            , Set.size ps < 3
          ]
        {- Ascending by replica count: the least-replicated range first. -}
        mostUnderserved = sortBy (compare `on` Set.size . snd) underserved
      in case mostUnderserved of
        [] -> NoAction
        (ks, ps):_ ->
          let
            {- |
              Any peer that is not currently servicing the keyspace
              segment is a candidate.
            -}
            candidateHosts = Set.toAscList (allPeers Set.\\ ps)

            {- |
              The best candidate is the one that currently has the
              least load.
            -}
            bestHosts = sort [(load p, p) | p <- candidateHosts]
          in case bestHosts of
            (currentLoad, candidate):_ ->
              {-
                Don't be too eager to take on too much additional
                load, because if we take more than our fair share, then
                the extra is just going to get rebalanced away almost
                immediately, leading to inefficiency.

                NOTE(review): if currentLoad already exceeds idealLoad
                the count passed to 'KS.take' is negative -- presumably
                'KS.take' treats that as "take nothing"; confirm in
                Network.Legion.KeySet.
              -}
              let
                additionalLoad :: KeySet
                additionalLoad = KS.take (idealLoad - currentLoad) ks
              in Invite candidate additionalLoad
            _ -> NoAction

    {- |
      Figure out if there are any partitions being over served and also
      figure out if we are the best candidate to drop them. "Over served"
      means that the partition is replicated too many times. "Too many times"
      is anything over the magic number, 3.
    -}
    overServed :: RebalanceAction
    overServed =
      let
        {- | 'over' maps peers to the set of keys that peer should drop.
          For each over-replicated range, the most-loaded peer serving
          it is the one selected to drop it. -}
        over :: Map Peer KeySet
        over = Map.filter (not . KS.null) . Map.fromList $ [
            (candidate, foldr KS.union KS.empty [
                ks
                | (ks, ps) <- dist
                , Set.size ps > 3
                , best:_ <- [sortBy (flip compare `on` load) (Set.toList ps)]
                , best == candidate
              ])
            | candidate <- Set.toList allPeers
          ]
      in case Map.toAscList over of
          [] -> NoAction
          (peer, ks):_ -> Drop peer ks

    {- |
      Figure out which peer is most underutilized with respect to the
      rest of the cluster and also what keys that peer should begin to
      serve to correct the underutilization.
    -}
    underUtilized :: RebalanceAction
    underUtilized =
      let
        {- Peers more than one key short of the ideal, least loaded first. -}
        under = sortBy (compare `on` load) [
            p
            | p <- Set.toList allPeers
            , load p + 1 < idealLoad
          ]
        {- Peers above the ideal, most loaded first. -}
        over = sortBy (flip compare `on` load) [
            p
            | p <- Set.toList allPeers
            , load p > idealLoad
          ]
      in case (under, over) of
        (u:_, o:_) | u /= o ->
          {-
            Figure out which keys to take, which is a selection of
            the difference between them large enough to move the under
            utilized peer up to the ideal load.
          -}
          let
            difference = (servicedBy o \\ servicedBy u)
            keys = KS.take (idealLoad - load u) difference
          in Invite u keys 
        _ -> NoAction

    {- | Figure out how much load a peer is servicing.  -}
    load :: Peer -> Integer
    load = KS.size . servicedBy

    {- | The ideal load for each peer: total replicated keyspace (3
      replicas of everything) divided evenly, rounded up.
      NOTE(review): divides by the number of peers -- assumes 'allPeers'
      is non-empty; confirm callers guarantee this. -}
    idealLoad :: Integer
    idealLoad =
      let
        total = KS.size KS.full * 3
        numPeers = toInteger (Set.size allPeers)
      in (total `div` numPeers) + 1

    {- | Figure out the keyspace serviced by a peer.  -}
    servicedBy :: Peer -> KeySet
    servicedBy p = foldr KS.union KS.empty [
        ks
        | (ks, ps) <- dist
        , p `Set.member` ps
      ]


{- | The actions that are taken in order to build a balanced cluster. -}
data RebalanceAction
  = Invite Peer KeySet
  | Drop Peer KeySet
  | NoAction
  deriving (Show, Generic)
instance Binary RebalanceAction
{- |
  'NoAction' is the identity; otherwise the left argument wins, so
  'mappend' (and thus '<>') selects the first real action in a chain.
-}
instance Monoid RebalanceAction where
  mempty = NoAction
  mappend x y =
    case x of
      NoAction -> y
      _ -> x


{- |
  Create a new peer, identified by a freshly generated UUID.
-}
newPeer :: (MonadIO m) => m Peer
newPeer = fmap Peer getUUID