Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions lib/sneakers/handlers/maxretry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,24 @@ def initialize(channel, queue, opts)
Sneakers.logger.debug do
"#{log_prefix} creating queue=#{retry_name} x-dead-letter-exchange=#{requeue_name}"
end
retry_args = retry_queue_arguments.merge(
:'x-dead-letter-exchange' => requeue_name,
:'x-message-ttl' => @opts[:retry_timeout] || 60000
)
@retry_queue = @channel.queue(retry_name,
:durable => queue_durable?,
:arguments => {
:'x-dead-letter-exchange' => requeue_name,
:'x-message-ttl' => @opts[:retry_timeout] || 60000
})
:arguments => retry_args)
@retry_queue.bind(@retry_exchange, :routing_key => '#')

Sneakers.logger.debug do
"#{log_prefix} creating queue=#{error_name}"
end
@error_queue = @channel.queue(error_name,
:durable => queue_durable?)
error_args = retry_queue_arguments
if error_args.empty?
@error_queue = @channel.queue(error_name, :durable => queue_durable?)
else
@error_queue = @channel.queue(error_name, :durable => queue_durable?, :arguments => error_args)
end
@error_queue.bind(@error_exchange, :routing_key => '#')

# Finally, bind the worker queue to our requeue exchange
Expand Down Expand Up @@ -218,6 +223,17 @@ def queue_durable?
def exchange_durable?
queue_durable?
end

def retry_queue_arguments
if @opts[:retry_queue_arguments]
@opts[:retry_queue_arguments].transform_keys(&:to_sym)
elsif (queue_type = @opts.dig(:queue_options, :arguments, :'x-queue-type') ||
@opts.dig(:queue_options, :arguments, 'x-queue-type'))
{ :'x-queue-type' => queue_type }
else
{}
end
end
end
end
end
114 changes: 114 additions & 0 deletions spec/sneakers/worker_handlers_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -466,4 +466,118 @@ def publish(data, opts)
end
end
end

describe 'Maxretry queue arguments inheritance' do
let(:channel) { Object.new }
let(:queue) { Object.new }

before(:each) do
Sneakers.configure(:daemonize => true, :log => 'sneakers.log')
Sneakers::Worker.configure_logger(Logger.new('/dev/null'))
Sneakers::Worker.configure_metrics
end

describe 'with x-queue-type in queue_options' do
before do
@opts = {
:exchange => 'sneakers',
:queue_options => {
:durable => 'true',
:arguments => { :'x-queue-type' => 'quorum' }
}
}

mock(queue).name { 'downloads' }

@retry_exchange = Object.new
@error_exchange = Object.new
@requeue_exchange = Object.new
@retry_queue = Object.new
@error_queue = Object.new

mock(channel).exchange('downloads-retry',
:type => 'topic',
:durable => 'true').once { @retry_exchange }
mock(channel).exchange('downloads-error',
:type => 'topic',
:durable => 'true').once { @error_exchange }
mock(channel).exchange('downloads-retry-requeue',
:type => 'topic',
:durable => 'true').once { @requeue_exchange }

mock(channel).queue('downloads-retry',
:durable => 'true',
:arguments => {
:'x-dead-letter-exchange' => 'downloads-retry-requeue',
:'x-message-ttl' => 60000,
:'x-queue-type' => 'quorum'
}).once { @retry_queue }
mock(@retry_queue).bind(@retry_exchange, :routing_key => '#')

mock(channel).queue('downloads-error',
:durable => 'true',
:arguments => { :'x-queue-type' => 'quorum' }).once { @error_queue }
mock(@error_queue).bind(@error_exchange, :routing_key => '#')

mock(queue).bind(@requeue_exchange, :routing_key => '#')
end

it 'inherits queue type for retry and error queues' do
handler = Sneakers::Handlers::Maxretry.new(channel, queue, @opts)
_(handler).wont_be_nil
end
end

describe 'with explicit retry_queue_arguments override' do
before do
@opts = {
:exchange => 'sneakers',
:queue_options => {
:durable => 'true',
:arguments => { :'x-queue-type' => 'quorum' }
},
:retry_queue_arguments => { :'x-queue-type' => 'classic' }
}

mock(queue).name { 'downloads' }

@retry_exchange = Object.new
@error_exchange = Object.new
@requeue_exchange = Object.new
@retry_queue = Object.new
@error_queue = Object.new

mock(channel).exchange('downloads-retry',
:type => 'topic',
:durable => 'true').once { @retry_exchange }
mock(channel).exchange('downloads-error',
:type => 'topic',
:durable => 'true').once { @error_exchange }
mock(channel).exchange('downloads-retry-requeue',
:type => 'topic',
:durable => 'true').once { @requeue_exchange }

mock(channel).queue('downloads-retry',
:durable => 'true',
:arguments => {
:'x-dead-letter-exchange' => 'downloads-retry-requeue',
:'x-message-ttl' => 60000,
:'x-queue-type' => 'classic'
}).once { @retry_queue }
mock(@retry_queue).bind(@retry_exchange, :routing_key => '#')

mock(channel).queue('downloads-error',
:durable => 'true',
:arguments => { :'x-queue-type' => 'classic' }).once { @error_queue }
mock(@error_queue).bind(@error_exchange, :routing_key => '#')

mock(queue).bind(@requeue_exchange, :routing_key => '#')
end

it 'uses explicit retry_queue_arguments over inherited' do
handler = Sneakers::Handlers::Maxretry.new(channel, queue, @opts)
_(handler).wont_be_nil
end
end
end
end