From 6cef51ef251d4ccc0406312012003830e6360f59 Mon Sep 17 00:00:00 2001 From: Daniele Sciascia Date: Fri, 5 Dec 2025 11:44:04 +0100 Subject: [PATCH] Add helpers for testing raft node additions Signed-off-by: Daniele Sciascia --- server/raft_helpers_test.go | 89 ++++++++++++++++++++++++++++--------- server/raft_test.go | 23 ++++++++++ 2 files changed, 91 insertions(+), 21 deletions(-) diff --git a/server/raft_helpers_test.go b/server/raft_helpers_test.go index a9eab9e642..da4dd408f0 100644 --- a/server/raft_helpers_test.go +++ b/server/raft_helpers_test.go @@ -143,10 +143,31 @@ func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory, return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, st) } -func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup { +func (c *cluster) createWAL(name string, st StorageType) WAL { c.t.Helper() + var err error + var store WAL + if st == FileStorage { + store, err = newFileStore( + FileStoreConfig{ + StoreDir: c.t.TempDir(), + BlockSize: defaultMediumBlockSize, + AsyncFlush: false, + SyncInterval: 5 * time.Minute}, + StreamConfig{ + Name: name, + Storage: FileStorage}) + } else { + store, err = newMemStore( + &StreamConfig{ + Name: name, + Storage: MemoryStorage}) + } + require_NoError(c.t, err) + return store +} - var sg smGroup +func serverPeerNames(servers []*Server) []string { var peers []string for _, s := range servers { @@ -156,30 +177,56 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s s.mu.RUnlock() } + return peers +} + +func (c *cluster) createStateMachine(s *Server, cfg *RaftConfig, peers []string, smf smFactory) stateMachine { + s.bootstrapRaftNode(cfg, peers, true) + n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{}) + require_NoError(c.t, err) + sm := smf(s, cfg, n) + go smLoop(sm) + return sm +} + +func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup { + c.t.Helper() + + var sg smGroup + peers := serverPeerNames(servers) + for _, s := range servers { - var cfg *RaftConfig - if st == FileStorage { - fs, err := newFileStore( - FileStoreConfig{StoreDir: c.t.TempDir(), BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute}, - StreamConfig{Name: name, Storage: FileStorage}, - ) - require_NoError(c.t, err) - cfg = &RaftConfig{Name: name, Store: c.t.TempDir(), Log: fs} - } else { - ms, err := newMemStore(&StreamConfig{Name: name, Storage: MemoryStorage}) - require_NoError(c.t, err) - cfg = &RaftConfig{Name: name, Store: c.t.TempDir(), Log: ms} - } - s.bootstrapRaftNode(cfg, peers, true) - n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{}) - require_NoError(c.t, err) - sm := smf(s, cfg, n) - sg = append(sg, sm) - go smLoop(sm) + cfg := &RaftConfig{ + Name: name, + Store: c.t.TempDir(), + Log: c.createWAL(name, st)} + sg = append(sg, c.createStateMachine(s, cfg, peers, smf)) } return sg } +func (c *cluster) addNodeEx(name string, smf smFactory, st StorageType) stateMachine { + c.t.Helper() + + server := c.addInNewServer() + + cfg := &RaftConfig{ + Name: name, + Store: c.t.TempDir(), + Log: c.createWAL(name, st)} + + peers := serverPeerNames(c.servers) + return c.createStateMachine(server, cfg, peers, smf) +} + +func (c *cluster) addRaftNode(name string, smf smFactory) stateMachine { + return c.addNodeEx(name, smf, FileStorage) +} + +func (c *cluster) addMemRaftNode(name string, smf smFactory) stateMachine { + return c.addNodeEx(name, smf, MemoryStorage) +} + // Driver program for the state machine. // Should be run in its own go routine. func smLoop(sm stateMachine) { diff --git a/server/raft_test.go b/server/raft_test.go index ac5bab4a23..20022a9a95 100644 --- a/server/raft_test.go +++ b/server/raft_test.go @@ -4411,3 +4411,26 @@ func TestNRGLeaderResurrectsRemovedPeers(t *testing.T) { followers[1].restart() require_Equal(t, len(leader.node().Peers()), 2) } + +func TestNRGAddPeers(t *testing.T) { + c := createJetStreamClusterExplicit(t, "R3S", 3) + defer c.shutdown() + + rg := c.createMemRaftGroup("TEST", 3, newStateAdder) + leader := rg.waitOnLeader() + + require_Equal(t, leader.node().ClusterSize(), 3) + + for range 6 { + rg = append(rg, c.addMemRaftNode("TEST", newStateAdder)) + } + + checkFor(t, 1*time.Second, 10*time.Millisecond, func() error { + if leader.node().ClusterSize() != 9 { + return errors.New("node additions still in progress") + } + return nil + }) + + require_Equal(t, leader.node().ClusterSize(), 9) +}