Proposed change
Summary
I would like to request a per-subscriber prefetch count feature in JetStream, similar to RabbitMQ's `basic.qos(prefetch_count=N)`.
Currently, JetStream exposes MaxAckPending, which applies to the consumer as a whole. When multiple subscribers share a single durable consumer / queue group, the MaxAckPending budget is shared among all of them, and there is no way to limit how many un-acked messages a single subscriber can hold at a time.
Motivation / Use Case
Consider a high-throughput processing system with multiple worker instances consuming from the same queue group. Each worker can only process a limited number of messages concurrently.
- With RabbitMQ, `prefetch_count` ensures each subscriber receives at most N un-acked messages, giving predictable per-worker concurrency.
- In JetStream today:
- MaxAckPending applies per consumer, not per subscriber.
- If some subscribers go down, remaining subscribers may suddenly receive a large number of messages, overwhelming them.
- This makes per-subscriber concurrency control difficult.
Proposal
- Introduce a `PrefetchCount` (or similar) option at the subscriber level.
- The broker would ensure no subscriber receives more than N un-acked messages, independent of other subscribers sharing the consumer.
- This should integrate with queue groups so each subscriber respects its own prefetch limit.
Attempted Workarounds
I tried to achieve this using the new JetStream Go API without success. For example, I tried limiting the number of messages fetched per pull using:
`jetstream.PullMaxMessages(10)`

- This only limits the client-side batch per pull request, not per-subscriber concurrency.
- Even with this, a subscriber can receive many more messages if MaxAckPending for the consumer allows it.
This illustrates that the current API does not provide the requested behavior natively.
Benefits
- Predictable per-subscriber concurrency
- Reduces risk of slow-consumer warnings
- Simplifies design of high-throughput workloads with variable processing times