{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE MonoLocalBinds #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE TupleSections #-}
module Raft.Candidate (
handleAppendEntries
, handleAppendEntriesResponse
, handleRequestVote
, handleRequestVoteResponse
, handleTimeout
, handleClientRequest
) where
import Protolude
import qualified Data.Set as Set
import qualified Data.Sequence as Seq
import qualified Data.Map as Map
import Raft.NodeState
import Raft.RPC
import Raft.Client
import Raft.Event
import Raft.Action
import Raft.Persistent
import Raft.Log
import Raft.Config
import Raft.Monad
import Raft.Types
-- | Handle an 'AppendEntries' RPC received while in the candidate state.
--
-- When the node's current term is less than or equal to the term carried by
-- the message, the candidate steps down via 'stepDown', passing along the
-- sender, the message term and the candidate's commit index, last-applied
-- index and last log entry data. Otherwise the message is ignored and the
-- candidate state is returned unchanged.
handleAppendEntries :: RPCHandler 'Candidate sm (AppendEntries v) v
handleAppendEntries (NodeCandidateState candidateState@CandidateState{..}) sender AppendEntries{..} =
  gets currentTerm >>= \ourTerm ->
    if | ourTerm <= aeTerm ->
           stepDown sender aeTerm csCommitIndex csLastApplied csLastLogEntryData
       | otherwise ->
           pure (candidateResultState Noop candidateState)
-- | Handle an 'AppendEntriesResponse' RPC received while in the candidate
-- state.
--
-- Both the sender and the response are ignored: no monadic effect is
-- performed and the candidate state is returned unchanged with 'Noop'.
handleAppendEntriesResponse :: RPCHandler 'Candidate sm AppendEntriesResponse v
handleAppendEntriesResponse (NodeCandidateState unchangedState) _ _ =
  pure (candidateResultState Noop unchangedState)
-- | Handle a 'RequestVote' RPC received while in the candidate state.
--
-- The vote is always refused: a 'RequestVoteResponse' built from the node's
-- current term and 'False' is sent back to the sender, and the candidate
-- state is returned unchanged with 'Noop'. None of the request's fields are
-- inspected.
handleRequestVote :: RPCHandler 'Candidate sm RequestVote v
-- The constructor patterns are kept (with empty record braces) so the
-- arguments are still matched, but no unused names are bound.
handleRequestVote (NodeCandidateState candidateState@CandidateState{}) sender RequestVote{} = do
  currentTerm <- gets currentTerm
  send sender $
    SendRequestVoteResponseRPC $
      RequestVoteResponse currentTerm False
  pure $ candidateResultState Noop candidateState
-- | Handle a 'RequestVoteResponse' RPC received while in the candidate state.
--
-- The response is ignored (candidate state returned unchanged with 'Noop')
-- when the sender is already a member of 'csVotes' or when 'rvrVoteGranted'
-- is 'False'. Otherwise the sender is inserted into the vote set:
--
-- * if the enlarged vote set does not reach a majority of the configured node
--   ids, the candidate state is returned with the updated 'csVotes';
--
-- * if it does, the leader-initialisation steps are run and the result is
--   built with 'leaderResultState' and 'BecomeLeader'.
handleRequestVoteResponse
  :: forall sm v. Show v
  => RPCHandler 'Candidate sm RequestVoteResponse v
handleRequestVoteResponse (NodeCandidateState candidateState@CandidateState{..}) sender RequestVoteResponse{..}
  -- A response from a node already counted, or a refused vote: nothing to do.
  | Set.member sender csVotes || not rvrVoteGranted =
      pure $ candidateResultState Noop candidateState
  | otherwise = do
      let newCsVotes = Set.insert sender csVotes
      cNodeIds <- asks (configNodeIds . nodeConfig)
      if hasMajority cNodeIds newCsVotes
        then leaderResultState BecomeLeader <$> becomeLeader
        else pure $ candidateResultState Noop (candidateState { csVotes = newCsVotes })
  where
    -- True when the vote set has at least (number of node ids `div` 2) + 1
    -- members.
    hasMajority :: Set a -> Set b -> Bool
    hasMajority nids votes =
      Set.size votes >= Set.size nids `div` 2 + 1

    -- Build the entry appended on winning the election: it carries 'NoValue',
    -- the current term, this node as issuer, and the index following the
    -- candidate's last log entry index.
    mkNoopEntry :: TransitionM sm v (Entry v)
    mkNoopEntry = do
      currTerm <- gets currentTerm
      nid <- asks (configNodeId . nodeConfig)
      pure Entry
        { entryIndex = succ (fst csLastLogEntryData)
        , entryTerm = currTerm
        , entryValue = NoValue
        , entryIssuer = LeaderIssuer (LeaderId nid)
        }

    -- Reset the heartbeat timeout, append the no-op entry to the log,
    -- broadcast it in an AppendEntries RPC, and produce the initial leader
    -- state. For every configured node id, 'lsNextIndex' starts at the no-op
    -- entry's index and 'lsMatchIndex' starts at 'index0'.
    becomeLeader :: TransitionM sm v LeaderState
    becomeLeader = do
      currentTerm <- gets currentTerm
      resetHeartbeatTimeout
      noopEntry <- mkNoopEntry
      appendLogEntries (Seq.singleton noopEntry)
      broadcast $ SendAppendEntriesRPC
        AppendEntriesData
          { aedTerm = currentTerm
          , aedLeaderCommit = csCommitIndex
          , aedEntriesSpec = FromNewLeader noopEntry
          }
      cNodeIds <- asks (configNodeIds . nodeConfig)
      let lastLogEntryIdx = entryIndex noopEntry
          lastLogEntryTerm = entryTerm noopEntry
      pure LeaderState
        { lsCommitIndex = csCommitIndex
        , lsLastApplied = csLastApplied
        , lsNextIndex = Map.fromSet (const lastLogEntryIdx) cNodeIds
        , lsMatchIndex = Map.fromSet (const index0) cNodeIds
        , lsLastLogEntryData = (lastLogEntryIdx, lastLogEntryTerm, Nothing)
        , lsReadReqsHandled = 0
        , lsReadRequest = mempty
        }
-- | Handle a timeout event while in the candidate state.
--
-- A 'HeartbeatTimeout' is ignored: the candidate state is returned unchanged
-- with 'Noop'. An 'ElectionTimeout' calls 'startElection' with the candidate's
-- 'csCommitIndex', 'csLastApplied' and 'csLastLogEntryData', and wraps the
-- state it yields with 'RestartElection'.
handleTimeout :: TimeoutHandler 'Candidate sm v
handleTimeout (NodeCandidateState candidateState@CandidateState{}) HeartbeatTimeout =
  pure (candidateResultState Noop candidateState)
handleTimeout (NodeCandidateState CandidateState{..}) ElectionTimeout = do
  restartedState <- startElection csCommitIndex csLastApplied csLastLogEntryData
  pure (candidateResultState RestartElection restartedState)
-- | Handle a client request received while in the candidate state.
--
-- The request payload is not inspected: the client is redirected with
-- 'NoLeader', and the candidate state is returned unchanged with 'Noop'.
handleClientRequest :: ClientReqHandler 'Candidate sm v
handleClientRequest (NodeCandidateState candidateState) (ClientRequest clientId _) =
  redirectClientToLeader clientId NoLeader
    >> pure (candidateResultState Noop candidateState)
-- | Leave the candidate state and become a follower of @sender@.
--
-- Sends @sender@ a 'RequestVoteResponse' carrying @term@ with
-- 'rvrVoteGranted' set to 'True', resets the election timeout, and returns a
-- 'DiscoverLeader' result whose follower state records @sender@ as the
-- current leader together with the supplied commit index, last-applied index
-- and last log entry data ('fsTermAtAEPrevIndex' starts as 'Nothing').
--
-- NOTE(review): the only caller visible in this module is
-- 'handleAppendEntries', so this vote response is sent in reply to an
-- AppendEntries RPC -- confirm that this is the intended reply type.
stepDown
  :: NodeId
  -> Term
  -> Index
  -> Index
  -> (Index, Term)
  -> TransitionM a sm (ResultState 'Candidate)
stepDown sender term commitIndex lastApplied lastLogEntryData = do
  send sender (SendRequestVoteResponseRPC grantedVote)
  resetElectionTimeout
  pure (ResultState DiscoverLeader (NodeFollowerState followerState))
  where
    grantedVote =
      RequestVoteResponse
        { rvrTerm = term
        , rvrVoteGranted = True
        }

    followerState =
      FollowerState
        { fsCurrentLeader = CurrentLeader (LeaderId sender)
        , fsCommitIndex = commitIndex
        , fsLastApplied = lastApplied
        , fsLastLogEntryData = lastLogEntryData
        , fsTermAtAEPrevIndex = Nothing
        }