diff --git a/CHANGELOG.md b/CHANGELOG.md index 21f74289..db755d8f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ # Rdkafka Changelog ## 0.27.0 (Unreleased) +- [Feature] Add `Consumer#poll_batch(timeout_ms, max_items:)` and `Consumer#poll_batch_nb(timeout_ms, max_items:)` for batch message polling via `rd_kafka_consume_batch_queue`. - [Feature] Add `Config#describe_properties` to dump all librdkafka configuration properties (including defaults and hidden properties) as a Hash via `rd_kafka_conf_dump`. - [Enhancement] Bump librdkafka to `2.14.0` - [Fix] Fix resource leak in `Admin#describe_configs` and `Admin#incremental_alter_configs` where `admin_options_ptr` and `queue_ptr` were not destroyed in the ensure block. diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 07ca412e..711735de 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -386,6 +386,9 @@ class NativeErrorDesc < FFI::Struct # More efficient for poll(0) calls in fiber schedulers. attach_function :rd_kafka_consumer_poll_nb, :rd_kafka_consumer_poll, [:pointer, :int], :pointer, blocking: false attach_function :rd_kafka_consumer_close, [:pointer], :void, blocking: true + attach_function :rd_kafka_queue_get_consumer, [:pointer], :pointer + attach_function :rd_kafka_consume_batch_queue, [:pointer, :int, :pointer, :size_t], :ssize_t, blocking: true + attach_function :rd_kafka_consume_batch_queue_nb, :rd_kafka_consume_batch_queue, [:pointer, :int, :pointer, :size_t], :ssize_t, blocking: false attach_function :rd_kafka_offsets_store, [:pointer, :pointer], :int, blocking: true attach_function :rd_kafka_pause_partitions, [:pointer, :pointer], :int, blocking: true attach_function :rd_kafka_resume_partitions, [:pointer, :pointer], :int, blocking: true diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb index 6e1d882e..d36703c7 100644 --- a/lib/rdkafka/consumer.rb +++ b/lib/rdkafka/consumer.rb @@ -183,6 +183,11 @@ def close @native_kafka.synchronize do |inner| Rdkafka::Bindings.rd_kafka_consumer_close(inner) + + if @consumer_queue + Rdkafka::Bindings.rd_kafka_queue_destroy(@consumer_queue) + @consumer_queue = nil + end end @native_kafka.close @@ -781,6 +786,130 @@ def events_poll_nb(timeout_ms = 0) end end + # Poll for a batch of messages from the consumer queue in a single FFI call. + # + # This is more efficient than calling {#poll} in a loop because it crosses the FFI + # boundary only once to fetch up to `max_items` messages. + # + # The timeout controls how long to wait for the **first** message. Once any message + # is available, librdkafka fills the buffer with whatever is immediately ready and + # returns without further waiting. + # + # @param timeout_ms [Integer] Timeout waiting for the first message (-1 for infinite) + # @param max_items [Integer] Maximum number of messages to return per call + # @return [Array] Array of messages (empty if none available within timeout) + # @raise [RdkafkaError] When a consumed message contains an error + # @raise [ClosedConsumerError] When called on a closed consumer + def poll_batch(timeout_ms, max_items: 100) + closed_consumer_check(__method__) + + buffer = batch_buffer(max_items) + messages = [] + + count = @native_kafka.with_inner do |_inner| + Rdkafka::Bindings.rd_kafka_consume_batch_queue( + consumer_queue, + timeout_ms, + buffer, + max_items + ) + end + + return messages if count <= 0 + + i = 0 + begin + while i < count + ptr = buffer.get_pointer(i * FFI::Pointer.size) + + if ptr.null? + i += 1 + next + end + + native_message = Rdkafka::Bindings::Message.new(ptr) + + if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR + raise Rdkafka::RdkafkaError.new(native_message[:err]) + end + + messages << Rdkafka::Consumer::Message.new(native_message) + Rdkafka::Bindings.rd_kafka_message_destroy(ptr) + i += 1 + end + ensure + while i < count + ptr = buffer.get_pointer(i * FFI::Pointer.size) + Rdkafka::Bindings.rd_kafka_message_destroy(ptr) unless ptr.null? + i += 1 + end + end + + messages + end + + # Poll for a batch of messages without releasing the GVL (Global VM Lock). + # + # This is more efficient than {#poll_batch} for non-blocking poll(0) calls, + # particularly useful in fiber scheduler contexts where GVL release/reacquire + # overhead is wasteful since we don't expect to wait. + # + # @note Since the GVL is not released, a non-zero timeout_ms will block all Ruby + # threads/fibers for the duration. Use {#poll_batch} if you need a blocking wait. + # + # @param timeout_ms [Integer] Timeout waiting for the first message (default: 0 for non-blocking) + # @param max_items [Integer] Maximum number of messages to return per call + # @return [Array] Array of messages (empty if none available within timeout) + # @raise [RdkafkaError] When a consumed message contains an error + # @raise [ClosedConsumerError] When called on a closed consumer + def poll_batch_nb(timeout_ms = 0, max_items: 100) + closed_consumer_check(__method__) + + buffer = batch_buffer(max_items) + messages = [] + + count = @native_kafka.with_inner do |_inner| + Rdkafka::Bindings.rd_kafka_consume_batch_queue_nb( + consumer_queue, + timeout_ms, + buffer, + max_items + ) + end + + return messages if count <= 0 + + i = 0 + begin + while i < count + ptr = buffer.get_pointer(i * FFI::Pointer.size) + + if ptr.null? + i += 1 + next + end + + native_message = Rdkafka::Bindings::Message.new(ptr) + + if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR + raise Rdkafka::RdkafkaError.new(native_message[:err]) + end + + messages << Rdkafka::Consumer::Message.new(native_message) + Rdkafka::Bindings.rd_kafka_message_destroy(ptr) + i += 1 + end + ensure + while i < count + ptr = buffer.get_pointer(i * FFI::Pointer.size) + Rdkafka::Bindings.rd_kafka_message_destroy(ptr) unless ptr.null? + i += 1 + end + end + + messages + end + # Poll for new messages and yield for each received one. Iteration # will end when the consumer is closed. # @@ -853,5 +982,24 @@ def consumer_group_metadata_pointer def closed_consumer_check(method) raise Rdkafka::ClosedConsumerError.new(method) if closed? end + + # Returns the consumer queue pointer, lazily initialized + # @return [FFI::Pointer] consumer queue handle + def consumer_queue + @consumer_queue ||= @native_kafka.with_inner do |inner| + Rdkafka::Bindings.rd_kafka_queue_get_consumer(inner) + end + end + + # Returns a reusable FFI buffer for batch polling, growing if needed + # @param max_items [Integer] minimum buffer capacity + # @return [FFI::MemoryPointer] pointer buffer + def batch_buffer(max_items) + if @batch_buffer.nil? || @batch_buffer_size < max_items + @batch_buffer = FFI::MemoryPointer.new(:pointer, max_items) + @batch_buffer_size = max_items + end + @batch_buffer + end end end diff --git a/spec/lib/rdkafka/consumer_spec.rb b/spec/lib/rdkafka/consumer_spec.rb index 532b0ee9..b671a952 100644 --- a/spec/lib/rdkafka/consumer_spec.rb +++ b/spec/lib/rdkafka/consumer_spec.rb @@ -1585,6 +1585,175 @@ def collect(name, list) end end + describe "#poll_batch" do + it "returns empty array if there is no subscription" do + expect(consumer.poll_batch(1000, max_items: 10)).to eq([]) + end + + it "returns empty array if there are no messages" do + consumer.subscribe(topic) + expect(consumer.poll_batch(1000, max_items: 10)).to eq([]) + end + + it "returns messages if there are some" do + topic = TestTopics.create + + 3.times do |i| + producer.produce( + topic: topic, + payload: "payload #{i}", + key: "key #{i}" + ).wait + end + + consumer.subscribe(topic) + + messages = [] + deadline = Time.now + 30 + while messages.size < 3 && Time.now < deadline + messages.concat(consumer.poll_batch(1000, max_items: 10)) + end + + expect(messages.size).to eq(3) + messages.each do |message| + expect(message).to be_a(Rdkafka::Consumer::Message) + end + expect(messages.map(&:payload)).to contain_exactly("payload 0", "payload 1", "payload 2") + end + + it "respects max_items" do + topic = TestTopics.create + + 10.times do |i| + producer.produce( + topic: topic, + payload: "payload #{i}", + key: "key #{i}" + ).wait + end + + consumer.subscribe(topic) + + # Wait for messages to be fetched + deadline = Time.now + 30 + first = nil + while first.nil? && Time.now < deadline + first = consumer.poll(1000) + end + expect(first).not_to be_nil + + sleep 1 + + batch = consumer.poll_batch(1000, max_items: 3) + expect(batch.size).to be <= 3 + end + + it "raises an error when a message has an error" do + message = Rdkafka::Bindings::Message.new.tap do |msg| + msg[:err] = 20 + end + message_pointer = message.to_ptr + buffer = FFI::MemoryPointer.new(:pointer, 1) + buffer.put_pointer(0, message_pointer) + + expect(Rdkafka::Bindings).to receive(:rd_kafka_consume_batch_queue).and_return(1) + allow(consumer).to receive_messages(batch_buffer: buffer, consumer_queue: FFI::Pointer::NULL) + expect(Rdkafka::Bindings).to receive(:rd_kafka_message_destroy).with(message_pointer) + + consumer.subscribe(topic) + + expect { + consumer.poll_batch(100, max_items: 1) + }.to raise_error(Rdkafka::RdkafkaError) + end + + context "when consumer is closed" do + before { consumer.close } + + it "raises ClosedConsumerError" do + expect { + consumer.poll_batch(100, max_items: 10) + }.to raise_error(Rdkafka::ClosedConsumerError, /poll_batch/) + end + end + end + + describe "#poll_batch_nb" do + it "returns empty array if there is no subscription" do + expect(consumer.poll_batch_nb(0, max_items: 10)).to eq([]) + end + + it "returns empty array if there are no messages" do + consumer.subscribe(topic) + sleep 0.5 + expect(consumer.poll_batch_nb(0, max_items: 10)).to eq([]) + end + + it "defaults to timeout 0" do + consumer.subscribe(topic) + sleep 0.5 + expect(consumer.poll_batch_nb(max_items: 10)).to eq([]) + end + + it "returns messages if there are some" do + topic = TestTopics.create + + 3.times do |i| + producer.produce( + topic: topic, + payload: "nb payload #{i}", + key: "nb key #{i}" + ).wait + end + + consumer.subscribe(topic) + + # Wait for messages to arrive in the internal queue + deadline = Time.now + 30 + first = nil + while first.nil? && Time.now < deadline + first = consumer.poll(1000) + end + expect(first).not_to be_nil + + sleep 1 + + messages = consumer.poll_batch_nb(0, max_items: 10) + messages.each do |message| + expect(message).to be_a(Rdkafka::Consumer::Message) + end + end + + it "raises an error when a message has an error" do + message = Rdkafka::Bindings::Message.new.tap do |msg| + msg[:err] = 20 + end + message_pointer = message.to_ptr + buffer = FFI::MemoryPointer.new(:pointer, 1) + buffer.put_pointer(0, message_pointer) + + expect(Rdkafka::Bindings).to receive(:rd_kafka_consume_batch_queue_nb).and_return(1) + allow(consumer).to receive_messages(batch_buffer: buffer, consumer_queue: FFI::Pointer::NULL) + expect(Rdkafka::Bindings).to receive(:rd_kafka_message_destroy).with(message_pointer) + + consumer.subscribe(topic) + + expect { + consumer.poll_batch_nb(0, max_items: 1) + }.to raise_error(Rdkafka::RdkafkaError) + end + + context "when consumer is closed" do + before { consumer.close } + + it "raises ClosedConsumerError" do + expect { + consumer.poll_batch_nb(0, max_items: 10) + }.to raise_error(Rdkafka::ClosedConsumerError, /poll_batch_nb/) + end + end + end + describe "file descriptor access for fiber scheduler integration" do it "enables IO events on consumer queue" do consumer.subscribe(topic)