Skip to content

Fix/connection on queue work#337

Open
LucasOtaviani wants to merge 10 commits into
convenia:masterfrom
LucasOtaviani:fix/connection-on-queue-work
Open

Fix/connection on queue work#337
LucasOtaviani wants to merge 10 commits into
convenia:masterfrom
LucasOtaviani:fix/connection-on-queue-work

Conversation

@LucasOtaviani
Copy link
Copy Markdown
Contributor

@LucasOtaviani LucasOtaviani commented Feb 27, 2026

Hello, @jleonardolemos! I’ve opened this pull request to address an issue that occurs when using Pigeon with a queue worker.

The Problem

When running Pigeon consumers under Supervisor as long-lived processes, the AMQP connection can be closed unexpectedly. When this happens, the Consumer throws an unhandled AMQPConnectionClosedException and the process dies.

image

The root cause is that Consumer stores a channel reference at onstruction time and never refreshes it. Although RabbitDriver::getConnection() already has reconnection logic, the Consumer never calls it again after initialization, so it keeps using a stale channel.

Proposed Solution

  • Consumer::wait() now catches AMQPConnectionClosedException and triggers a reconnection flow instead of crashing.
  • Consumer::reconnect() retries getting a fresh channel from the driver with linear backoff. After exhausting all attempts, it throws a RuntimeException.
  • New config: pigeon.consumer.reconnect_attempts: max reconnection attempts (default: 3)
  • New config: pigeon.consumer.enable_consumer_logs: toggle consumer log output (default: true)

What are your thoughts?

Additional bug fix

When calling the basic_consume method, the consumer tag setting was being read from amqp config file, which doesn't exist anymore, so it was changed to read from the pigeon config file. It ensures the correct tag is passed to the basic_consume.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant