|
59 | 59 | CID-{queue} as consumer group of queues. When configuring multiple queues, be sure to maintain |
60 | 60 | consistency of the `subscription relationship <https://rocketmq.apache.org/docs/domainModel/09subscription>`_. |
61 | 61 |
|
62 | | -* Queues are implicitly durable, ant they do not support features such as reply_to or |
| 62 | +* Queues are implicitly durable, and they do not support features such as reply_to or |
63 | 63 | exclusive queues. |
64 | 64 |
|
65 | 65 | Transport Options |
|
83 | 83 | with Connection('rocketmq://ak:sk@endpoint:port//') as conn: |
84 | 84 | # or |
85 | 85 | with Connection('rocketmq://ak:sk@endpoint:port//', transport_options={ |
86 | | - 'backoff_policy': [30, 60, 90, 120, 240, 300], |
87 | | - 'consumer_options': { |
88 | | - 'global_group_id': 'CID-ALL-IN-ONE', |
89 | | - 'group_format': 'CID-{}', |
90 | | - 'topic_config': { |
91 | | - 'topic_xx': { |
92 | | - 'group_id': 'CID-xx', |
93 | | - 'filter_exp': 'exp' |
94 | | - } |
95 | | - } |
96 | | - } |
97 | | - }) as conn: |
| 86 | + 'backoff_policy': [30, 60, 90, 120, 240, 300], |
| 87 | + 'consumer_options': { |
| 88 | + 'global_group_id': 'CID-ALL-IN-ONE', |
| 89 | + 'group_format': 'CID-{}', |
| 90 | + 'topic_config': { |
| 91 | + 'topic_xx': { |
| 92 | + 'group_id': 'CID-xx', |
| 93 | + 'filter_exp': 'exp' |
| 94 | + } |
| 95 | + } |
| 96 | + } |
| 97 | + }) as conn: |
98 | 98 | # The group_format and global_group_id are exclusive, and global_group_id has higher priority. |
99 | 99 | # If topic config exists, use its specified group, and others use global or formatted one. |
100 | 100 |
|
@@ -245,7 +245,7 @@ def reject(self, delivery_tag, requeue=False): |
245 | 245 | :raises Exception: if operation fails |
246 | 246 | """ |
247 | 247 | if delivery_tag not in self._not_yet_acked: |
248 | | - logger.warn(f'reject message not found. {delivery_tag}') |
| 248 | + logger.warning(f'reject message not found. {delivery_tag}') |
249 | 249 | return |
250 | 250 | if requeue: |
251 | 251 | message = self._not_yet_acked.pop(delivery_tag) |
@@ -607,9 +607,6 @@ def _get_bulk(self, queue, callback=None): |
607 | 607 | max_retries=1, interval_start=2, interval_step=0, timeout=self.await_duration * 2 + 3 |
608 | 608 | ) |
609 | 609 |
|
610 | | - if not messages: |
611 | | - pass |
612 | | - |
613 | 610 | for message in messages: |
614 | 611 | if message.topic in self._auto_ack_topics: |
615 | 612 | _ack_rocketmq_message(consumer, message, False) |
@@ -846,7 +843,7 @@ def _ack_rocketmq_message(consumer: SimpleConsumer, message: RocketmqMessage, |
846 | 843 | :raises KombuError: If acknowledgment fails and `raise_exception` is True. |
847 | 844 | """ |
848 | 845 | if not consumer or not consumer.is_running: |
849 | | - logger.warn(f'unexpected condition: consumer is None or not running. {message}') |
| 846 | + logger.warning(f'unexpected condition: consumer is None or not running. {message}') |
850 | 847 | return |
851 | 848 | try: |
852 | 849 | retry_over_time( |
|
0 commit comments