Skip to content

Commit 85594c8

Browse files
[FIXED] Consumer send 404 No Messages on EOS (#7466)
Requests using `NoWait` but no expiry would not receive `404 No Messages` if the stream was empty and no messages were delivered. `408 Request Timeout` would only be returned if messages were delivered or the request expired. This PR fixes that by sending a `404 No Messages` for `NoWait` requests without expiry (same response when doing a pull request on a consumer with no pending messages) when reaching the end of the stream. Resolves #7457, #5373 Signed-off-by: Maurice van Veen <[email protected]>
2 parents 72503cb + b5b86fb commit 85594c8

File tree

2 files changed

+101
-4
lines changed

2 files changed

+101
-4
lines changed

server/consumer.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4519,7 +4519,8 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
45194519
var pre *waitingRequest
45204520
for wr := wq.head; wr != nil; {
45214521
// Check expiration.
4522-
if (eos && wr.noWait && wr.d > 0) || (!wr.expires.IsZero() && now.After(wr.expires)) {
4522+
expires := !wr.expires.IsZero() && now.After(wr.expires)
4523+
if (eos && wr.noWait) || expires {
45234524
rdWait := o.replicateDeliveries()
45244525
if rdWait {
45254526
// Check if we need to send the timeout after pending replicated deliveries, or can do so immediately.
@@ -4528,13 +4529,26 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
45284529
} else {
45294530
wd.pn, wd.pb = wr.n, wr.b
45304531
}
4532+
// If we still need to wait for replicated deliveries, remove from waiting list.
4533+
if rdWait {
4534+
wr = remove(pre, wr)
4535+
continue
4536+
}
45314537
}
4532-
if !rdWait {
4538+
// Normally it's a timeout.
4539+
if expires {
45334540
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
45344541
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
4542+
wr = remove(pre, wr)
4543+
continue
4544+
} else if wr.expires.IsZero() || wr.d > 0 {
4545+
// But if we're NoWait without expiry, we've reached the end of the stream, and we've not delivered any messages.
4546+
// Return no messages instead, which is the same as if we'd rejected the pull request initially.
4547+
hdr := fmt.Appendf(nil, "NATS/1.0 404 No Messages\r\n\r\n")
4548+
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
4549+
wr = remove(pre, wr)
4550+
continue
45354551
}
4536-
wr = remove(pre, wr)
4537-
continue
45384552
}
45394553
// Now check interest.
45404554
interest := wr.acc.sl.HasInterest(wr.interest)

server/jetstream_consumer_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10341,3 +10341,86 @@ func TestJetStreamConsumerMaxDeliverUnderflow(t *testing.T) {
1034110341
o.mu.RUnlock()
1034210342
require_Equal(t, maxdc, 0)
1034310343
}
10344+
10345+
// https://github.com/nats-io/nats-server/issues/7457
10346+
func TestJetStreamConsumerNoWaitNoMessagesOnEos(t *testing.T) {
10347+
s := RunBasicJetStreamServer(t)
10348+
defer s.Shutdown()
10349+
10350+
nc, js := jsClientConnect(t, s)
10351+
defer nc.Close()
10352+
10353+
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
10354+
require_NoError(t, err)
10355+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
10356+
require_NoError(t, err)
10357+
10358+
sub, err := nc.SubscribeSync("reply")
10359+
require_NoError(t, err)
10360+
defer sub.Drain()
10361+
require_NoError(t, nc.Flush())
10362+
10363+
mset, err := s.globalAccount().lookupStream("TEST")
10364+
require_NoError(t, err)
10365+
o := mset.lookupConsumer("CONSUMER")
10366+
require_NotNil(t, o)
10367+
10368+
// Fiddle with the pending count such that the NoWait request will go through,
10369+
// and the "404 No Messages" will be sent when hitting the end of the stream.
10370+
o.mu.Lock()
10371+
o.npc++
10372+
o.mu.Unlock()
10373+
10374+
req := &JSApiConsumerGetNextRequest{NoWait: true}
10375+
jreq, err := json.Marshal(req)
10376+
require_NoError(t, err)
10377+
o.processNextMsgRequest("reply", jreq)
10378+
10379+
msg, err := sub.NextMsg(time.Second)
10380+
require_NoError(t, err)
10381+
require_Equal(t, msg.Header.Get("Status"), "404")
10382+
require_Equal(t, msg.Header.Get("Description"), "No Messages")
10383+
}
10384+
10385+
// https://github.com/nats-io/nats-server/issues/5373
10386+
func TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs(t *testing.T) {
10387+
s := RunBasicJetStreamServer(t)
10388+
defer s.Shutdown()
10389+
10390+
nc, js := jsClientConnect(t, s)
10391+
defer nc.Close()
10392+
10393+
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
10394+
require_NoError(t, err)
10395+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
10396+
require_NoError(t, err)
10397+
10398+
_, err = js.Publish("foo", []byte("msg"))
10399+
require_NoError(t, err)
10400+
10401+
sub, err := nc.SubscribeSync("reply")
10402+
require_NoError(t, err)
10403+
defer sub.Drain()
10404+
require_NoError(t, nc.Flush())
10405+
10406+
mset, err := s.globalAccount().lookupStream("TEST")
10407+
require_NoError(t, err)
10408+
o := mset.lookupConsumer("CONSUMER")
10409+
require_NotNil(t, o)
10410+
10411+
req := &JSApiConsumerGetNextRequest{NoWait: true, Batch: 2}
10412+
jreq, err := json.Marshal(req)
10413+
require_NoError(t, err)
10414+
o.processNextMsgRequest("reply", jreq)
10415+
10416+
msg, err := sub.NextMsg(time.Second)
10417+
require_NoError(t, err)
10418+
require_Equal(t, msg.Subject, "foo")
10419+
require_Equal(t, string(msg.Data), "msg")
10420+
10421+
// We requested two messages but the stream only contained 1.
10422+
msg, err = sub.NextMsg(time.Second)
10423+
require_NoError(t, err)
10424+
require_Equal(t, msg.Header.Get("Status"), "404")
10425+
require_Equal(t, msg.Header.Get("Description"), "No Messages")
10426+
}

0 commit comments

Comments
 (0)