Feature/batch consumer poll #873

Merged: mensfeld merged 7 commits into master from feature/batch-consumer-poll on May 7, 2026
mensfeld (Member) commented May 6, 2026

No description provided.

mensfeld added 3 commits May 6, 2026 19:15
Expose librdkafka's rd_kafka_consume_batch_queue API to reduce FFI
boundary crossings when consuming multiple messages. Instead of one
FFI call per message, poll_batch fetches up to N messages in a single
call.

New FFI bindings:
- rd_kafka_queue_get_consumer (get consumer queue handle)
- rd_kafka_consume_batch_queue (blocking, releases GVL)
- rd_kafka_consume_batch_queue_nb (non-blocking, no GVL release)

New Consumer methods:
- poll_batch(timeout_ms, max_items:) - blocking variant
- poll_batch_nb(timeout_ms=0, max_items:) - non-blocking variant
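
A minimal sketch of how a caller might loop over the batch API listed above. It assumes `poll_batch` returns an array of messages and an empty array when nothing arrives within the timeout, which this description does not state explicitly. `FakeConsumer` and `drain` are hypothetical names that stand in for a real consumer so the loop runs without a broker.

```ruby
# Hypothetical stand-in for a consumer. Only the poll_batch signature
# (timeout_ms, max_items:) is taken from the PR description.
class FakeConsumer
  def initialize(messages)
    @pending = messages.dup
  end

  # Assumption: returns up to max_items messages, or [] on timeout.
  def poll_batch(_timeout_ms, max_items:)
    @pending.shift(max_items)
  end
end

# Drains the consumer in batches until an empty batch comes back.
# Returns the number of poll_batch calls that yielded messages.
def drain(consumer, timeout_ms: 100, max_items: 3)
  calls = 0
  loop do
    batch = consumer.poll_batch(timeout_ms, max_items: max_items)
    break if batch.empty?

    calls += 1
    batch.each { |message| yield message }
  end
  calls
end

seen = []
calls = drain(FakeConsumer.new(%w[a b c d e f g])) { |m| seen << m }
puts "#{seen.size} messages in #{calls} batch calls"
```

With seven queued messages and `max_items: 3`, the loop needs three productive calls instead of seven single-message polls, which is the FFI saving the commit describes.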

The consumer queue is lazily initialized and destroyed on close.
The FFI pointer buffer is pre-allocated and reused across calls.
All message pointers are destroyed in an ensure block even on errors.
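
The buffer reuse and ensure-based cleanup described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the code from this PR: `FakeNative` and `BatchPoller` are hypothetical, and symbols stand in for native message pointers.

```ruby
# Hypothetical stand-in for the native layer. None of these names are
# rdkafka-ruby or librdkafka APIs.
class FakeNative
  attr_reader :destroyed

  def initialize(queued)
    @queued = queued.dup
    @destroyed = []
  end

  # Fills the caller's buffer in place and returns the number of slots filled,
  # mirroring how a batch consume call writes message pointers into an array.
  def consume_batch(buffer, max_items)
    count = [max_items, @queued.size, buffer.size].min
    count.times { |i| buffer[i] = @queued.shift }
    count
  end

  def destroy(handle)
    @destroyed << handle
  end
end

class BatchPoller
  def initialize(native, capacity)
    @native = native
    @buffer = Array.new(capacity) # allocated once, reused by every call
  end

  def poll_batch(max_items:)
    count = @native.consume_batch(@buffer, max_items)
    @buffer.first(count).map do |handle|
      raise ArgumentError, "bad message #{handle}" if handle == :bad

      "ruby:#{handle}"
    end
  ensure
    # Runs on success and on error, so no native handle is leaked.
    (count || 0).times do |i|
      @native.destroy(@buffer[i])
      @buffer[i] = nil
    end
  end
end

native = FakeNative.new([:m1, :bad, :m3])
poller = BatchPoller.new(native, 8)
begin
  poller.poll_batch(max_items: 3)
rescue ArgumentError => e
  puts "error: #{e.message}"
end
puts "destroyed: #{native.destroyed.inspect}"
```

Even though conversion fails on the second message, all three handles returned by the batch call are destroyed before the error propagates.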

Closes #868
mensfeld requested a review from Copilot May 6, 2026 17:58
mensfeld self-assigned this May 6, 2026
mensfeld added the librdkafka and consumer labels May 6, 2026
Copilot AI (Contributor) left a comment

Pull request overview

Adds batch consumer polling APIs to the Ruby wrapper to reduce FFI call overhead by fetching multiple messages from the consumer queue in a single librdkafka call.

Changes:

  • Add Consumer#poll_batch and Consumer#poll_batch_nb implemented via rd_kafka_consume_batch_queue (plus lazy consumer queue + reusable buffer helpers).
  • Ensure the cached consumer queue is destroyed on Consumer#close.
  • Add RSpec coverage for the new APIs and document the feature in the changelog.

Reviewed changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| lib/rdkafka/consumer.rb | Implements poll_batch / poll_batch_nb, adds lazy consumer queue + reusable buffer, and destroys the queue on close. |
| lib/rdkafka/bindings.rb | Adds FFI bindings for rd_kafka_queue_get_consumer and batch consume functions (blocking + non-blocking/GVL-holding variant). |
| spec/lib/rdkafka/consumer_spec.rb | Adds specs for batch polling behavior, max_items behavior, error propagation, and closed-consumer behavior. |
| CHANGELOG.md | Notes the new batch polling feature for the upcoming release. |

Comment thread lib/rdkafka/consumer.rb Outdated
Comment thread lib/rdkafka/consumer.rb
Comment thread lib/rdkafka/consumer.rb
mensfeld added 2 commits May 7, 2026 09:02
…atch_nb

- Move rd_kafka_queue_destroy inside synchronize block in close to prevent
  race where another thread could use a destroyed queue between consumer_close
  and native_kafka.close
- Add @note to poll_batch_nb docs warning that non-zero timeout_ms will block
  all Ruby threads since GVL is not released
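
The race fix in the first bullet can be modeled with a plain Mutex. This is a simplified sketch, not the library's code: `GuardedConsumer`, `ClosedError`, and `queue_alive?` are hypothetical names, and a symbol stands in for the native queue handle.

```ruby
# Hypothetical model of the fix. The class and method names are illustrative.
class GuardedConsumer
  ClosedError = Class.new(StandardError)

  def initialize
    @mutex = Mutex.new
    @queue = nil # lazily created on first poll
    @closed = false
  end

  def poll_batch
    @mutex.synchronize do
      raise ClosedError, "consumer is closed" if @closed

      @queue ||= :native_queue # stands in for the lazily fetched queue handle
      [:message]
    end
  end

  def close
    @mutex.synchronize do
      return if @closed

      # Dropping the queue under the same lock that pollers take means no
      # thread can pick up a queue that has already been destroyed.
      @queue = nil
      @closed = true
    end
  end

  def queue_alive?
    @mutex.synchronize { !@queue.nil? }
  end
end

consumer = GuardedConsumer.new
consumer.poll_batch
consumer.close
begin
  consumer.poll_batch
rescue GuardedConsumer::ClosedError => e
  puts e.message
end
```

If the queue were released after the synchronized section instead, a poller could acquire the lock in between and use a handle that is about to be destroyed.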
Destroy each rd_kafka_message_t right after extracting its data into a
Ruby Message object instead of holding all native messages until the end.
Reduces peak memory from N*(C_msg+Ruby_msg) to 1*C_msg+N*Ruby_msg.
On error, the ensure block still cleans up remaining unprocessed messages.
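
A small simulation of the two destruction strategies, counting live objects rather than bytes. It is a simplified model with a hypothetical helper, `peak_live`, and it assumes the batch call has already handed over all N native messages before conversion starts. Under that assumption the live-object count peaks at N+1 instead of 2N, and the last step holds one native message next to N Ruby objects, matching the 1*C_msg+N*Ruby_msg figure above.

```ruby
# Simulates converting a batch of n native messages and reports the largest
# number of objects (native + Ruby) alive at any moment.
def peak_live(n, destroy_eagerly:)
  native = n # the batch call has already returned n native messages
  ruby = 0
  peak = native + ruby

  n.times do
    ruby += 1                      # build the Ruby Message from the native one
    peak = [peak, native + ruby].max
    native -= 1 if destroy_eagerly # free the native message immediately
  end

  # In the hold-all strategy the remaining native messages are freed here.
  peak
end

n = 100
hold_all = peak_live(n, destroy_eagerly: false)
eager = peak_live(n, destroy_eagerly: true)
puts "hold all:      #{hold_all}"
puts "destroy early: #{eager}"
```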
mensfeld merged commit f4b8ceb into master May 7, 2026
97 checks passed
mensfeld deleted the feature/batch-consumer-poll branch May 7, 2026 11:16
mensfeld added a commit to karafka/karafka-rdkafka that referenced this pull request May 8, 2026
* Add batch consumer poll from upstream rdkafka-ruby (karafka#873)

Cherry-pick Feature/batch consumer poll from karafka/rdkafka-ruby.
Adds Consumer#poll_batch and Consumer#poll_batch_nb for batch message
polling via rd_kafka_consume_batch_queue, reducing FFI boundary
crossings when consuming multiple messages.

* Bump version to 0.27.0 and librdkafka to 2.14.1

- Update gem version from 0.26.2 to 0.27.0
- Bump librdkafka from 2.14.0 to 2.14.1 with updated SHA256
- Update changelog header to 0.27.0 with librdkafka bump entry
- Add 0.27.x row to README versions table

* fix readme dates

* Update librdkafka dist tarball from 2.14.0 to 2.14.1

Replace the vendored librdkafka source tarball so precompiled
build CI jobs can find the expected version.
mensfeld added a commit to karafka/karafka-rdkafka that referenced this pull request May 8, 2026
Cherry-pick Feature/batch consumer poll from karafka/rdkafka-ruby.
Adds Consumer#poll_batch and Consumer#poll_batch_nb for batch message
polling via rd_kafka_consume_batch_queue, reducing FFI boundary
crossings when consuming multiple messages.
Labels

consumer: Consumer API related stuff
librdkafka: Label for reports / issues related to C librdkafka

2 participants