Skip to content

Commit d393277

Browse files
committed
rsm: not to call ManagedStateMachine.Prepare() when saving dummy snapshots
Fixes #162
1 parent b9784da commit d393277

File tree

2 files changed

+117
-12
lines changed

2 files changed

+117
-12
lines changed

internal/rsm/statemachine.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,10 @@ func (s *StateMachine) setLastApplied(entries []pb.Entry) {
707707
}
708708
}
709709

710+
func (s *StateMachine) savingDummySnapshot(r SSRequest) bool {
711+
return s.OnDiskStateMachine() && !r.Streaming() && !r.Exported()
712+
}
713+
710714
func (s *StateMachine) checkSnapshotStatus(r SSRequest) error {
711715
if s.aborted {
712716
return sm.ErrSnapshotStopped
@@ -771,7 +775,7 @@ func (s *StateMachine) prepare(r SSRequest) (SSMeta, error) {
771775
}
772776
var err error
773777
var ctx interface{}
774-
if s.Concurrent() {
778+
if s.Concurrent() && !s.savingDummySnapshot(r) {
775779
ctx, err = s.sm.Prepare()
776780
if err != nil {
777781
return SSMeta{}, err

internal/rsm/statemachine_test.go

Lines changed: 112 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2016,11 +2016,15 @@ func TestAlreadyAppliedInOnDiskSMEntryTreatedAsNoOP(t *testing.T) {
20162016
}
20172017

20182018
type testManagedStateMachine struct {
2019-
first uint64
2020-
last uint64
2021-
synced bool
2022-
nalookup bool
2023-
corruptIndex bool
2019+
first uint64
2020+
last uint64
2021+
synced bool
2022+
nalookup bool
2023+
corruptIndex bool
2024+
concurrent bool
2025+
onDisk bool
2026+
smType pb.StateMachineType
2027+
prepareInvoked bool
20242028
}
20252029

20262030
func (t *testManagedStateMachine) Open() (uint64, error) { return 10, nil }
@@ -2042,8 +2046,11 @@ func (t *testManagedStateMachine) Sync() error {
20422046
t.synced = true
20432047
return nil
20442048
}
2045-
func (t *testManagedStateMachine) GetHash() (uint64, error) { return 0, nil }
2046-
func (t *testManagedStateMachine) Prepare() (interface{}, error) { return nil, nil }
2049+
func (t *testManagedStateMachine) GetHash() (uint64, error) { return 0, nil }
2050+
func (t *testManagedStateMachine) Prepare() (interface{}, error) {
2051+
t.prepareInvoked = true
2052+
return nil, nil
2053+
}
20472054
func (t *testManagedStateMachine) Save(SSMeta,
20482055
io.Writer, []byte, sm.ISnapshotFileCollection) (bool, error) {
20492056
return false, nil
@@ -2056,9 +2063,9 @@ func (t *testManagedStateMachine) Offloaded() bool { return
20562063
func (t *testManagedStateMachine) Loaded() {}
20572064
func (t *testManagedStateMachine) Close() {}
20582065
func (t *testManagedStateMachine) DestroyedC() <-chan struct{} { return nil }
2059-
func (t *testManagedStateMachine) Concurrent() bool { return false }
2060-
func (t *testManagedStateMachine) OnDisk() bool { return false }
2061-
func (t *testManagedStateMachine) Type() pb.StateMachineType { return 0 }
2066+
func (t *testManagedStateMachine) Concurrent() bool { return t.concurrent }
2067+
func (t *testManagedStateMachine) OnDisk() bool { return t.onDisk }
2068+
func (t *testManagedStateMachine) Type() pb.StateMachineType { return t.smType }
20622069
func (t *testManagedStateMachine) BatchedUpdate(ents []sm.Entry) ([]sm.Entry, error) {
20632070
if !t.corruptIndex {
20642071
t.first = ents[0].Index
@@ -2068,7 +2075,6 @@ func (t *testManagedStateMachine) BatchedUpdate(ents []sm.Entry) ([]sm.Entry, er
20682075
ents[idx].Index = ents[idx].Index + 1
20692076
}
20702077
}
2071-
20722078
return ents, nil
20732079
}
20742080

@@ -2411,3 +2417,98 @@ func TestSetLastApplied(t *testing.T) {
24112417
}()
24122418
}
24132419
}
2420+
2421+
func TestSavingDummySnapshot(t *testing.T) {
2422+
tests := []struct {
2423+
smType pb.StateMachineType
2424+
streaming bool
2425+
export bool
2426+
result bool
2427+
}{
2428+
{pb.RegularStateMachine, true, false, false},
2429+
{pb.RegularStateMachine, false, true, false},
2430+
{pb.RegularStateMachine, false, false, false},
2431+
{pb.ConcurrentStateMachine, true, false, false},
2432+
{pb.ConcurrentStateMachine, false, true, false},
2433+
{pb.ConcurrentStateMachine, false, false, false},
2434+
{pb.OnDiskStateMachine, true, false, false},
2435+
{pb.OnDiskStateMachine, false, true, false},
2436+
{pb.OnDiskStateMachine, false, false, true},
2437+
}
2438+
for idx, tt := range tests {
2439+
sm := StateMachine{
2440+
onDiskSM: tt.smType == pb.OnDiskStateMachine,
2441+
}
2442+
var rt SSReqType
2443+
if tt.export && tt.streaming {
2444+
panic("bad test input")
2445+
}
2446+
if tt.export {
2447+
rt = Exported
2448+
} else if tt.streaming {
2449+
rt = Streaming
2450+
}
2451+
if r := sm.savingDummySnapshot(SSRequest{Type: rt}); r != tt.result {
2452+
t.Errorf("%d, got %t, want %t", idx, r, tt.result)
2453+
}
2454+
}
2455+
}
2456+
2457+
func TestPrepareIsNotCalledWhenSavingDummySnapshot(t *testing.T) {
2458+
tests := []struct {
2459+
onDiskSM bool
2460+
streaming bool
2461+
export bool
2462+
prepareInvoked bool
2463+
}{
2464+
{true, false, false, false},
2465+
{true, true, false, true},
2466+
{true, false, true, true},
2467+
{false, false, false, true},
2468+
{false, false, true, true},
2469+
}
2470+
2471+
for idx, tt := range tests {
2472+
msm := &testManagedStateMachine{
2473+
concurrent: true,
2474+
onDisk: tt.onDiskSM,
2475+
smType: pb.ConcurrentStateMachine,
2476+
}
2477+
if tt.onDiskSM {
2478+
msm.smType = pb.OnDiskStateMachine
2479+
}
2480+
m := &membership{
2481+
members: &pb.Membership{
2482+
Addresses: map[uint64]string{1: "localhost:1234"},
2483+
},
2484+
}
2485+
sm := StateMachine{
2486+
index: 100,
2487+
onDiskSM: tt.onDiskSM,
2488+
sm: msm,
2489+
members: m,
2490+
node: &testNodeProxy{},
2491+
sessions: NewSessionManager(),
2492+
}
2493+
var rt SSReqType
2494+
if tt.export && tt.streaming {
2495+
panic("bad test input")
2496+
}
2497+
if tt.export {
2498+
rt = Exported
2499+
} else if tt.streaming {
2500+
rt = Streaming
2501+
}
2502+
meta, err := sm.prepare(SSRequest{Type: rt})
2503+
if err != nil {
2504+
t.Errorf("prepare failed, %v", err)
2505+
}
2506+
if meta.Index != 100 {
2507+
t.Errorf("failed to get the snapshot metadata")
2508+
}
2509+
if msm.prepareInvoked != tt.prepareInvoked {
2510+
t.Errorf("%d, prepareInvoked got %t, want %t",
2511+
idx, msm.prepareInvoked, tt.prepareInvoked)
2512+
}
2513+
}
2514+
}

0 commit comments

Comments
 (0)