Skip to content

Commit b5b86fb

Browse files
(2.14) [FIXED] Consumer send 404 No Messages on EOS after delivering messages
Signed-off-by: Maurice van Veen <[email protected]>
1 parent 7d6b0a8 commit b5b86fb

File tree

2 files changed

+45
-2
lines changed

2 files changed

+45
-2
lines changed

server/consumer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4536,12 +4536,12 @@ func (o *consumer) processWaiting(eos bool) (int, int, int, time.Time) {
45364536
}
45374537
}
45384538
// Normally it's a timeout.
4539-
if expires || !wr.noWait || wr.d > 0 {
4539+
if expires {
45404540
hdr := fmt.Appendf(nil, "NATS/1.0 408 Request Timeout\r\n%s: %d\r\n%s: %d\r\n\r\n", JSPullRequestPendingMsgs, wr.n, JSPullRequestPendingBytes, wr.b)
45414541
o.outq.send(newJSPubMsg(wr.reply, _EMPTY_, _EMPTY_, hdr, nil, nil, 0))
45424542
wr = remove(pre, wr)
45434543
continue
4544-
} else if wr.expires.IsZero() {
4544+
} else if wr.expires.IsZero() || wr.d > 0 {
45454545
// But if we're NoWait without expiry, we've reached the end of the stream, and we've not delivered any messages.
45464546
// Return no messages instead, which is the same as if we'd rejected the pull request initially.
45474547
hdr := fmt.Appendf(nil, "NATS/1.0 404 No Messages\r\n\r\n")

server/jetstream_consumer_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10381,3 +10381,46 @@ func TestJetStreamConsumerNoWaitNoMessagesOnEos(t *testing.T) {
1038110381
require_Equal(t, msg.Header.Get("Status"), "404")
1038210382
require_Equal(t, msg.Header.Get("Description"), "No Messages")
1038310383
}
10384+
10385+
// https://github.com/nats-io/nats-server/issues/5373
10386+
func TestJetStreamConsumerNoWaitNoMessagesOnEosWithDeliveredMsgs(t *testing.T) {
10387+
s := RunBasicJetStreamServer(t)
10388+
defer s.Shutdown()
10389+
10390+
nc, js := jsClientConnect(t, s)
10391+
defer nc.Close()
10392+
10393+
_, err := js.AddStream(&nats.StreamConfig{Name: "TEST", Subjects: []string{"foo"}})
10394+
require_NoError(t, err)
10395+
_, err = js.AddConsumer("TEST", &nats.ConsumerConfig{Durable: "CONSUMER"})
10396+
require_NoError(t, err)
10397+
10398+
_, err = js.Publish("foo", []byte("msg"))
10399+
require_NoError(t, err)
10400+
10401+
sub, err := nc.SubscribeSync("reply")
10402+
require_NoError(t, err)
10403+
defer sub.Drain()
10404+
require_NoError(t, nc.Flush())
10405+
10406+
mset, err := s.globalAccount().lookupStream("TEST")
10407+
require_NoError(t, err)
10408+
o := mset.lookupConsumer("CONSUMER")
10409+
require_NotNil(t, o)
10410+
10411+
req := &JSApiConsumerGetNextRequest{NoWait: true, Batch: 2}
10412+
jreq, err := json.Marshal(req)
10413+
require_NoError(t, err)
10414+
o.processNextMsgRequest("reply", jreq)
10415+
10416+
msg, err := sub.NextMsg(time.Second)
10417+
require_NoError(t, err)
10418+
require_Equal(t, msg.Subject, "foo")
10419+
require_Equal(t, string(msg.Data), "msg")
10420+
10421+
// We requested two messages but the stream only contained 1.
10422+
msg, err = sub.NextMsg(time.Second)
10423+
require_NoError(t, err)
10424+
require_Equal(t, msg.Header.Get("Status"), "404")
10425+
require_Equal(t, msg.Header.Get("Description"), "No Messages")
10426+
}

0 commit comments

Comments
 (0)