Skip to content

Commit 43f75df

Browse files
authored
[fix][client] Skip processing messages in the listener when the consumer has been closed (#25006)
1 parent 807dcaf commit 43f75df

File tree

4 files changed

+199
-2
lines changed

4 files changed

+199
-2
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/client/api/MultiTopicsConsumerTest.java

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,15 +34,21 @@
3434
import java.util.HashMap;
3535
import java.util.List;
3636
import java.util.Map;
37+
import java.util.Optional;
38+
import java.util.concurrent.CountDownLatch;
3739
import java.util.concurrent.ExecutionException;
3840
import java.util.concurrent.ExecutorService;
41+
import java.util.concurrent.Executors;
3942
import java.util.concurrent.ScheduledExecutorService;
4043
import java.util.concurrent.TimeUnit;
4144
import java.util.concurrent.TimeoutException;
45+
import java.util.concurrent.atomic.AtomicInteger;
4246
import java.util.concurrent.atomic.AtomicLong;
4347
import java.util.stream.Collectors;
4448
import java.util.stream.IntStream;
4549
import lombok.Cleanup;
50+
import lombok.extern.slf4j.Slf4j;
51+
import org.apache.logging.log4j.Level;
4652
import org.apache.pulsar.broker.BrokerTestUtil;
4753
import org.apache.pulsar.client.admin.PulsarAdminException;
4854
import org.apache.pulsar.client.impl.ClientBuilderImpl;
@@ -53,6 +59,7 @@
5359
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
5460
import org.apache.pulsar.common.naming.TopicName;
5561
import org.apache.pulsar.common.util.FutureUtil;
62+
import org.apache.pulsar.utils.TestLogAppender;
5663
import org.awaitility.Awaitility;
5764
import org.mockito.AdditionalAnswers;
5865
import org.mockito.Mockito;
@@ -62,6 +69,7 @@
6269
import org.testng.annotations.DataProvider;
6370
import org.testng.annotations.Test;
6471

72+
@Slf4j
6573
@Test(groups = "broker")
6674
public class MultiTopicsConsumerTest extends ProducerConsumerBase {
6775
private ScheduledExecutorService internalExecutorServiceDelegate;
@@ -431,4 +439,78 @@ public void testSubscriptionNotFound() throws PulsarAdminException, PulsarClient
431439

432440
pulsar.getConfiguration().setAllowAutoSubscriptionCreation(true);
433441
}
442+
443+
@Test(timeOut = 30000)
444+
public void testMessageListenerStopsProcessingAfterClosing() throws Exception {
445+
int numMessages = 100;
446+
String topic1 = newTopicName();
447+
String topic2 = newTopicName();
448+
final CountDownLatch consumerClosedLatch = new CountDownLatch(1);
449+
final CountDownLatch messageProcessedLatch = new CountDownLatch(1);
450+
AtomicInteger messageProcessedCount = new AtomicInteger(0);
451+
AtomicInteger messagesQueuedForExecutor = new AtomicInteger(0);
452+
AtomicInteger messagesCurrentlyInExecutor = new AtomicInteger(0);
453+
454+
@Cleanup("shutdownNow")
455+
ExecutorService executor = Executors.newSingleThreadExecutor();
456+
@Cleanup
457+
Consumer<byte[]> consumer = pulsarClient.newConsumer()
458+
.topics(List.of(topic1, topic2))
459+
.subscriptionName("my-subscriber-name")
460+
.messageListenerExecutor(new MessageListenerExecutor() {
461+
@Override
462+
public void execute(Message<?> message, Runnable runnable) {
463+
messagesQueuedForExecutor.incrementAndGet();
464+
messagesCurrentlyInExecutor.incrementAndGet();
465+
executor.execute(() -> {
466+
try {
467+
runnable.run();
468+
} finally {
469+
messagesCurrentlyInExecutor.decrementAndGet();
470+
}
471+
});
472+
}
473+
})
474+
.messageListener((c1, msg) -> {
475+
messageProcessedCount.incrementAndGet();
476+
c1.acknowledgeAsync(msg);
477+
messageProcessedLatch.countDown();
478+
try {
479+
consumerClosedLatch.await();
480+
} catch (InterruptedException e) {
481+
Thread.currentThread().interrupt();
482+
throw new RuntimeException(e);
483+
}
484+
}).subscribe();
485+
486+
@Cleanup
487+
Producer<byte[]> producer = pulsarClient.newProducer()
488+
.topic(topic1)
489+
.enableBatching(false)
490+
.create();
491+
492+
for (int i = 0; i < numMessages; i++) {
493+
final String message = "my-message-" + i;
494+
producer.send(message.getBytes());
495+
}
496+
497+
assertTrue(messageProcessedLatch.await(5, TimeUnit.SECONDS));
498+
// wait until all messages have been queued in the listener
499+
Awaitility.await().untilAsserted(() -> assertEquals(messagesQueuedForExecutor.get(), numMessages));
500+
@Cleanup
501+
TestLogAppender testLogAppender = TestLogAppender.create(Optional.empty());
502+
consumer.close();
503+
consumerClosedLatch.countDown();
504+
// only a single message should be processed
505+
assertEquals(messageProcessedCount.get(), 1);
506+
// wait until all messages have been drained from the executor
507+
Awaitility.await().untilAsserted(() -> assertEquals(messagesCurrentlyInExecutor.get(), 0));
508+
testLogAppender.getEvents().forEach(logEvent -> {
509+
if (logEvent.getLevel() == Level.ERROR) {
510+
org.apache.logging.log4j.message.Message logEventMessage = logEvent.getMessage();
511+
fail("No error should be logged when closing a consumer. Got: " + logEventMessage
512+
.getFormattedMessage() + " throwable:" + logEventMessage.getThrowable());
513+
}
514+
});
515+
}
434516
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.utils;
20+
21+
import java.util.ArrayList;
22+
import java.util.Collections;
23+
import java.util.List;
24+
import java.util.Optional;
25+
import java.util.concurrent.TimeUnit;
26+
import java.util.concurrent.atomic.AtomicInteger;
27+
import org.apache.logging.log4j.Level;
28+
import org.apache.logging.log4j.LogManager;
29+
import org.apache.logging.log4j.core.LogEvent;
30+
import org.apache.logging.log4j.core.LoggerContext;
31+
import org.apache.logging.log4j.core.appender.AbstractAppender;
32+
import org.apache.logging.log4j.core.config.Configuration;
33+
import org.apache.logging.log4j.core.config.LoggerConfig;
34+
import org.apache.logging.log4j.core.layout.PatternLayout;
35+
36+
/**
37+
* Log4J appender that captures all log events for a specified logger.
38+
*/
39+
public class TestLogAppender extends AbstractAppender implements AutoCloseable {
40+
private final List<LogEvent> events = Collections.synchronizedList(new ArrayList<>());
41+
private static AtomicInteger idGenerator = new AtomicInteger(0);
42+
private final LoggerConfig loggerConfig;
43+
private final Runnable onConfigurationChange;
44+
45+
/**
46+
* Create a new TestLogAppender. Use the {@link #close()} method to stop it and unregister it from Log4J.
47+
* @param loggerName the logger name to register to. Pass Optional.empty() to register to the root logger.
48+
* @return return the new TestLogAppender instance.
49+
*/
50+
public static TestLogAppender create(Optional<String> loggerName) {
51+
LoggerContext context = (LoggerContext) LogManager.getContext(false);
52+
Configuration config = context.getConfiguration();
53+
LoggerConfig loggerConfig = loggerName.map(config::getLoggerConfig).orElseGet(config::getRootLogger);
54+
TestLogAppender testAppender = new TestLogAppender(loggerConfig, context::updateLoggers);
55+
testAppender.start();
56+
loggerConfig.addAppender(testAppender, Level.ALL, null);
57+
context.updateLoggers();
58+
return testAppender;
59+
}
60+
61+
TestLogAppender(LoggerConfig loggerConfig, Runnable onConfigurationChange) {
62+
super("TestAppender" + idGenerator.incrementAndGet(), null, PatternLayout.createDefaultLayout(), false, null);
63+
this.loggerConfig = loggerConfig;
64+
this.onConfigurationChange = onConfigurationChange;
65+
}
66+
67+
@Override
68+
public void append(LogEvent event) {
69+
events.add(event.toImmutable());
70+
}
71+
72+
public List<LogEvent> getEvents() {
73+
return new ArrayList<>(events);
74+
}
75+
76+
public void clearEvents() {
77+
events.clear();
78+
}
79+
80+
@Override
81+
public void close() throws Exception {
82+
stop(1, TimeUnit.SECONDS);
83+
}
84+
85+
@Override
86+
protected boolean stop(long timeout, TimeUnit timeUnit, boolean changeLifeCycleState) {
87+
boolean stopped = super.stop(timeout, timeUnit, changeLifeCycleState);
88+
loggerConfig.removeAppender(getName());
89+
onConfigurationChange.run();
90+
return stopped;
91+
}
92+
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T
9090
protected final MessageListenerExecutor messageListenerExecutor;
9191
protected final ExecutorService externalPinnedExecutor;
9292
protected final ExecutorService internalPinnedExecutor;
93-
protected UnAckedMessageTracker unAckedMessageTracker;
93+
protected final UnAckedMessageTracker unAckedMessageTracker;
9494
final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
9595
protected Map<MessageIdAdv, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap = new ConcurrentHashMap<>();
9696
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
@@ -1176,12 +1176,36 @@ private void executeKeySharedMessageListener(Message<?> message, Runnable runnab
11761176

11771177
protected void callMessageListener(Message<T> msg) {
11781178
try {
1179+
State state = getState();
1180+
if (state == State.Closing || state == State.Closed) {
1181+
if (log.isDebugEnabled()) {
1182+
log.debug("[{}][{}] Consumer has been closed. Skipping message {}.", topic, subscription,
1183+
msg.getMessageId());
1184+
}
1185+
msg.release();
1186+
return;
1187+
}
1188+
11791189
if (log.isDebugEnabled()) {
11801190
log.debug("[{}][{}] Calling message listener for message {}", topic, subscription,
11811191
msg.getMessageId());
11821192
}
11831193
ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl)
11841194
? ((TopicMessageImpl<T>) msg).receivedByconsumer : (ConsumerImpl) this;
1195+
1196+
// check the internal consumer state
1197+
if (receivedConsumer != this) {
1198+
State receivedByConsumerState = receivedConsumer.getState();
1199+
if (receivedByConsumerState == State.Closing || receivedByConsumerState == State.Closed) {
1200+
if (log.isDebugEnabled()) {
1201+
log.debug("[{}][{}] Consumer that received the message has been closed. Skipping message {}.",
1202+
topic, subscription, msg.getMessageId());
1203+
}
1204+
msg.release();
1205+
return;
1206+
}
1207+
}
1208+
11851209
// Increase the permits here since we will not increase permits while receive messages from consumer
11861210
// after enabled message listener.
11871211
receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg instanceof TopicMessageImpl

pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,6 @@ public CompletableFuture<Void> closeAsync() {
683683
private void cleanupMultiConsumer() {
684684
if (unAckedMessageTracker != null) {
685685
unAckedMessageTracker.close();
686-
unAckedMessageTracker = null;
687686
}
688687
if (partitionsAutoUpdateTimeout != null) {
689688
partitionsAutoUpdateTimeout.cancel();

0 commit comments

Comments
 (0)