@@ -6712,3 +6712,125 @@ func TestJetStreamClusterInvalidR1Config(t *testing.T) {
67126712 }
67136713 }
67146714}
6715+
6716+ func TestJetStreamClusterMultiLeaderR3Config (t * testing.T ) {
6717+ conf := `
6718+ listen: 127.0.0.1:-1
6719+ server_name: %s
6720+ jetstream: {
6721+ store_dir: '%s',
6722+ }
6723+ cluster {
6724+ name: %s
6725+ listen: 127.0.0.1:%d
6726+ routes = [%s]
6727+ }
6728+ server_tags: ["test"]
6729+ system_account: sys
6730+ no_auth_user: js
6731+ accounts {
6732+ sys { users = [ { user: sys, pass: sys } ] }
6733+ js {
6734+ jetstream = enabled
6735+ users = [ { user: js, pass: js } ]
6736+ }
6737+ }`
6738+ c := createJetStreamClusterWithTemplate (t , conf , "R3TEST" , 3 )
6739+ defer c .shutdown ()
6740+
6741+ nc , js := jsClientConnect (t , c .servers [0 ])
6742+ defer nc .Close ()
6743+
6744+ nc2 , js2 := jsClientConnect (t , c .servers [2 ])
6745+ defer nc2 .Close ()
6746+
6747+ createStreams := func (t * testing.T , js nats.JetStreamContext , n , replicas int ) {
6748+ for i := 0 ; i < n ; i ++ {
6749+ sname := fmt .Sprintf ("S:%d" , i )
6750+ js .AddStream (& nats.StreamConfig {
6751+ Name : sname ,
6752+ MaxMsgsPerSubject : 5 ,
6753+ Replicas : replicas ,
6754+ Subjects : []string {fmt .Sprintf ("A.%d.>" , i )},
6755+ })
6756+ time .Sleep (10 * time .Millisecond )
6757+ js .AddConsumer (sname , & nats.ConsumerConfig {
6758+ Name : sname ,
6759+ Durable : sname ,
6760+ FilterSubject : ">" ,
6761+ })
6762+ js .Publish (fmt .Sprintf ("A.%d.foo" , i ), []byte ("one" ))
6763+ }
6764+ }
6765+
6766+ // Create streams in parallel with different configs, then
6767+ // check whether one of them is in an undefined state.
6768+ totalStreams := 5
6769+
6770+ var wg sync.WaitGroup
6771+ wg .Add (1 )
6772+ go func () {
6773+ defer wg .Done ()
6774+ createStreams (t , js , totalStreams , 3 )
6775+ }()
6776+
6777+ wg .Add (1 )
6778+ go func () {
6779+ defer wg .Done ()
6780+ createStreams (t , js2 , totalStreams , 1 )
6781+ }()
6782+ wg .Wait ()
6783+
6784+ checkMultiLeader := func (accountName , streamName string ) error {
6785+ leaders := make (map [string ]bool )
6786+ for _ , srv := range c .servers {
6787+ jsz , err := srv .Jsz (& JSzOptions {Accounts : true , Streams : true , Consumer : true })
6788+ if err != nil {
6789+ return err
6790+ }
6791+ for _ , acc := range jsz .AccountDetails {
6792+ if acc .Name == accountName {
6793+ for _ , stream := range acc .Streams {
6794+ if stream .Name == streamName {
6795+ leaders [stream .Cluster .Leader ] = true
6796+ }
6797+ }
6798+ }
6799+ }
6800+ }
6801+ if len (leaders ) > 1 {
6802+ return fmt .Errorf ("There are multiple leaders on %s stream: %+v" , streamName , leaders )
6803+ }
6804+ return nil
6805+ }
6806+
6807+ var invalidStream string
6808+ for i := 0 ; i < totalStreams ; i ++ {
6809+ ci , err := js .StreamInfo (fmt .Sprintf ("S:%d" , i ))
6810+ require_NoError (t , err )
6811+ if ci .Config .Replicas == 1 {
6812+ if ci .Cluster != nil {
6813+ peers := ci .Cluster .Replicas
6814+ if len (peers ) > 1 {
6815+ invalidStream = ci .Config .Name
6816+ t .Errorf ("Unexpected stream config drift, 1 replica expected but found %v peers" , len (peers ))
6817+ }
6818+ }
6819+ }
6820+ }
6821+ if len (invalidStream ) > 0 {
6822+ _ , err := js .StreamInfo (invalidStream )
6823+ require_NoError (t , err )
6824+
6825+ // Restart server where first client is connected and almost all R1 replicas landed.
6826+ srv := c .servers [0 ]
6827+ srv .Shutdown ()
6828+ srv .WaitForShutdown ()
6829+ time .Sleep (2 * time .Second )
6830+ c .restartServer (srv )
6831+ c .waitOnClusterReady ()
6832+ checkFor (t , 30 * time .Second , 200 * time .Millisecond , func () error {
6833+ return checkMultiLeader ("js" , invalidStream )
6834+ })
6835+ }
6836+ }
0 commit comments