Skip to content

Commit 8d70f43

Browse files
[FIXED] Consumer MaxDeliver underflow (#7216)
A `MaxDeliver` of `-1`/infinite would result in an underflow when doing `uint64(cfg.MaxDeliver)`. This didn't result in any issues per se, only the `o.hasMaxDeliveries` code path couldn't short-circuit if `o.maxdc == 0`. Signed-off-by: Maurice van Veen <[email protected]>
2 parents 4598607 + c35d54e commit 8d70f43

File tree

2 files changed

+47
-2
lines changed

2 files changed

+47
-2
lines changed

server/consumer.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,7 +1120,7 @@ func (mset *stream) addConsumerWithAssignment(config *ConsumerConfig, oname stri
11201120
uch: make(chan struct{}, 1),
11211121
mch: make(chan struct{}, 1),
11221122
sfreq: int32(sampleFreq),
1123-
maxdc: uint64(config.MaxDeliver),
1123+
maxdc: uint64(max(config.MaxDeliver, 0)), // MaxDeliver is negative (-1) when infinite.
11241124
maxp: config.MaxAckPending,
11251125
retention: cfg.Retention,
11261126
created: time.Now().UTC(),
@@ -2335,7 +2335,8 @@ func (o *consumer) updateConfig(cfg *ConsumerConfig) error {
23352335
}
23362336
// Set MaxDeliver if changed
23372337
if cfg.MaxDeliver != o.cfg.MaxDeliver {
2338-
o.maxdc = uint64(cfg.MaxDeliver)
2338+
// MaxDeliver is negative (-1) when infinite.
2339+
o.maxdc = uint64(max(cfg.MaxDeliver, 0))
23392340
}
23402341
// Set InactiveThreshold if changed.
23412342
if val := cfg.InactiveThreshold; val != o.cfg.InactiveThreshold {

server/jetstream_consumer_test.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10241,3 +10241,47 @@ func TestJetStreamConsumerPrioritized(t *testing.T) {
1024110241
require_NotNil(t, msg)
1024210242
})
1024310243
}
10244+
10245+
func TestJetStreamConsumerMaxDeliverUnderflow(t *testing.T) {
10246+
s := RunBasicJetStreamServer(t)
10247+
defer s.Shutdown()
10248+
10249+
nc, js := jsClientConnect(t, s)
10250+
defer nc.Close()
10251+
10252+
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
10253+
require_NoError(t, err)
10254+
10255+
cfg := &nats.ConsumerConfig{Durable: "CONSUMER", MaxDeliver: -1}
10256+
_, err = js.AddConsumer("TEST", cfg)
10257+
require_NoError(t, err)
10258+
10259+
mset, err := s.globalAccount().lookupStream("TEST")
10260+
require_NoError(t, err)
10261+
o := mset.lookupConsumer("CONSUMER")
10262+
require_NotNil(t, o)
10263+
10264+
// Infinite MaxDeliver should be zero.
10265+
o.mu.RLock()
10266+
maxdc := o.maxdc
10267+
o.mu.RUnlock()
10268+
require_Equal(t, maxdc, 0)
10269+
10270+
// Finite MaxDeliver should be reported the same.
10271+
cfg.MaxDeliver = 1
10272+
_, err = js.UpdateConsumer("TEST", cfg)
10273+
require_NoError(t, err)
10274+
o.mu.RLock()
10275+
maxdc = o.maxdc
10276+
o.mu.RUnlock()
10277+
require_Equal(t, maxdc, 1)
10278+
10279+
// Infinite MaxDeliver should be zero.
10280+
cfg.MaxDeliver = -1
10281+
_, err = js.UpdateConsumer("TEST", cfg)
10282+
require_NoError(t, err)
10283+
o.mu.RLock()
10284+
maxdc = o.maxdc
10285+
o.mu.RUnlock()
10286+
require_Equal(t, maxdc, 0)
10287+
}

0 commit comments

Comments
 (0)