From 3aec21a03268d6ee1cd513bd76b76a6d83b9cdbe Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Wed, 6 May 2026 19:15:23 +0200 Subject: [PATCH 1/7] Add batch consumer polling via rd_kafka_consume_batch_queue Expose librdkafka's rd_kafka_consume_batch_queue API to reduce FFI boundary crossings when consuming multiple messages. Instead of one FFI call per message, poll_batch fetches up to N messages in a single call. New FFI bindings: - rd_kafka_queue_get_consumer (get consumer queue handle) - rd_kafka_consume_batch_queue (blocking, releases GVL) - rd_kafka_consume_batch_queue_nb (non-blocking, no GVL release) New Consumer methods: - poll_batch(timeout_ms, max_items:) - blocking variant - poll_batch_nb(timeout_ms=0, max_items:) - non-blocking variant The consumer queue is lazily initialized and destroyed on close. The FFI pointer buffer is pre-allocated and reused across calls. All message pointers are destroyed in an ensure block even on errors. Closes #868 --- CHANGELOG.md | 1 + lib/rdkafka/bindings.rb | 5 ++ lib/rdkafka/consumer.rb | 129 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21f74289..c15807c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Rdkafka Changelog ## 0.27.0 (Unreleased) +- [Feature] Add batch consumer polling via `rd_kafka_consume_batch_queue`. New methods `Consumer#poll_batch(timeout_ms, max_items:)` and `Consumer#poll_batch_nb(timeout_ms, max_items:)` fetch up to N messages in a single FFI call, reducing per-message FFI boundary overhead. The `_nb` variant skips GVL release for use in fiber scheduler / IO-multiplexed architectures. - [Feature] Add `Config#describe_properties` to dump all librdkafka configuration properties (including defaults and hidden properties) as a Hash via `rd_kafka_conf_dump`. - [Enhancement] Bump librdkafka to `2.14.0` - [Fix] Fix resource leak in `Admin#describe_configs` and `Admin#incremental_alter_configs` where `admin_options_ptr` and `queue_ptr` were not destroyed in the ensure block. diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 07ca412e..7b358199 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -386,6 +386,11 @@ class NativeErrorDesc < FFI::Struct # More efficient for poll(0) calls in fiber schedulers. attach_function :rd_kafka_consumer_poll_nb, :rd_kafka_consumer_poll, [:pointer, :int], :pointer, blocking: false attach_function :rd_kafka_consumer_close, [:pointer], :void, blocking: true + # Batch consumer queue API + attach_function :rd_kafka_queue_get_consumer, [:pointer], :pointer + attach_function :rd_kafka_consume_batch_queue, [:pointer, :int, :pointer, :size_t], :ssize_t, blocking: true + # Non-blocking batch consume variant (does not release GVL) + attach_function :rd_kafka_consume_batch_queue_nb, :rd_kafka_consume_batch_queue, [:pointer, :int, :pointer, :size_t], :ssize_t, blocking: false attach_function :rd_kafka_offsets_store, [:pointer, :pointer], :int, blocking: true attach_function :rd_kafka_pause_partitions, [:pointer, :pointer], :int, blocking: true attach_function :rd_kafka_resume_partitions, [:pointer, :pointer], :int, blocking: true diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb index 6e1d882e..681c5f4d 100644 --- a/lib/rdkafka/consumer.rb +++ b/lib/rdkafka/consumer.rb @@ -185,6 +185,11 @@ def close Rdkafka::Bindings.rd_kafka_consumer_close(inner) end + if @consumer_queue + Rdkafka::Bindings.rd_kafka_queue_destroy(@consumer_queue) + @consumer_queue = nil + end + @native_kafka.close end @@ -781,6 +786,111 @@ def events_poll_nb(timeout_ms = 0) end end + # Poll for a batch of messages from the consumer queue in a single FFI call. + # + # This is more efficient than calling {#poll} in a loop because it crosses the FFI + # boundary only once to fetch up to `max_items` messages. + # + # The timeout controls how long to wait for the **first** message. Once any message + # is available, librdkafka fills the buffer with whatever is immediately ready and + # returns without further waiting. + # + # @param timeout_ms [Integer] Timeout waiting for the first message (-1 for infinite) + # @param max_items [Integer] Maximum number of messages to return per call + # @return [Array] Array of messages (empty if none available within timeout) + # @raise [RdkafkaError] When a consumed message contains an error + # @raise [ClosedConsumerError] When called on a closed consumer + def poll_batch(timeout_ms, max_items: 100) + closed_consumer_check(__method__) + + buffer = batch_buffer(max_items) + messages = [] + + count = @native_kafka.with_inner do |_inner| + Rdkafka::Bindings.rd_kafka_consume_batch_queue( + consumer_queue, + timeout_ms, + buffer, + max_items + ) + end + + return messages if count <= 0 + + begin + count.times do |i| + ptr = buffer.get_pointer(i * FFI::Pointer.size) + next if ptr.null? + + native_message = Rdkafka::Bindings::Message.new(ptr) + + if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR + raise Rdkafka::RdkafkaError.new(native_message[:err]) + end + + messages << Rdkafka::Consumer::Message.new(native_message) + end + ensure + count.times do |i| + ptr = buffer.get_pointer(i * FFI::Pointer.size) + Rdkafka::Bindings.rd_kafka_message_destroy(ptr) unless ptr.null? + end + end + + messages + end + + # Poll for a batch of messages without releasing the GVL (Global VM Lock). + # + # This is more efficient than {#poll_batch} for non-blocking poll(0) calls, + # particularly useful in fiber scheduler contexts where GVL release/reacquire + # overhead is wasteful since we don't expect to wait. + # + # @param timeout_ms [Integer] Timeout waiting for the first message (default: 0 for non-blocking) + # @param max_items [Integer] Maximum number of messages to return per call + # @return [Array] Array of messages (empty if none available within timeout) + # @raise [RdkafkaError] When a consumed message contains an error + # @raise [ClosedConsumerError] When called on a closed consumer + def poll_batch_nb(timeout_ms = 0, max_items: 100) + closed_consumer_check(__method__) + + buffer = batch_buffer(max_items) + messages = [] + + count = @native_kafka.with_inner do |_inner| + Rdkafka::Bindings.rd_kafka_consume_batch_queue_nb( + consumer_queue, + timeout_ms, + buffer, + max_items + ) + end + + return messages if count <= 0 + + begin + count.times do |i| + ptr = buffer.get_pointer(i * FFI::Pointer.size) + next if ptr.null? + + native_message = Rdkafka::Bindings::Message.new(ptr) + + if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR + raise Rdkafka::RdkafkaError.new(native_message[:err]) + end + + messages << Rdkafka::Consumer::Message.new(native_message) + end + ensure + count.times do |i| + ptr = buffer.get_pointer(i * FFI::Pointer.size) + Rdkafka::Bindings.rd_kafka_message_destroy(ptr) unless ptr.null? + end + end + + messages + end + # Poll for new messages and yield for each received one. Iteration # will end when the consumer is closed. # @@ -853,5 +963,24 @@ def consumer_group_metadata_pointer def closed_consumer_check(method) raise Rdkafka::ClosedConsumerError.new(method) if closed? end + + # Returns the consumer queue pointer, lazily initialized + # @return [FFI::Pointer] consumer queue handle + def consumer_queue + @consumer_queue ||= @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_queue_get_consumer(inner) + end + end + + # Returns a reusable FFI buffer for batch polling, growing if needed + # @param max_items [Integer] minimum buffer capacity + # @return [FFI::MemoryPointer] pointer buffer + def batch_buffer(max_items) + if @batch_buffer.nil? || @batch_buffer_size < max_items + @batch_buffer = FFI::MemoryPointer.new(:pointer, max_items) + @batch_buffer_size = max_items + end + @batch_buffer + end end end From b93f44779ac7f4e4b8a9bf8e8e9f4997e2427611 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Wed, 6 May 2026 19:17:21 +0200 Subject: [PATCH 2/7] Simplify changelog entry and remove inline comments from bindings --- CHANGELOG.md | 2 +- lib/rdkafka/bindings.rb | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c15807c2..db755d8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,7 +1,7 @@ # Rdkafka Changelog ## 0.27.0 (Unreleased) -- [Feature] Add batch consumer polling via `rd_kafka_consume_batch_queue`. New methods `Consumer#poll_batch(timeout_ms, max_items:)` and `Consumer#poll_batch_nb(timeout_ms, max_items:)` fetch up to N messages in a single FFI call, reducing per-message FFI boundary overhead. The `_nb` variant skips GVL release for use in fiber scheduler / IO-multiplexed architectures. +- [Feature] Add `Consumer#poll_batch(timeout_ms, max_items:)` and `Consumer#poll_batch_nb(timeout_ms, max_items:)` for batch message polling via `rd_kafka_consume_batch_queue`. - [Feature] Add `Config#describe_properties` to dump all librdkafka configuration properties (including defaults and hidden properties) as a Hash via `rd_kafka_conf_dump`. - [Enhancement] Bump librdkafka to `2.14.0` - [Fix] Fix resource leak in `Admin#describe_configs` and `Admin#incremental_alter_configs` where `admin_options_ptr` and `queue_ptr` were not destroyed in the ensure block. diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 7b358199..711735de 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -386,10 +386,8 @@ class NativeErrorDesc < FFI::Struct # More efficient for poll(0) calls in fiber schedulers. attach_function :rd_kafka_consumer_poll_nb, :rd_kafka_consumer_poll, [:pointer, :int], :pointer, blocking: false attach_function :rd_kafka_consumer_close, [:pointer], :void, blocking: true - # Batch consumer queue API attach_function :rd_kafka_queue_get_consumer, [:pointer], :pointer attach_function :rd_kafka_consume_batch_queue, [:pointer, :int, :pointer, :size_t], :ssize_t, blocking: true - # Non-blocking batch consume variant (does not release GVL) attach_function :rd_kafka_consume_batch_queue_nb, :rd_kafka_consume_batch_queue, [:pointer, :int, :pointer, :size_t], :ssize_t, blocking: false attach_function :rd_kafka_offsets_store, [:pointer, :pointer], :int, blocking: true attach_function :rd_kafka_pause_partitions, [:pointer, :pointer], :int, blocking: true From 446807b777a230382b87c4821428a92ff6ba126b Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Wed, 6 May 2026 19:25:44 +0200 Subject: [PATCH 3/7] Add tests for poll_batch and poll_batch_nb --- spec/lib/rdkafka/consumer_spec.rb | 171 ++++++++++++++++++++++++++++++ 1 file changed, 171 insertions(+) diff --git a/spec/lib/rdkafka/consumer_spec.rb b/spec/lib/rdkafka/consumer_spec.rb index 532b0ee9..9e0eb21f 100644 --- a/spec/lib/rdkafka/consumer_spec.rb +++ b/spec/lib/rdkafka/consumer_spec.rb @@ -1585,6 +1585,177 @@ def collect(name, list) end end + describe "#poll_batch" do + it "returns empty array if there is no subscription" do + expect(consumer.poll_batch(1000, max_items: 10)).to eq([]) + end + + it "returns empty array if there are no messages" do + consumer.subscribe(topic) + expect(consumer.poll_batch(1000, max_items: 10)).to eq([]) + end + + it "returns messages if there are some" do + topic = TestTopics.create + + 3.times do |i| + producer.produce( + topic: topic, + payload: "payload #{i}", + key: "key #{i}" + ).wait + end + + consumer.subscribe(topic) + + messages = [] + deadline = Time.now + 30 + while messages.size < 3 && Time.now < deadline + messages.concat(consumer.poll_batch(1000, max_items: 10)) + end + + expect(messages.size).to eq(3) + messages.each do |message| + expect(message).to be_a(Rdkafka::Consumer::Message) + end + expect(messages.map(&:payload)).to match_array(["payload 0", "payload 1", "payload 2"]) + end + + it "respects max_items" do + topic = TestTopics.create + + 10.times do |i| + producer.produce( + topic: topic, + payload: "payload #{i}", + key: "key #{i}" + ).wait + end + + consumer.subscribe(topic) + + # Wait for messages to be fetched + deadline = Time.now + 30 + first = nil + while first.nil? && Time.now < deadline + first = consumer.poll(1000) + end + expect(first).not_to be_nil + + sleep 1 + + batch = consumer.poll_batch(1000, max_items: 3) + expect(batch.size).to be <= 3 + end + + it "raises an error when a message has an error" do + message = Rdkafka::Bindings::Message.new.tap do |msg| + msg[:err] = 20 + end + message_pointer = message.to_ptr + buffer = FFI::MemoryPointer.new(:pointer, 1) + buffer.put_pointer(0, message_pointer) + + expect(Rdkafka::Bindings).to receive(:rd_kafka_consume_batch_queue).and_return(1) + allow(consumer).to receive(:batch_buffer).and_return(buffer) + allow(consumer).to receive(:consumer_queue).and_return(FFI::Pointer::NULL) + expect(Rdkafka::Bindings).to receive(:rd_kafka_message_destroy).with(message_pointer) + + consumer.subscribe(topic) + + expect { + consumer.poll_batch(100, max_items: 1) + }.to raise_error(Rdkafka::RdkafkaError) + end + + context "when consumer is closed" do + before { consumer.close } + + it "raises ClosedConsumerError" do + expect { + consumer.poll_batch(100, max_items: 10) + }.to raise_error(Rdkafka::ClosedConsumerError, /poll_batch/) + end + end + end + + describe "#poll_batch_nb" do + it "returns empty array if there is no subscription" do + expect(consumer.poll_batch_nb(0, max_items: 10)).to eq([]) + end + + it "returns empty array if there are no messages" do + consumer.subscribe(topic) + sleep 0.5 + expect(consumer.poll_batch_nb(0, max_items: 10)).to eq([]) + end + + it "defaults to timeout 0" do + consumer.subscribe(topic) + sleep 0.5 + expect(consumer.poll_batch_nb(max_items: 10)).to eq([]) + end + + it "returns messages if there are some" do + topic = TestTopics.create + + 3.times do |i| + producer.produce( + topic: topic, + payload: "nb payload #{i}", + key: "nb key #{i}" + ).wait + end + + consumer.subscribe(topic) + + # Wait for messages to arrive in the internal queue + deadline = Time.now + 30 + first = nil + while first.nil? && Time.now < deadline + first = consumer.poll(1000) + end + expect(first).not_to be_nil + + sleep 1 + + messages = consumer.poll_batch_nb(0, max_items: 10) + messages.each do |message| + expect(message).to be_a(Rdkafka::Consumer::Message) + end + end + + it "raises an error when a message has an error" do + message = Rdkafka::Bindings::Message.new.tap do |msg| + msg[:err] = 20 + end + message_pointer = message.to_ptr + buffer = FFI::MemoryPointer.new(:pointer, 1) + buffer.put_pointer(0, message_pointer) + + expect(Rdkafka::Bindings).to receive(:rd_kafka_consume_batch_queue_nb).and_return(1) + allow(consumer).to receive(:batch_buffer).and_return(buffer) + allow(consumer).to receive(:consumer_queue).and_return(FFI::Pointer::NULL) + expect(Rdkafka::Bindings).to receive(:rd_kafka_message_destroy).with(message_pointer) + + consumer.subscribe(topic) + + expect { + consumer.poll_batch_nb(0, max_items: 1) + }.to raise_error(Rdkafka::RdkafkaError) + end + + context "when consumer is closed" do + before { consumer.close } + + it "raises ClosedConsumerError" do + expect { + consumer.poll_batch_nb(0, max_items: 10) + }.to raise_error(Rdkafka::ClosedConsumerError, /poll_batch_nb/) + end + end + end + describe "file descriptor access for fiber scheduler integration" do it "enables IO events on consumer queue" do consumer.subscribe(topic) From 21584c1c7905ce22e4db0a60a1552e944a0be883 Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Wed, 6 May 2026 20:03:13 +0200 Subject: [PATCH 4/7] Fix RuboCop: use receive_messages instead of multiple allow stubs --- spec/lib/rdkafka/consumer_spec.rb | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/spec/lib/rdkafka/consumer_spec.rb b/spec/lib/rdkafka/consumer_spec.rb index 9e0eb21f..d650211e 100644 --- a/spec/lib/rdkafka/consumer_spec.rb +++ b/spec/lib/rdkafka/consumer_spec.rb @@ -1657,8 +1657,7 @@ def collect(name, list) buffer.put_pointer(0, message_pointer) expect(Rdkafka::Bindings).to receive(:rd_kafka_consume_batch_queue).and_return(1) - allow(consumer).to receive(:batch_buffer).and_return(buffer) - allow(consumer).to receive(:consumer_queue).and_return(FFI::Pointer::NULL) + allow(consumer).to receive_messages(batch_buffer: buffer, consumer_queue: FFI::Pointer::NULL) expect(Rdkafka::Bindings).to receive(:rd_kafka_message_destroy).with(message_pointer) consumer.subscribe(topic) @@ -1734,8 +1733,7 @@ def collect(name, list) buffer.put_pointer(0, message_pointer) expect(Rdkafka::Bindings).to receive(:rd_kafka_consume_batch_queue_nb).and_return(1) - allow(consumer).to receive(:batch_buffer).and_return(buffer) - allow(consumer).to receive(:consumer_queue).and_return(FFI::Pointer::NULL) + allow(consumer).to receive_messages(batch_buffer: buffer, consumer_queue: FFI::Pointer::NULL) expect(Rdkafka::Bindings).to receive(:rd_kafka_message_destroy).with(message_pointer) consumer.subscribe(topic) From f7ad89ed01e5ffae3eb0454391f4734c90a490de Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Wed, 6 May 2026 20:10:07 +0200 Subject: [PATCH 5/7] Fix RuboCop: use contain_exactly instead of match_array with array literal --- spec/lib/rdkafka/consumer_spec.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spec/lib/rdkafka/consumer_spec.rb b/spec/lib/rdkafka/consumer_spec.rb index d650211e..b671a952 100644 --- a/spec/lib/rdkafka/consumer_spec.rb +++ b/spec/lib/rdkafka/consumer_spec.rb @@ -1618,7 +1618,7 @@ def collect(name, list) messages.each do |message| expect(message).to be_a(Rdkafka::Consumer::Message) end - expect(messages.map(&:payload)).to match_array(["payload 0", "payload 1", "payload 2"]) + expect(messages.map(&:payload)).to contain_exactly("payload 0", "payload 1", "payload 2") end it "respects max_items" do From 1d3531da1526341df6ef6c9fa09b75f249beefec Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Thu, 7 May 2026 09:02:25 +0200 Subject: [PATCH 6/7] Address review feedback: fix race in close, add GVL warning to poll_batch_nb - Move rd_kafka_queue_destroy inside synchronize block in close to prevent race where another thread could use a destroyed queue between consumer_close and native_kafka.close - Add @note to poll_batch_nb docs warning that non-zero timeout_ms will block all Ruby threads since GVL is not released --- lib/rdkafka/consumer.rb | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb index 681c5f4d..72f426a6 100644 --- a/lib/rdkafka/consumer.rb +++ b/lib/rdkafka/consumer.rb @@ -183,11 +183,11 @@ def close @native_kafka.synchronize do |inner| Rdkafka::Bindings.rd_kafka_consumer_close(inner) - end - if @consumer_queue - Rdkafka::Bindings.rd_kafka_queue_destroy(@consumer_queue) - @consumer_queue = nil + if @consumer_queue + Rdkafka::Bindings.rd_kafka_queue_destroy(@consumer_queue) + @consumer_queue = nil + end end @native_kafka.close @@ -846,6 +846,9 @@ def poll_batch(timeout_ms, max_items: 100) # particularly useful in fiber scheduler contexts where GVL release/reacquire # overhead is wasteful since we don't expect to wait. # + # @note Since the GVL is not released, a non-zero timeout_ms will block all Ruby + # threads/fibers for the duration. Use {#poll_batch} if you need a blocking wait. + # # @param timeout_ms [Integer] Timeout waiting for the first message (default: 0 for non-blocking) # @param max_items [Integer] Maximum number of messages to return per call # @return [Array] Array of messages (empty if none available within timeout) From eee4a33b437accbad9c46e123e396c0fe5ce519f Mon Sep 17 00:00:00 2001 From: Maciej Mensfeld Date: Thu, 7 May 2026 09:20:19 +0200 Subject: [PATCH 7/7] Free each native message immediately after copying to Ruby Destroy each rd_kafka_message_t right after extracting its data into a Ruby Message object instead of holding all native messages until the end. Reduces peak memory from N*(C_msg+Ruby_msg) to 1*C_msg+N*Ruby_msg. On error, the ensure block still cleans up remaining unprocessed messages. --- lib/rdkafka/consumer.rb | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb index 72f426a6..d36703c7 100644 --- a/lib/rdkafka/consumer.rb +++ b/lib/rdkafka/consumer.rb @@ -817,10 +817,15 @@ def poll_batch(timeout_ms, max_items: 100) return messages if count <= 0 + i = 0 begin - count.times do |i| + while i < count ptr = buffer.get_pointer(i * FFI::Pointer.size) - next if ptr.null? + + if ptr.null? + i += 1 + next + end native_message = Rdkafka::Bindings::Message.new(ptr) @@ -829,11 +834,14 @@ def poll_batch(timeout_ms, max_items: 100) end messages << Rdkafka::Consumer::Message.new(native_message) + Rdkafka::Bindings.rd_kafka_message_destroy(ptr) + i += 1 end ensure - count.times do |i| + while i < count ptr = buffer.get_pointer(i * FFI::Pointer.size) Rdkafka::Bindings.rd_kafka_message_destroy(ptr) unless ptr.null? + i += 1 end end @@ -871,10 +879,15 @@ def poll_batch_nb(timeout_ms = 0, max_items: 100) return messages if count <= 0 + i = 0 begin - count.times do |i| + while i < count ptr = buffer.get_pointer(i * FFI::Pointer.size) - next if ptr.null? + + if ptr.null? + i += 1 + next + end native_message = Rdkafka::Bindings::Message.new(ptr) @@ -883,11 +896,14 @@ def poll_batch_nb(timeout_ms = 0, max_items: 100) end messages << Rdkafka::Consumer::Message.new(native_message) + Rdkafka::Bindings.rd_kafka_message_destroy(ptr) + i += 1 end ensure - count.times do |i| + while i < count ptr = buffer.get_pointer(i * FFI::Pointer.size) Rdkafka::Bindings.rd_kafka_message_destroy(ptr) unless ptr.null? + i += 1 end end