
Commit 693c34e

NRG: Can't become leader based on majority when repairing truncated log
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 9751bab commit 693c34e

2 files changed: +96 −1

server/raft.go

Lines changed: 5 additions & 1 deletion
@@ -3324,12 +3324,16 @@ func (n *raft) runAsCandidate() {
 			n.RLock()
 			nterm := n.term
 			csz := n.csz
+			repairing := n.repairing
 			n.RUnlock()
 
 			if vresp.granted && nterm == vresp.term {
 				// only track peers that would be our followers
 				n.trackPeer(vresp.peer)
-				if !vresp.empty {
+
+				// A vote only counts toward a majority if it's a non-empty vote from an intact server,
+				// and we're not repairing ourselves either (a more up-to-date server could exist).
+				if !vresp.empty && !repairing {
 					votes[vresp.peer] = struct{}{}
 				} else {
 					emptyVotes[vresp.peer] = struct{}{}
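
The guard above means that while a candidate is repairing a truncated log, none of the granted votes it receives count toward a true majority; they are all tracked as empty votes instead. A minimal standalone sketch of that counting rule, using hypothetical names (voteResp mirrors the fields the diff inspects; countVotes is illustrative only, not the server's code):

package main

import "fmt"

// voteResp mirrors the fields the patched loop inspects: whether the vote
// was granted and whether the voter's log is empty (e.g. it was wiped).
type voteResp struct {
	peer    string
	granted bool
	empty   bool
}

// countVotes applies the commit's rule: a granted vote only counts toward
// a majority if it is non-empty AND the candidate itself is not repairing.
// Every other granted vote is tracked separately as an "empty" vote.
func countVotes(resps []voteResp, repairing bool) (votes, emptyVotes int) {
	for _, v := range resps {
		if !v.granted {
			continue
		}
		if !v.empty && !repairing {
			votes++
		} else {
			emptyVotes++
		}
	}
	return votes, emptyVotes
}

func main() {
	resps := []voteResp{
		{peer: "B", granted: true},              // intact follower
		{peer: "C", granted: true, empty: true}, // wiped follower
	}
	v, e := countVotes(resps, true) // candidate is repairing its own log
	fmt.Printf("majority votes: %d, empty votes: %d\n", v, e) // prints 0 and 2
}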

server/raft_test.go

Lines changed: 91 additions & 0 deletions
@@ -3654,6 +3654,7 @@ func TestNRGLostQuorum(t *testing.T) {
 	n, cleanup := initSingleMemRaftNode(t)
 	defer cleanup()
 
+	n.resetRepairing()
 	require_Equal(t, n.State(), Follower)
 	require_False(t, n.Quorum())
 	require_True(t, n.lostQuorum())
@@ -4127,6 +4128,96 @@ func TestNRGRepairAfterMajorityCorruption(t *testing.T) {
 	rg.waitOnTotal(t, 5)
 }
 
+func TestNRGRepairAfterMinorityCorruption(t *testing.T) {
+	c := createJetStreamClusterExplicit(t, "R3S", 3)
+	defer c.shutdown()
+
+	rg := c.createRaftGroup("TEST", 3, newStateAdder)
+	l := rg.waitOnLeader()
+	rs := rg.nonLeader()
+	for i := range int64(5) {
+		l.(*stateAdder).proposeDelta(1)
+		// Custom rg.waitOnTotal to allow a server to be down.
+		checkFor(t, 5*time.Second, 200*time.Millisecond, func() error {
+			var err error
+			for _, sm := range rg {
+				if sm.node().State() == Closed {
+					continue
+				}
+				asm := sm.(*stateAdder)
+				if total := asm.total(); total != (i + 1) {
+					err = errors.Join(err, fmt.Errorf("Adder on %v has wrong total: %d vs %d", asm.server(), total, i+1))
+				}
+			}
+			return err
+		})
+		// Stop a random member so that its log will be outdated.
+		if i == 3 {
+			rs.stop()
+		}
+	}
+
+	// Stop all servers.
+	for _, sm := range rg {
+		sm.stop()
+	}
+
+	// Corrupt the log on one server and then bring it back together with the outdated server.
+	// The corrupted server should not be able to become leader.
+	var cs stateMachine
+	for _, sm := range rg {
+		if sm == l || sm == rs {
+			continue
+		}
+		cs = sm
+		rn := sm.node().(*raft)
+		rn.RLock()
+		blk := filepath.Join(rn.sd, msgDir, "1.blk")
+		rn.RUnlock()
+
+		stat, err := os.Stat(blk)
+		require_NoError(t, err)
+		require_LessThan(t, 0, stat.Size())
+		require_NoError(t, os.Truncate(blk, stat.Size()-1))
+
+		// Confirm the file was truncated after restart.
+		sm.restart()
+		nstat, err := os.Stat(blk)
+		require_NoError(t, err)
+		require_NotEqual(t, nstat.Size(), stat.Size())
+	}
+	rs.restart()
+	expires := time.Now().Add(2 * time.Second)
+	for time.Now().Before(expires) {
+		// Speed up the leader election on the corrupted server by purposefully having it campaign immediately.
+		// The outdated server will vote for the corrupted server, no questions asked. But the corrupted server
+		// should know it requires its log to be repaired so it can't become leader based on a majority alone.
+		n := cs.node().(*raft)
+		n.Lock()
+		n.campaign(time.Millisecond)
+		n.Unlock()
+
+		time.Sleep(100 * time.Millisecond)
+		for _, sm := range rg {
+			if sm == l {
+				continue
+			}
+			if n := sm.node(); n.State() == Leader || n.Leader() {
+				t.Fatal("Leader elected from corrupted logs")
+			}
+		}
+	}
+
+	// Restart the leader, and we expect it to become leader again.
+	l.restart()
+	nl := rg.waitOnLeader()
+	require_NotNil(t, nl)
+	require_Equal(t, nl, l)
+
+	// The logs should have been repaired, and the total should match.
+	rg.waitOnTotal(t, 5)
+}
+
 // This is a RaftChainOfBlocks test where a block is proposed and then we wait for all replicas to apply it before
 // proposing the next one.
 // The test may fail if:
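
To run only the new test from a nats-server checkout, a standard go test invocation should do (the exact flags are an assumption, not part of the commit):

go test ./server -run TestNRGRepairAfterMinorityCorruption -v -count=1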
