@@ -552,18 +552,18 @@ public void append(LogEvent event) {
552552
553553 // since prepareInitPoliciesCacheAsync() throw exception when initPolicesCache(),
554554 // would clean readerCache and policyCacheInitMap.
555- // sleep 500ms to make sure clean operation finish.
556- Thread .sleep (500 );
557555 Assert .assertTrue (prepareFuture .isCompletedExceptionally ());
558- future = spyService .getPoliciesCacheInit (NamespaceName .get (NAMESPACE5 ));
559- Assert .assertNull (future );
560- CompletableFuture <SystemTopicClient .Reader <PulsarEvent >> readerCompletableFuture1 =
561- spyReaderCaches .get (NamespaceName .get (NAMESPACE5 ));
562- Assert .assertNull (readerCompletableFuture1 );
556+ Awaitility .await ().untilAsserted (() -> {
557+ CompletableFuture <Void > future1 = spyService .getPoliciesCacheInit (NamespaceName .get (NAMESPACE5 ));
558+ Assert .assertNull (future1 );
559+ CompletableFuture <SystemTopicClient .Reader <PulsarEvent >> readerCompletableFuture1 =
560+ spyReaderCaches .get (NamespaceName .get (NAMESPACE5 ));
561+ Assert .assertNull (readerCompletableFuture1 );
562+ });
563563
564564
565565 // make sure not do cleanPoliciesCacheInitMap() twice
566- // totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanCacheAndCloseReader () is 2.
566+ // totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanPoliciesCacheInitMap () is 2.
567567 // in previous code, the time would be 3
568568 boolean logFound = logMessages .stream ()
569569 .anyMatch (msg -> msg .contains ("Failed to create reader on __change_events topic" ));
@@ -595,7 +595,6 @@ public void append(LogEvent event) {
595595 }
596596 };
597597 appender .start ();
598- logger .get ().addAppender (appender , null , null );
599598 logger .addAppender (appender );
600599
601600 // create namespace-5 and topic
@@ -633,18 +632,18 @@ public void append(LogEvent event) {
633632
634633 // since prepareInitPoliciesCacheAsync() throw exception when createReader,
635634 // would clean readerCache and policyCacheInitMap.
636- // sleep 500ms to make sure clean operation finish.
637- Thread .sleep (500 );
638635 Assert .assertTrue (prepareFuture .isCompletedExceptionally ());
639- future = spyService .getPoliciesCacheInit (NamespaceName .get (NAMESPACE5 ));
640- Assert .assertNull (future );
641- CompletableFuture <SystemTopicClient .Reader <PulsarEvent >> readerCompletableFuture1 =
642- spyReaderCaches .get (NamespaceName .get (NAMESPACE5 ));
643- Assert .assertNull (readerCompletableFuture1 );
636+ Awaitility .await ().untilAsserted (() -> {
637+ CompletableFuture <Void > future1 = spyService .getPoliciesCacheInit (NamespaceName .get (NAMESPACE5 ));
638+ Assert .assertNull (future1 );
639+ CompletableFuture <SystemTopicClient .Reader <PulsarEvent >> readerCompletableFuture1 =
640+ spyReaderCaches .get (NamespaceName .get (NAMESPACE5 ));
641+ Assert .assertNull (readerCompletableFuture1 );
642+ });
644643
645644
646645 // make sure not do cleanPoliciesCacheInitMap() twice
647- // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanCacheAndCloseReader () is 1.
646+ // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap () is 1.
648647 boolean logFound = logMessages .stream ()
649648 .anyMatch (msg -> msg .contains ("Failed to create reader on __change_events topic" ));
650649 assertTrue (logFound );
0 commit comments