{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE MonoLocalBinds #-}
module Raft.Follower (
handleAppendEntries
, handleAppendEntriesResponse
, handleRequestVote
, handleRequestVoteResponse
, handleTimeout
, handleClientRequest
) where
import Protolude
import Data.Sequence (Seq(..))
import Raft.Action
import Raft.NodeState
import Raft.RPC
import Raft.Client
import Raft.Event
import Raft.Persistent
import Raft.Log (entryIndex)
import Raft.Monad
import Raft.Types
handleAppendEntries :: forall v sm. Show v => RPCHandler 'Follower sm (AppendEntries v) v
handleAppendEntries ns@(NodeFollowerState fs) sender AppendEntries{..} = do
PersistentState{..} <- get
(success, newFollowerState) <-
if aeTerm < currentTerm
then pure (False, fs)
else
case fsTermAtAEPrevIndex fs of
Nothing
| aePrevLogIndex == index0 -> do
appendLogEntries aeEntries
pure (True, updateFollowerState fs)
| otherwise -> pure (False, fs)
Just entryAtAePrevLogIndexTerm ->
if entryAtAePrevLogIndexTerm /= aePrevLogTerm
then pure (False, fs)
else do
appendLogEntries aeEntries
pure (True, updateFollowerState fs)
send (unLeaderId aeLeaderId) $
SendAppendEntriesResponseRPC $
AppendEntriesResponse
{ aerTerm = currentTerm
, aerSuccess = success
, aerReadRequest = aeReadRequest
}
resetElectionTimeout
pure (followerResultState Noop newFollowerState)
where
updateFollowerState :: FollowerState -> FollowerState
updateFollowerState fs =
if aeLeaderCommit > fsCommitIndex fs
then updateLeader (updateCommitIndex fs)
else updateLeader fs
updateCommitIndex :: FollowerState -> FollowerState
updateCommitIndex followerState =
case aeEntries of
Empty ->
followerState { fsCommitIndex = aeLeaderCommit }
_ :|> e ->
let newCommitIndex = min aeLeaderCommit (entryIndex e)
in followerState { fsCommitIndex = newCommitIndex }
updateLeader :: FollowerState -> FollowerState
updateLeader followerState = followerState { fsCurrentLeader = CurrentLeader (LeaderId sender) }
handleAppendEntriesResponse :: RPCHandler 'Follower sm AppendEntriesResponse v
handleAppendEntriesResponse (NodeFollowerState fs) _ _ =
pure (followerResultState Noop fs)
handleRequestVote :: RPCHandler 'Follower sm RequestVote v
handleRequestVote ns@(NodeFollowerState fs) sender RequestVote{..} = do
PersistentState{..} <- get
let voteGranted = giveVote currentTerm votedFor
logDebug $ "Vote granted: " <> show voteGranted
send sender $
SendRequestVoteResponseRPC $
RequestVoteResponse
{ rvrTerm = currentTerm
, rvrVoteGranted = voteGranted
}
when voteGranted $
modify $ \pstate ->
pstate { votedFor = Just sender }
pure $ followerResultState Noop fs
where
giveVote term mVotedFor =
and [ term <= rvTerm
, validCandidateId mVotedFor
, validCandidateLog
]
validCandidateId Nothing = True
validCandidateId (Just cid) = cid == rvCandidateId
validCandidateLog =
let (lastLogEntryIdx, lastLogEntryTerm) = fsLastLogEntryData fs
in (rvLastLogTerm > lastLogEntryTerm)
|| (rvLastLogTerm == lastLogEntryTerm && rvLastLogIndex >= lastLogEntryIdx)
handleRequestVoteResponse :: RPCHandler 'Follower sm RequestVoteResponse v
handleRequestVoteResponse (NodeFollowerState fs) _ _ =
pure (followerResultState Noop fs)
handleTimeout :: TimeoutHandler 'Follower sm v
handleTimeout ns@(NodeFollowerState fs) timeout =
case timeout of
ElectionTimeout -> do
logDebug "Follower times out. Starts election. Becomes candidate"
candidateResultState StartElection <$>
startElection (fsCommitIndex fs) (fsLastApplied fs) (fsLastLogEntryData fs)
HeartbeatTimeout -> pure (followerResultState Noop fs)
handleClientRequest :: ClientReqHandler 'Follower sm v
handleClientRequest (NodeFollowerState fs) (ClientRequest clientId _)= do
redirectClientToLeader clientId (fsCurrentLeader fs)
pure (followerResultState Noop fs)