{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE MonoLocalBinds #-}
module Raft.Follower (
handleAppendEntries
, handleAppendEntriesResponse
, handleRequestVote
, handleRequestVoteResponse
, handleTimeout
, handleClientReadRequest
, handleClientWriteRequest
) where
import Protolude
import Data.Sequence (Seq(..))
import Raft.Action
import Raft.NodeState
import Raft.RPC
import Raft.Client
import Raft.Event
import Raft.Persistent
import Raft.Log
import Raft.Transition
import Raft.Types
-- | Handle an 'AppendEntries' RPC received while in the follower state.
--
-- The request is rejected (@aerSuccess = False@, follower state returned
-- unchanged) when any of the following holds:
--
--   * @aeTerm@ is smaller than the persisted @currentTerm@;
--   * 'fsTermAtAEPrevIndex' is 'Nothing' and @aePrevLogIndex@ is not 'index0';
--   * 'fsTermAtAEPrevIndex' holds a term different from @aePrevLogTerm@.
--
-- Otherwise @aeEntries@ are passed to 'appendLogEntries', the sender is
-- recorded as the current leader, and the commit index is advanced when
-- @aeLeaderCommit@ is ahead of it.
--
-- The election timeout is reset only on success. An 'AppendEntriesResponse'
-- carrying the persisted @currentTerm@, the success flag and the request's
-- @aeReadRequest@ is always sent to @aeLeaderId@. The resulting node state
-- is a follower state paired with 'Noop'.
handleAppendEntries :: forall v sm. Show v => RPCHandler 'Follower sm (AppendEntries v) v
handleAppendEntries (NodeFollowerState fs) sender AppendEntries{..} = do
    PersistentState{..} <- get
    (success, newFollowerState) <-
      if aeTerm < currentTerm
        -- The request's term is older than ours: reply false.
        then pure (False, fs)
        else
          case fsTermAtAEPrevIndex fs of
            Nothing
              -- No term is recorded for aePrevLogIndex. That is only
              -- acceptable when the leader is pointing at index0.
              | aePrevLogIndex == index0 -> do
                  appendLogEntries aeEntries
                  pure (True, updateFollowerState fs)
              | otherwise -> pure (False, fs)
            Just entryAtAePrevLogIndexTerm
              -- Our entry at aePrevLogIndex has a different term than the
              -- leader expects: reply false.
              | entryAtAePrevLogIndexTerm /= aePrevLogTerm -> pure (False, fs)
              | otherwise -> do
                  appendLogEntries aeEntries
                  pure (True, updateFollowerState fs)
    when success resetElectionTimeout
    send (unLeaderId aeLeaderId) $
      SendAppendEntriesResponseRPC $
        AppendEntriesResponse
          { aerTerm = currentTerm
          , aerSuccess = success
          , aerReadRequest = aeReadRequest
          }
    pure (followerResultState Noop newFollowerState)
  where
    -- Record the sender as leader and, if the leader's commit index is
    -- ahead of ours, advance our commit index as well.
    updateFollowerState :: FollowerState v -> FollowerState v
    updateFollowerState followerState =
      if aeLeaderCommit > fsCommitIndex followerState
        then updateLeader (updateCommitIndex followerState)
        else updateLeader followerState

    -- Set the commit index to min(leaderCommit, last index covered by this
    -- request).
    updateCommitIndex :: FollowerState v -> FollowerState v
    updateCommitIndex followerState =
      followerState { fsCommitIndex = newCommitIndex }
      where
        newCommitIndex =
          case aeEntries of
            -- A request without entries (e.g. a heartbeat) must still be
            -- able to advance the commit index; previously this case left
            -- it untouched. The last index this request covers is
            -- aePrevLogIndex. The 'max' keeps the commit index from ever
            -- moving backwards.
            Empty ->
              max (fsCommitIndex followerState)
                  (min aeLeaderCommit aePrevLogIndex)
            _ :|> e -> min aeLeaderCommit (entryIndex e)

    -- Remember the node that sent this request as the current leader.
    updateLeader :: FollowerState v -> FollowerState v
    updateLeader followerState =
      followerState { fsCurrentLeader = CurrentLeader (LeaderId sender) }
-- | A follower does not act on 'AppendEntriesResponse' messages: the sender
-- and the response are ignored and the follower state is returned unchanged
-- together with 'Noop'.
handleAppendEntriesResponse :: RPCHandler 'Follower sm AppendEntriesResponse v
handleAppendEntriesResponse (NodeFollowerState followerState) _sender _response =
    pure $ followerResultState Noop followerState
-- | Answer a 'RequestVote' RPC while in the follower state.
--
-- The vote is granted only when all of these hold:
--
--   * the persisted @currentTerm@ is not greater than @rvTerm@;
--   * no vote is recorded yet, or the recorded vote is for @rvCandidateId@;
--   * the candidate's last log term is greater than ours, or equal to ours
--     with a last log index at least as large as ours.
--
-- When granting, the sender is stored as @votedFor@ and the election
-- timeout is reset. A 'RequestVoteResponse' carrying the persisted
-- @currentTerm@ and the decision is always sent back to the sender. The
-- follower state itself is returned unchanged with 'Noop'.
handleRequestVote :: RPCHandler 'Follower sm RequestVote v
handleRequestVote (NodeFollowerState followerState) candidate RequestVote{..} = do
    PersistentState{..} <- get
    let grantVote =
          termNotAhead currentTerm
            && noConflictingVote votedFor
            && candidateLogUpToDate
        response =
          RequestVoteResponse
            { rvrTerm = currentTerm
            , rvrVoteGranted = grantVote
            }
    when grantVote $ do
      modify (\persistent -> persistent { votedFor = Just candidate })
      resetElectionTimeout
    send candidate (SendRequestVoteResponseRPC response)
    pure (followerResultState Noop followerState)
  where
    -- Our term must not be ahead of the candidate's term.
    termNotAhead ownTerm = ownTerm <= rvTerm

    -- Either no vote has been cast, or it was cast for this very candidate.
    noConflictingVote = maybe True (== rvCandidateId)

    -- The candidate's log must be at least as up to date as ours.
    candidateLogUpToDate
      | rvLastLogTerm > ownLastTerm = True
      | rvLastLogTerm == ownLastTerm = rvLastLogIndex >= ownLastIndex
      | otherwise = False
      where
        (ownLastIndex, ownLastTerm) =
          lastLogEntryIndexAndTerm (fsLastLogEntry followerState)
-- | A follower does not act on 'RequestVoteResponse' messages: the sender
-- and the response are ignored and the follower state is returned unchanged
-- together with 'Noop'.
handleRequestVoteResponse :: RPCHandler 'Follower sm RequestVoteResponse v
handleRequestVoteResponse (NodeFollowerState followerState) _sender _response =
    pure $ followerResultState Noop followerState
-- | React to a timeout event while in the follower state.
--
-- On 'ElectionTimeout' a debug message is logged and 'startElection' is run
-- with the follower's commit index, last-applied index, last log entry and
-- client request cache; its result is wrapped with 'candidateResultState'
-- and 'StartElection'. On 'HeartbeatTimeout' the follower state is returned
-- unchanged with 'Noop'.
handleTimeout :: TimeoutHandler 'Follower sm v
handleTimeout (NodeFollowerState followerState) ElectionTimeout = do
    logDebug "Follower times out. Starts election. Becomes candidate"
    candidateState <-
      startElection
        (fsCommitIndex followerState)
        (fsLastApplied followerState)
        (fsLastLogEntry followerState)
        (fsClientReqCache followerState)
    pure (candidateResultState StartElection candidateState)
handleTimeout (NodeFollowerState followerState) HeartbeatTimeout =
    pure (followerResultState Noop followerState)
-- | Handle a client read request in the follower state by delegating to
-- 'handleClientRequest'.
handleClientReadRequest :: ClientReqHandler 'Follower ClientReadReq sm v
handleClientReadRequest followerNode clientId readRequest =
    handleClientRequest followerNode clientId readRequest
-- | Handle a client write request in the follower state by delegating to
-- 'handleClientRequest'.
handleClientWriteRequest :: ClientReqHandler 'Follower (ClientWriteReq v) sm v
handleClientWriteRequest followerNode clientId writeRequest =
    handleClientRequest followerNode clientId writeRequest
-- | Shared handler for client requests that reach a follower.
--
-- The request payload is ignored. 'redirectClientToLeader' is called with
-- the client id and the follower's recorded 'fsCurrentLeader', and the
-- follower state is returned unchanged with 'Noop'.
handleClientRequest :: ClientReqHandler 'Follower cr sm v
handleClientRequest (NodeFollowerState followerState) requester _request =
    redirectClientToLeader requester (fsCurrentLeader followerState)
      >> pure (followerResultState Noop followerState)