Skip to content

Commit 09e9b0b

Browse files
author
fanjianye
committed
fix prepareInitPoliciesCacheAsync() in SystemTopicBasedTopicPoliciesService
1 parent 6fdb4b9 commit 09e9b0b

File tree

2 files changed

+233
-37
lines changed

2 files changed

+233
-37
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java

Lines changed: 40 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -570,42 +570,53 @@ public void addOwnedNamespaceBundleAsync(NamespaceBundle namespaceBundle) {
570570
return CompletableFuture.completedFuture(false);
571571
}
572572
return pulsarService.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace)
573-
.thenCompose(namespacePolicies -> {
574-
if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) {
575-
log.info("[{}] skip prepare init policies cache since the namespace is deleted",
576-
namespace);
577-
return CompletableFuture.completedFuture(false);
578-
}
573+
.thenCompose(namespacePolicies -> {
574+
if (namespacePolicies.isEmpty() || namespacePolicies.get().deleted) {
575+
log.info("[{}] skip prepare init policies cache since the namespace is deleted",
576+
namespace);
577+
return CompletableFuture.completedFuture(false);
578+
}
579579

580-
return policyCacheInitMap.computeIfAbsent(namespace, (k) -> {
581-
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
582-
newReader(namespace);
583-
final CompletableFuture<Void> initFuture = readerCompletableFuture
584-
.thenCompose(reader -> {
585-
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
586-
initPolicesCache(reader, stageFuture);
587-
return stageFuture
588-
// Read policies in background
589-
.thenAccept(__ -> readMorePoliciesAsync(reader));
590-
});
591-
initFuture.exceptionallyAsync(ex -> {
580+
CompletableFuture<Void> initNamespacePolicyFuture = new CompletableFuture<>();
581+
CompletableFuture<Void> existingFuture =
582+
policyCacheInitMap.putIfAbsent(namespace, initNamespacePolicyFuture);
583+
if (existingFuture == null) {
584+
final CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
585+
newReader(namespace);
586+
readerCompletableFuture
587+
.thenCompose(reader -> {
588+
final CompletableFuture<Void> stageFuture = new CompletableFuture<>();
589+
initPolicesCache(reader, stageFuture);
590+
return stageFuture
591+
// Read policies in background
592+
.thenAccept(__ -> readMorePoliciesAsync(reader));
593+
}).thenApply(__ -> {
594+
initNamespacePolicyFuture.complete(null);
595+
return null;
596+
}).exceptionally(ex -> {
592597
try {
593-
if (closed.get()) {
594-
return null;
598+
if (readerCompletableFuture.isCompletedExceptionally()) {
599+
log.error("[{}] Failed to create reader on __change_events topic",
600+
namespace, ex);
601+
initNamespacePolicyFuture.completeExceptionally(ex);
602+
cleanPoliciesCacheInitMap(namespace, true);
603+
} else {
604+
initNamespacePolicyFuture.completeExceptionally(ex);
605+
cleanPoliciesCacheInitMap(namespace, isAlreadyClosedException(ex));
595606
}
596-
cleanPoliciesCacheInitMap(
597-
namespace, readerCompletableFuture.isCompletedExceptionally());
598607
} catch (Throwable cleanupEx) {
599608
// Adding this catch to avoid break callback chain
600609
log.error("[{}] Failed to cleanup reader on __change_events topic",
601610
namespace, cleanupEx);
602611
}
603612
return null;
604-
}, pulsarService.getExecutor());
605-
// let caller know we've got an exception.
606-
return initFuture;
607-
}).thenApply(__ -> true);
608-
});
613+
});
614+
615+
return initNamespacePolicyFuture.thenApply(__ -> true);
616+
} else {
617+
return existingFuture.thenApply(__ -> true);
618+
}
619+
});
609620
}
610621

611622
private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> newReader(NamespaceName ns) {
@@ -614,10 +625,6 @@ private CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> newReader(Names
614625
return createSystemTopicClient(ns);
615626
}
616627

617-
if (existingFuture.isDone() && existingFuture.isCompletedExceptionally()) {
618-
return existingFuture.exceptionallyCompose(ex ->
619-
isAlreadyClosedException(ex) ? existingFuture : createSystemTopicClient(ns));
620-
}
621628
return existingFuture;
622629
});
623630
}
@@ -687,8 +694,6 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
687694
log.error("[{}] Failed to check the move events for the system topic",
688695
reader.getSystemTopic().getTopicName(), ex);
689696
future.completeExceptionally(ex);
690-
cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(),
691-
isAlreadyClosedException(ex));
692697
return;
693698
}
694699
if (hasMore) {
@@ -707,8 +712,6 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
707712
log.error("[{}] Failed to read event from the system topic.",
708713
reader.getSystemTopic().getTopicName(), e);
709714
future.completeExceptionally(e);
710-
cleanPoliciesCacheInitMap(reader.getSystemTopic().getTopicName().getNamespaceObject(),
711-
isAlreadyClosedException(ex));
712715
return null;
713716
});
714717
} else {
@@ -734,8 +737,8 @@ private void initPolicesCache(SystemTopicClient.Reader<PulsarEvent> reader, Comp
734737
});
735738
}
736739

737-
738-
private void cleanPoliciesCacheInitMap(@NonNull NamespaceName namespace, boolean closeReader) {
740+
@VisibleForTesting
741+
void cleanPoliciesCacheInitMap(@NonNull NamespaceName namespace, boolean closeReader) {
739742
if (!closeReader) {
740743
policyCacheInitMap.remove(namespace);
741744
return;

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesServiceTest.java

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,17 @@
1818
*/
1919
package org.apache.pulsar.broker.service;
2020

21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.ArgumentMatchers.anyBoolean;
2123
import static org.mockito.Mockito.spy;
24+
import static org.mockito.Mockito.times;
25+
import static org.mockito.Mockito.verify;
2226
import static org.testng.AssertJUnit.assertEquals;
27+
import static org.testng.AssertJUnit.assertFalse;
2328
import static org.testng.AssertJUnit.assertNotNull;
2429
import static org.testng.AssertJUnit.assertNull;
30+
import static org.testng.AssertJUnit.assertTrue;
31+
import java.util.ArrayList;
2532
import java.util.HashSet;
2633
import java.util.List;
2734
import java.util.Map;
@@ -36,6 +43,10 @@
3643
import lombok.Cleanup;
3744
import lombok.extern.slf4j.Slf4j;
3845
import org.apache.commons.lang3.reflect.FieldUtils;
46+
import org.apache.logging.log4j.LogManager;
47+
import org.apache.logging.log4j.core.LogEvent;
48+
import org.apache.logging.log4j.core.Logger;
49+
import org.apache.logging.log4j.core.appender.AbstractAppender;
3950
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
4051
import org.apache.pulsar.broker.systopic.SystemTopicClient;
4152
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -465,4 +476,186 @@ public void testCreateNamespaceEventsSystemTopicFactoryException() throws Except
465476
Assert.assertNotNull(topicPolicies);
466477
Assert.assertEquals(topicPolicies.getMaxConsumerPerTopic(), 10);
467478
}
479+
480+
@Test
481+
public void testPrepareInitPoliciesCacheAsyncThrowExceptionAfterCreateReader() throws Exception {
482+
// catch the log output in SystemTopicBasedTopicPoliciesService
483+
Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
484+
List<String> logMessages = new ArrayList<>();
485+
AbstractAppender appender = new AbstractAppender("TestAppender", null, null) {
486+
@Override
487+
public void append(LogEvent event) {
488+
logMessages.add(event.getMessage().getFormattedMessage());
489+
}
490+
};
491+
appender.start();
492+
logger.addAppender(appender);
493+
494+
// create namespace-5 and topic
495+
SystemTopicBasedTopicPoliciesService spyService =
496+
Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
497+
FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
498+
499+
500+
admin.namespaces().createNamespace(NAMESPACE5);
501+
final String topic = "persistent://" + NAMESPACE5 + "/test" + UUID.randomUUID();
502+
admin.topics().createPartitionedTopic(topic, 1);
503+
504+
CompletableFuture<Void> future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
505+
Assert.assertNull(future);
506+
507+
// mock readerCache and new a reader, then put this reader in readerCache.
508+
// when new reader, would trigger __change_event topic of namespace-5 created
509+
// and would trigger prepareInitPoliciesCacheAsync()
510+
ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
511+
spyReaderCaches = new ConcurrentHashMap<>();
512+
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture =
513+
spyService.createSystemTopicClient(NamespaceName.get(NAMESPACE5));
514+
spyReaderCaches.put(NamespaceName.get(NAMESPACE5), readerCompletableFuture);
515+
FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
516+
517+
// set topic policy. create producer for __change_event topic
518+
admin.topicPolicies().setMaxConsumersPerSubscription(topic, 1);
519+
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
520+
Assert.assertNotNull(future);
521+
522+
// trigger close reader of __change_event directly, simulate that reader
523+
// is closed for some reason, such as topic unload or broker restart.
524+
// since prepareInitPoliciesCacheAsync() has been executed, it would go into readMorePoliciesAsync(),
525+
// throw exception, output "Closing the topic policies reader for" and do cleanPoliciesCacheInitMap()
526+
SystemTopicClient.Reader<PulsarEvent> reader = readerCompletableFuture.get();
527+
reader.close();
528+
log.info("successfully close spy reader");
529+
Awaitility.await().untilAsserted(() -> {
530+
boolean logFound = logMessages.stream()
531+
.anyMatch(msg -> msg.contains("Closing the topic policies reader for"));
532+
assertTrue(logFound);
533+
});
534+
535+
536+
// Since cleanPoliciesCacheInitMap() is executed, should add the failed reader into readerCache again.
537+
// Then in SystemTopicBasedTopicPoliciesService, readerCache has a closed reader,
538+
// and policyCacheInitMap do not contain a future.
539+
// To simulate the situation: when getTopicPolicy() execute, it will do prepareInitPoliciesCacheAsync() and
540+
// use a closed reader to read the __change_event topic. Then throw exception
541+
spyReaderCaches.put(NamespaceName.get(NAMESPACE5), readerCompletableFuture);
542+
FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
543+
544+
CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>();
545+
try {
546+
prepareFuture = spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
547+
prepareFuture.get();
548+
Assert.fail();
549+
} catch (Exception e) {
550+
// that is ok
551+
}
552+
553+
// since prepareInitPoliciesCacheAsync() throw exception when initPolicesCache(),
554+
// would clean readerCache and policyCacheInitMap.
555+
// sleep 500ms to make sure clean operation finish.
556+
Thread.sleep(500);
557+
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
558+
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
559+
Assert.assertNull(future);
560+
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
561+
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
562+
Assert.assertNull(readerCompletableFuture1);
563+
564+
565+
// make sure not do cleanPoliciesCacheInitMap() twice
566+
// totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanCacheAndCloseReader() is 2.
567+
// in previous code, the time would be 3
568+
boolean logFound = logMessages.stream()
569+
.anyMatch(msg -> msg.contains("Failed to create reader on __change_events topic"));
570+
assertFalse(logFound);
571+
boolean logFound2 = logMessages.stream()
572+
.anyMatch(msg -> msg.contains("Failed to check the move events for the system topic"));
573+
assertTrue(logFound2);
574+
verify(spyService, times(2)).cleanPoliciesCacheInitMap(any(), anyBoolean());
575+
576+
// make sure not occur Recursive update
577+
boolean logFound3 = logMessages.stream()
578+
.anyMatch(msg -> msg.contains("Recursive update"));
579+
assertFalse(logFound3);
580+
581+
// clean log appender
582+
appender.stop();
583+
logger.removeAppender(appender);
584+
}
585+
586+
@Test
587+
public void testPrepareInitPoliciesCacheAsyncThrowExceptionInCreateReader() throws Exception {
588+
// catch the log output in SystemTopicBasedTopicPoliciesService
589+
Logger logger = (Logger) LogManager.getLogger(SystemTopicBasedTopicPoliciesService.class);
590+
List<String> logMessages = new ArrayList<>();
591+
AbstractAppender appender = new AbstractAppender("TestAppender", null, null) {
592+
@Override
593+
public void append(LogEvent event) {
594+
logMessages.add(event.getMessage().getFormattedMessage());
595+
}
596+
};
597+
appender.start();
598+
logger.get().addAppender(appender, null, null);
599+
logger.addAppender(appender);
600+
601+
// create namespace-5 and topic
602+
SystemTopicBasedTopicPoliciesService spyService =
603+
Mockito.spy(new SystemTopicBasedTopicPoliciesService(pulsar));
604+
FieldUtils.writeField(pulsar, "topicPoliciesService", spyService, true);
605+
606+
607+
admin.namespaces().createNamespace(NAMESPACE5);
608+
final String topic = "persistent://" + NAMESPACE5 + "/test" + UUID.randomUUID();
609+
admin.topics().createPartitionedTopic(topic, 1);
610+
611+
CompletableFuture<Void> future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
612+
Assert.assertNull(future);
613+
614+
// mock readerCache and put a failed readerCreateFuture in readerCache.
615+
// simulate that when trigger prepareInitPoliciesCacheAsync(),
616+
// it would use this failed readerFuture and go into corresponding logic
617+
ConcurrentHashMap<NamespaceName, CompletableFuture<SystemTopicClient.Reader<PulsarEvent>>>
618+
spyReaderCaches = new ConcurrentHashMap<>();
619+
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture = new CompletableFuture<>();
620+
readerCompletableFuture.completeExceptionally(new Exception("create reader fail"));
621+
spyReaderCaches.put(NamespaceName.get(NAMESPACE5), readerCompletableFuture);
622+
FieldUtils.writeDeclaredField(spyService, "readerCaches", spyReaderCaches, true);
623+
624+
// trigger prepareInitPoliciesCacheAsync()
625+
CompletableFuture<Boolean> prepareFuture = new CompletableFuture<>();
626+
try {
627+
prepareFuture = spyService.prepareInitPoliciesCacheAsync(NamespaceName.get(NAMESPACE5));
628+
prepareFuture.get();
629+
Assert.fail();
630+
} catch (Exception e) {
631+
// that is ok
632+
}
633+
634+
// since prepareInitPoliciesCacheAsync() throw exception when createReader,
635+
// would clean readerCache and policyCacheInitMap.
636+
// sleep 500ms to make sure clean operation finish.
637+
Thread.sleep(500);
638+
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
639+
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
640+
Assert.assertNull(future);
641+
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
642+
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
643+
Assert.assertNull(readerCompletableFuture1);
644+
645+
646+
// make sure not do cleanPoliciesCacheInitMap() twice
647+
// totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanCacheAndCloseReader() is 1.
648+
boolean logFound = logMessages.stream()
649+
.anyMatch(msg -> msg.contains("Failed to create reader on __change_events topic"));
650+
assertTrue(logFound);
651+
boolean logFound2 = logMessages.stream()
652+
.anyMatch(msg -> msg.contains("Failed to check the move events for the system topic")
653+
|| msg.contains("Failed to read event from the system topic"));
654+
assertFalse(logFound2);
655+
verify(spyService, times(1)).cleanPoliciesCacheInitMap(any(), anyBoolean());
656+
657+
// clean log appender
658+
appender.stop();
659+
logger.removeAppender(appender);
660+
}
468661
}

0 commit comments

Comments
 (0)