Feature/batch consumer poll#873
Merged
Merged
Conversation
Expose librdkafka's rd_kafka_consume_batch_queue API to reduce FFI boundary crossings when consuming multiple messages. Instead of one FFI call per message, poll_batch fetches up to N messages in a single call. New FFI bindings: - rd_kafka_queue_get_consumer (get consumer queue handle) - rd_kafka_consume_batch_queue (blocking, releases GVL) - rd_kafka_consume_batch_queue_nb (non-blocking, no GVL release) New Consumer methods: - poll_batch(timeout_ms, max_items:) - blocking variant - poll_batch_nb(timeout_ms=0, max_items:) - non-blocking variant The consumer queue is lazily initialized and destroyed on close. The FFI pointer buffer is pre-allocated and reused across calls. All message pointers are destroyed in an ensure block even on errors. Closes #868
Contributor
There was a problem hiding this comment.
Pull request overview
Adds batch consumer polling APIs to the Ruby wrapper to reduce FFI call overhead by fetching multiple messages from the consumer queue in a single librdkafka call.
Changes:
- Add
Consumer#poll_batchandConsumer#poll_batch_nbimplemented viard_kafka_consume_batch_queue(plus lazy consumer queue + reusable buffer helpers). - Ensure the cached consumer queue is destroyed on
Consumer#close. - Add RSpec coverage for the new APIs and document the feature in the changelog.
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
lib/rdkafka/consumer.rb |
Implements poll_batch / poll_batch_nb, adds lazy consumer queue + reusable buffer, and destroys the queue on close. |
lib/rdkafka/bindings.rb |
Adds FFI bindings for rd_kafka_queue_get_consumer and batch consume functions (blocking + non-blocking/GVL-holding variant). |
spec/lib/rdkafka/consumer_spec.rb |
Adds specs for batch polling behavior, max_items behavior, error propagation, and closed-consumer behavior. |
CHANGELOG.md |
Notes the new batch polling feature for the upcoming release. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
…atch_nb - Move rd_kafka_queue_destroy inside synchronize block in close to prevent race where another thread could use a destroyed queue between consumer_close and native_kafka.close - Add @note to poll_batch_nb docs warning that non-zero timeout_ms will block all Ruby threads since GVL is not released
Destroy each rd_kafka_message_t right after extracting its data into a Ruby Message object instead of holding all native messages until the end. Reduces peak memory from N*(C_msg+Ruby_msg) to 1*C_msg+N*Ruby_msg. On error, the ensure block still cleans up remaining unprocessed messages.
mensfeld
added a commit
to karafka/karafka-rdkafka
that referenced
this pull request
May 8, 2026
* Add batch consumer poll from upstream rdkafka-ruby (karafka#873) Cherry-pick Feature/batch consumer poll from karafka/rdkafka-ruby. Adds Consumer#poll_batch and Consumer#poll_batch_nb for batch message polling via rd_kafka_consume_batch_queue, reducing FFI boundary crossings when consuming multiple messages. * Bump version to 0.27.0 and librdkafka to 2.14.1 - Update gem version from 0.26.2 to 0.27.0 - Bump librdkafka from 2.14.0 to 2.14.1 with updated SHA256 - Update changelog header to 0.27.0 with librdkafka bump entry - Add 0.27.x row to README versions table * fix readme dates * Update librdkafka dist tarball from 2.14.0 to 2.14.1 Replace the vendored librdkafka source tarball so precompiled build CI jobs can find the expected version.
mensfeld
added a commit
to karafka/karafka-rdkafka
that referenced
this pull request
May 8, 2026
Cherry-pick Feature/batch consumer poll from karafka/rdkafka-ruby. Adds Consumer#poll_batch and Consumer#poll_batch_nb for batch message polling via rd_kafka_consume_batch_queue, reducing FFI boundary crossings when consuming multiple messages.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
No description provided.