Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Rdkafka Changelog

## 0.27.0 (Unreleased)
- [Feature] Add `Consumer#poll_batch(timeout_ms, max_items:)` and `Consumer#poll_batch_nb(timeout_ms, max_items:)` for batch message polling via `rd_kafka_consume_batch_queue`.
- [Feature] Add `Config#describe_properties` to dump all librdkafka configuration properties (including defaults and hidden properties) as a Hash via `rd_kafka_conf_dump`.
- [Enhancement] Bump librdkafka to `2.14.0`
- [Fix] Fix resource leak in `Admin#describe_configs` and `Admin#incremental_alter_configs` where `admin_options_ptr` and `queue_ptr` were not destroyed in the ensure block.
Expand Down
3 changes: 3 additions & 0 deletions lib/rdkafka/bindings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,9 @@ class NativeErrorDesc < FFI::Struct
# More efficient for poll(0) calls in fiber schedulers.
attach_function :rd_kafka_consumer_poll_nb, :rd_kafka_consumer_poll, [:pointer, :int], :pointer, blocking: false
attach_function :rd_kafka_consumer_close, [:pointer], :void, blocking: true
attach_function :rd_kafka_queue_get_consumer, [:pointer], :pointer
attach_function :rd_kafka_consume_batch_queue, [:pointer, :int, :pointer, :size_t], :ssize_t, blocking: true
attach_function :rd_kafka_consume_batch_queue_nb, :rd_kafka_consume_batch_queue, [:pointer, :int, :pointer, :size_t], :ssize_t, blocking: false
attach_function :rd_kafka_offsets_store, [:pointer, :pointer], :int, blocking: true
attach_function :rd_kafka_pause_partitions, [:pointer, :pointer], :int, blocking: true
attach_function :rd_kafka_resume_partitions, [:pointer, :pointer], :int, blocking: true
Expand Down
148 changes: 148 additions & 0 deletions lib/rdkafka/consumer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@ def close

@native_kafka.synchronize do |inner|
Rdkafka::Bindings.rd_kafka_consumer_close(inner)

if @consumer_queue
Rdkafka::Bindings.rd_kafka_queue_destroy(@consumer_queue)
@consumer_queue = nil
end
end

@native_kafka.close
Expand Down Expand Up @@ -781,6 +786,130 @@ def events_poll_nb(timeout_ms = 0)
end
end

# Poll for a batch of messages from the consumer queue in a single FFI call.
#
# This is more efficient than calling {#poll} in a loop because it crosses the FFI
# boundary only once to fetch up to `max_items` messages.
#
# The timeout controls how long to wait for the **first** message. Once any message
# is available, librdkafka fills the buffer with whatever is immediately ready and
# returns without further waiting.
#
# @param timeout_ms [Integer] Timeout waiting for the first message (-1 for infinite)
# @param max_items [Integer] Maximum number of messages to return per call
# @return [Array<Message>] Array of messages (empty if none available within timeout)
# @raise [RdkafkaError] When a consumed message contains an error
# @raise [ClosedConsumerError] When called on a closed consumer
def poll_batch(timeout_ms, max_items: 100)
closed_consumer_check(__method__)

buffer = batch_buffer(max_items)
messages = []

count = @native_kafka.with_inner do |_inner|
Rdkafka::Bindings.rd_kafka_consume_batch_queue(
consumer_queue,
timeout_ms,
buffer,
max_items
)
end

return messages if count <= 0

i = 0
begin
while i < count
ptr = buffer.get_pointer(i * FFI::Pointer.size)

if ptr.null?
i += 1
next
end

native_message = Rdkafka::Bindings::Message.new(ptr)

if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
raise Rdkafka::RdkafkaError.new(native_message[:err])
end

messages << Rdkafka::Consumer::Message.new(native_message)
Rdkafka::Bindings.rd_kafka_message_destroy(ptr)
i += 1
end
ensure
while i < count
ptr = buffer.get_pointer(i * FFI::Pointer.size)
Rdkafka::Bindings.rd_kafka_message_destroy(ptr) unless ptr.null?
i += 1
end
end

messages
end

# Poll for a batch of messages without releasing the GVL (Global VM Lock).
#
# This is more efficient than {#poll_batch} for non-blocking poll(0) calls,
# particularly useful in fiber scheduler contexts where GVL release/reacquire
# overhead is wasteful since we don't expect to wait.
#
# @note Since the GVL is not released, a non-zero timeout_ms will block all Ruby
# threads/fibers for the duration. Use {#poll_batch} if you need a blocking wait.
#
# @param timeout_ms [Integer] Timeout waiting for the first message (default: 0 for non-blocking)
# @param max_items [Integer] Maximum number of messages to return per call
# @return [Array<Message>] Array of messages (empty if none available within timeout)
# @raise [RdkafkaError] When a consumed message contains an error
# @raise [ClosedConsumerError] When called on a closed consumer
def poll_batch_nb(timeout_ms = 0, max_items: 100)
closed_consumer_check(__method__)

Comment thread
mensfeld marked this conversation as resolved.
buffer = batch_buffer(max_items)
messages = []

count = @native_kafka.with_inner do |_inner|
Rdkafka::Bindings.rd_kafka_consume_batch_queue_nb(
consumer_queue,
timeout_ms,
buffer,
max_items
)
end

return messages if count <= 0

i = 0
begin
while i < count
ptr = buffer.get_pointer(i * FFI::Pointer.size)

if ptr.null?
i += 1
next
end

native_message = Rdkafka::Bindings::Message.new(ptr)

if native_message[:err] != Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR
raise Rdkafka::RdkafkaError.new(native_message[:err])
end

messages << Rdkafka::Consumer::Message.new(native_message)
Rdkafka::Bindings.rd_kafka_message_destroy(ptr)
i += 1
end
ensure
while i < count
ptr = buffer.get_pointer(i * FFI::Pointer.size)
Rdkafka::Bindings.rd_kafka_message_destroy(ptr) unless ptr.null?
i += 1
end
end

messages
end

# Poll for new messages and yield for each received one. Iteration
# will end when the consumer is closed.
#
Expand Down Expand Up @@ -853,5 +982,24 @@ def consumer_group_metadata_pointer
def closed_consumer_check(method)
raise Rdkafka::ClosedConsumerError.new(method) if closed?
end

# Returns the consumer queue pointer, lazily initialized
# @return [FFI::Pointer] consumer queue handle
def consumer_queue
@consumer_queue ||= @native_kafka.with_inner do |inner|
Rdkafka::Bindings.rd_kafka_queue_get_consumer(inner)
end
end
Comment thread
mensfeld marked this conversation as resolved.

# Returns a reusable FFI buffer for batch polling, growing if needed
# @param max_items [Integer] minimum buffer capacity
# @return [FFI::MemoryPointer] pointer buffer
def batch_buffer(max_items)
if @batch_buffer.nil? || @batch_buffer_size < max_items
@batch_buffer = FFI::MemoryPointer.new(:pointer, max_items)
@batch_buffer_size = max_items
end
@batch_buffer
end
end
end
169 changes: 169 additions & 0 deletions spec/lib/rdkafka/consumer_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1585,6 +1585,175 @@ def collect(name, list)
end
end

describe "#poll_batch" do
it "returns empty array if there is no subscription" do
expect(consumer.poll_batch(1000, max_items: 10)).to eq([])
end

it "returns empty array if there are no messages" do
consumer.subscribe(topic)
expect(consumer.poll_batch(1000, max_items: 10)).to eq([])
end

it "returns messages if there are some" do
topic = TestTopics.create

3.times do |i|
producer.produce(
topic: topic,
payload: "payload #{i}",
key: "key #{i}"
).wait
end

consumer.subscribe(topic)

messages = []
deadline = Time.now + 30
while messages.size < 3 && Time.now < deadline
messages.concat(consumer.poll_batch(1000, max_items: 10))
end

expect(messages.size).to eq(3)
messages.each do |message|
expect(message).to be_a(Rdkafka::Consumer::Message)
end
expect(messages.map(&:payload)).to contain_exactly("payload 0", "payload 1", "payload 2")
end

it "respects max_items" do
topic = TestTopics.create

10.times do |i|
producer.produce(
topic: topic,
payload: "payload #{i}",
key: "key #{i}"
).wait
end

consumer.subscribe(topic)

# Wait for messages to be fetched
deadline = Time.now + 30
first = nil
while first.nil? && Time.now < deadline
first = consumer.poll(1000)
end
expect(first).not_to be_nil

sleep 1

batch = consumer.poll_batch(1000, max_items: 3)
expect(batch.size).to be <= 3
end

it "raises an error when a message has an error" do
message = Rdkafka::Bindings::Message.new.tap do |msg|
msg[:err] = 20
end
message_pointer = message.to_ptr
buffer = FFI::MemoryPointer.new(:pointer, 1)
buffer.put_pointer(0, message_pointer)

expect(Rdkafka::Bindings).to receive(:rd_kafka_consume_batch_queue).and_return(1)
allow(consumer).to receive_messages(batch_buffer: buffer, consumer_queue: FFI::Pointer::NULL)
expect(Rdkafka::Bindings).to receive(:rd_kafka_message_destroy).with(message_pointer)

consumer.subscribe(topic)

expect {
consumer.poll_batch(100, max_items: 1)
}.to raise_error(Rdkafka::RdkafkaError)
end

context "when consumer is closed" do
before { consumer.close }

it "raises ClosedConsumerError" do
expect {
consumer.poll_batch(100, max_items: 10)
}.to raise_error(Rdkafka::ClosedConsumerError, /poll_batch/)
end
end
end

describe "#poll_batch_nb" do
it "returns empty array if there is no subscription" do
expect(consumer.poll_batch_nb(0, max_items: 10)).to eq([])
end

it "returns empty array if there are no messages" do
consumer.subscribe(topic)
sleep 0.5
expect(consumer.poll_batch_nb(0, max_items: 10)).to eq([])
end

it "defaults to timeout 0" do
consumer.subscribe(topic)
sleep 0.5
expect(consumer.poll_batch_nb(max_items: 10)).to eq([])
end

it "returns messages if there are some" do
topic = TestTopics.create

3.times do |i|
producer.produce(
topic: topic,
payload: "nb payload #{i}",
key: "nb key #{i}"
).wait
end

consumer.subscribe(topic)

# Wait for messages to arrive in the internal queue
deadline = Time.now + 30
first = nil
while first.nil? && Time.now < deadline
first = consumer.poll(1000)
end
expect(first).not_to be_nil

sleep 1

messages = consumer.poll_batch_nb(0, max_items: 10)
messages.each do |message|
expect(message).to be_a(Rdkafka::Consumer::Message)
end
end

it "raises an error when a message has an error" do
message = Rdkafka::Bindings::Message.new.tap do |msg|
msg[:err] = 20
end
message_pointer = message.to_ptr
buffer = FFI::MemoryPointer.new(:pointer, 1)
buffer.put_pointer(0, message_pointer)

expect(Rdkafka::Bindings).to receive(:rd_kafka_consume_batch_queue_nb).and_return(1)
allow(consumer).to receive_messages(batch_buffer: buffer, consumer_queue: FFI::Pointer::NULL)
expect(Rdkafka::Bindings).to receive(:rd_kafka_message_destroy).with(message_pointer)

consumer.subscribe(topic)

expect {
consumer.poll_batch_nb(0, max_items: 1)
}.to raise_error(Rdkafka::RdkafkaError)
end

context "when consumer is closed" do
before { consumer.close }

it "raises ClosedConsumerError" do
expect {
consumer.poll_batch_nb(0, max_items: 10)
}.to raise_error(Rdkafka::ClosedConsumerError, /poll_batch_nb/)
end
end
end

describe "file descriptor access for fiber scheduler integration" do
it "enables IO events on consumer queue" do
consumer.subscribe(topic)
Expand Down
Loading