From 730429fa21cd8c754937e5fc7134524d64b1ba78 Mon Sep 17 00:00:00 2001 From: Chris Jacob Date: Fri, 9 Jan 2026 16:42:20 -0600 Subject: [PATCH] add x-queue-type inheritance for retry and error queues to maxretry handler --- lib/sneakers/handlers/maxretry.rb | 28 +++++-- spec/sneakers/worker_handlers_spec.rb | 114 ++++++++++++++++++++++++++ 2 files changed, 136 insertions(+), 6 deletions(-) diff --git a/lib/sneakers/handlers/maxretry.rb b/lib/sneakers/handlers/maxretry.rb index 19b7d4ea..8183133f 100644 --- a/lib/sneakers/handlers/maxretry.rb +++ b/lib/sneakers/handlers/maxretry.rb @@ -60,19 +60,24 @@ def initialize(channel, queue, opts) Sneakers.logger.debug do "#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}" end + retry_args = retry_queue_arguments.merge( + :'x-dead-letter-exchange' => requeue_name, + :'x-message-ttl' => @opts[:retry_timeout] || 60000 + ) @retry_queue = @channel.queue(retry_name, :durable => queue_durable?, - :arguments => { - :'x-dead-letter-exchange' => requeue_name, - :'x-message-ttl' => @opts[:retry_timeout] || 60000 - }) + :arguments => retry_args) @retry_queue.bind(@retry_exchange, :routing_key => '#') Sneakers.logger.debug do "#{log_prefix} creating queue=#{error_name}" end - @error_queue = @channel.queue(error_name, - :durable => queue_durable?) + error_args = retry_queue_arguments + if error_args.empty? + @error_queue = @channel.queue(error_name, :durable => queue_durable?) + else + @error_queue = @channel.queue(error_name, :durable => queue_durable?, :arguments => error_args) + end @error_queue.bind(@error_exchange, :routing_key => '#') # Finally, bind the worker queue to our requeue exchange @@ -218,6 +223,17 @@ def queue_durable? def exchange_durable? queue_durable? end + + def retry_queue_arguments + if @opts[:retry_queue_arguments] + @opts[:retry_queue_arguments].transform_keys(&:to_sym) + elsif (queue_type = @opts.dig(:queue_options, :arguments, :'x-queue-type') || + @opts.dig(:queue_options, :arguments, 'x-queue-type')) + { :'x-queue-type' => queue_type } + else + {} + end + end end end end diff --git a/spec/sneakers/worker_handlers_spec.rb b/spec/sneakers/worker_handlers_spec.rb index ef059238..7828f241 100644 --- a/spec/sneakers/worker_handlers_spec.rb +++ b/spec/sneakers/worker_handlers_spec.rb @@ -466,4 +466,118 @@ def publish(data, opts) end end end + + describe 'Maxretry queue arguments inheritance' do + let(:channel) { Object.new } + let(:queue) { Object.new } + + before(:each) do + Sneakers.configure(:daemonize => true, :log => 'sneakers.log') + Sneakers::Worker.configure_logger(Logger.new('/dev/null')) + Sneakers::Worker.configure_metrics + end + + describe 'with x-queue-type in queue_options' do + before do + @opts = { + :exchange => 'sneakers', + :queue_options => { + :durable => 'true', + :arguments => { :'x-queue-type' => 'quorum' } + } + } + + mock(queue).name { 'downloads' } + + @retry_exchange = Object.new + @error_exchange = Object.new + @requeue_exchange = Object.new + @retry_queue = Object.new + @error_queue = Object.new + + mock(channel).exchange('downloads-retry', + :type => 'topic', + :durable => 'true').once { @retry_exchange } + mock(channel).exchange('downloads-error', + :type => 'topic', + :durable => 'true').once { @error_exchange } + mock(channel).exchange('downloads-retry-requeue', + :type => 'topic', + :durable => 'true').once { @requeue_exchange } + + mock(channel).queue('downloads-retry', + :durable => 'true', + :arguments => { + :'x-dead-letter-exchange' => 'downloads-retry-requeue', + :'x-message-ttl' => 60000, + :'x-queue-type' => 'quorum' + }).once { @retry_queue } + mock(@retry_queue).bind(@retry_exchange, :routing_key => '#') + + mock(channel).queue('downloads-error', + :durable => 'true', + :arguments => { :'x-queue-type' => 'quorum' }).once { @error_queue } + mock(@error_queue).bind(@error_exchange, :routing_key => '#') + + mock(queue).bind(@requeue_exchange, :routing_key => '#') + end + + it 'inherits queue type for retry and error queues' do + handler = Sneakers::Handlers::Maxretry.new(channel, queue, @opts) + _(handler).wont_be_nil + end + end + + describe 'with explicit retry_queue_arguments override' do + before do + @opts = { + :exchange => 'sneakers', + :queue_options => { + :durable => 'true', + :arguments => { :'x-queue-type' => 'quorum' } + }, + :retry_queue_arguments => { :'x-queue-type' => 'classic' } + } + + mock(queue).name { 'downloads' } + + @retry_exchange = Object.new + @error_exchange = Object.new + @requeue_exchange = Object.new + @retry_queue = Object.new + @error_queue = Object.new + + mock(channel).exchange('downloads-retry', + :type => 'topic', + :durable => 'true').once { @retry_exchange } + mock(channel).exchange('downloads-error', + :type => 'topic', + :durable => 'true').once { @error_exchange } + mock(channel).exchange('downloads-retry-requeue', + :type => 'topic', + :durable => 'true').once { @requeue_exchange } + + mock(channel).queue('downloads-retry', + :durable => 'true', + :arguments => { + :'x-dead-letter-exchange' => 'downloads-retry-requeue', + :'x-message-ttl' => 60000, + :'x-queue-type' => 'classic' + }).once { @retry_queue } + mock(@retry_queue).bind(@retry_exchange, :routing_key => '#') + + mock(channel).queue('downloads-error', + :durable => 'true', + :arguments => { :'x-queue-type' => 'classic' }).once { @error_queue } + mock(@error_queue).bind(@error_exchange, :routing_key => '#') + + mock(queue).bind(@requeue_exchange, :routing_key => '#') + end + + it 'uses explicit retry_queue_arguments over inherited' do + handler = Sneakers::Handlers::Maxretry.new(channel, queue, @opts) + _(handler).wont_be_nil + end + end + end end