89 changes: 68 additions & 21 deletions server/raft_helpers_test.go
@@ -143,10 +143,31 @@ func (c *cluster) createRaftGroupEx(name string, numMembers int, smf smFactory,
	return c.createRaftGroupWithPeers(name, servers[:numMembers], smf, st)
}

func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup {
func (c *cluster) createWAL(name string, st StorageType) WAL {
	c.t.Helper()
	var err error
	var store WAL
	if st == FileStorage {
		store, err = newFileStore(
			FileStoreConfig{
				StoreDir:     c.t.TempDir(),
				BlockSize:    defaultMediumBlockSize,
				AsyncFlush:   false,
				SyncInterval: 5 * time.Minute},
			StreamConfig{
				Name:    name,
				Storage: FileStorage})
	} else {
		store, err = newMemStore(
			&StreamConfig{
				Name:    name,
				Storage: MemoryStorage})
	}
	require_NoError(c.t, err)
	return store
}

	var sg smGroup
func serverPeerNames(servers []*Server) []string {
	var peers []string

	for _, s := range servers {
@@ -156,30 +177,56 @@ func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf s
		s.mu.RUnlock()
	}

	return peers
}

func (c *cluster) createStateMachine(s *Server, cfg *RaftConfig, peers []string, smf smFactory) stateMachine {
	s.bootstrapRaftNode(cfg, peers, true)
	n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{})
	require_NoError(c.t, err)
	sm := smf(s, cfg, n)
	go smLoop(sm)
	return sm
}

func (c *cluster) createRaftGroupWithPeers(name string, servers []*Server, smf smFactory, st StorageType) smGroup {
	c.t.Helper()

	var sg smGroup
	peers := serverPeerNames(servers)

	for _, s := range servers {
		var cfg *RaftConfig
		if st == FileStorage {
			fs, err := newFileStore(
				FileStoreConfig{StoreDir: c.t.TempDir(), BlockSize: defaultMediumBlockSize, AsyncFlush: false, SyncInterval: 5 * time.Minute},
				StreamConfig{Name: name, Storage: FileStorage},
			)
			require_NoError(c.t, err)
			cfg = &RaftConfig{Name: name, Store: c.t.TempDir(), Log: fs}
		} else {
			ms, err := newMemStore(&StreamConfig{Name: name, Storage: MemoryStorage})
			require_NoError(c.t, err)
			cfg = &RaftConfig{Name: name, Store: c.t.TempDir(), Log: ms}
		}
		s.bootstrapRaftNode(cfg, peers, true)
		n, err := s.startRaftNode(globalAccountName, cfg, pprofLabels{})
		require_NoError(c.t, err)
		sm := smf(s, cfg, n)
		sg = append(sg, sm)
		go smLoop(sm)
		cfg := &RaftConfig{
			Name:  name,
			Store: c.t.TempDir(),
			Log:   c.createWAL(name, st)}
		sg = append(sg, c.createStateMachine(s, cfg, peers, smf))
	}
	return sg
}

func (c *cluster) addNodeEx(name string, smf smFactory, st StorageType) stateMachine {
	c.t.Helper()

	server := c.addInNewServer()

	cfg := &RaftConfig{
		Name:  name,
		Store: c.t.TempDir(),
		Log:   c.createWAL(name, st)}

	peers := serverPeerNames(c.servers)
	return c.createStateMachine(server, cfg, peers, smf)
}

func (c *cluster) addRaftNode(name string, smf smFactory) stateMachine {
	return c.addNodeEx(name, smf, FileStorage)
}

func (c *cluster) addMemRaftNode(name string, smf smFactory) stateMachine {
	return c.addNodeEx(name, smf, MemoryStorage)
}

// Driver program for the state machine.
// Should be run in its own go routine.
func smLoop(sm stateMachine) {
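The refactor above splits the old createRaftGroupWithPeers into reusable pieces (createWAL, serverPeerNames, createStateMachine), which is what makes the new addNodeEx / addRaftNode / addMemRaftNode helpers possible. As a rough sketch of how the file-backed variant could be exercised — a hypothetical test, assuming the pre-existing createRaftGroup helper (the FileStorage counterpart of the createMemRaftGroup used in the new test below):

func TestNRGAddFileBackedPeer(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	// Start with a three-node file-backed group, then grow it by one node.
	rg := c.createRaftGroup("TEST", 3, newStateAdder)
	leader := rg.waitOnLeader()

	rg = append(rg, c.addRaftNode("TEST", newStateAdder))

	// Peer additions are applied asynchronously, so poll the leader's view.
	checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
		if leader.node().ClusterSize() != 4 {
			return errors.New("peer addition still in progress")
		}
		return nil
	})
}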
23 changes: 23 additions & 0 deletions server/raft_test.go
@@ -4411,3 +4411,26 @@ func TestNRGLeaderResurrectsRemovedPeers(t *testing.T) {
	followers[1].restart()
	require_Equal(t, len(leader.node().Peers()), 2)
}

func TestNRGAddPeers(t *testing.T) {
	c := createJetStreamClusterExplicit(t, "R3S", 3)
	defer c.shutdown()

	rg := c.createMemRaftGroup("TEST", 3, newStateAdder)
	leader := rg.waitOnLeader()

	require_Equal(t, leader.node().ClusterSize(), 3)

	for range 6 {
		rg = append(rg, c.addMemRaftNode("TEST", newStateAdder))
	}

	checkFor(t, 1*time.Second, 10*time.Millisecond, func() error {
		if leader.node().ClusterSize() != 9 {
			return errors.New("node additions still in progress")
		}
		return nil
	})

	require_Equal(t, leader.node().ClusterSize(), 9)
}
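A natural follow-up check — sketch only, not part of this change — is that every member, not just the leader, converges on the grown cluster size. The fragment below could be appended to the test body above; it relies only on the stateMachine node() accessor and checkFor helper already used in the test:

	// Sketch: each member's raft instance should also report the full cluster size.
	for _, sm := range rg {
		node := sm.node()
		checkFor(t, 5*time.Second, 100*time.Millisecond, func() error {
			if node.ClusterSize() != 9 {
				return errors.New("cluster size not yet propagated to all members")
			}
			return nil
		})
	}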