Skip to content

Conversation

@TakaHiR07
Copy link
Contributor

@TakaHiR07 TakaHiR07 commented Nov 14, 2025

Fixes #24977

Motivation

As shown in the issue, fix two problem: 1. cleanCacheAndCloseReader() executed twice cause concurrent error, which result in too many orphan reader remain in SystemTopicBasedTopicPoliciesService 2. double update in policyCacheInitMap cause recursive update error

Modifications

  1. do cleanPoliciesCacheInitMap only once when throw exception
  2. avoid double update in policyCacheInitMap. use putIfAbsent instead of computeIfAbsent. It is not appropriate to add so many operation into compute().
  3. add two test, to simulate if throw exception in createReader, initPolicyCache, readMorePolicy of prepareInitPoliciesCacheAsync. By the way, it seems lack of unittest in SystemTopicBasedTopicPoliciesService.
  4. "newReader()" remove some logic, it is confused when readCompletableFuture throw exception.
  5. not remove cleanPoliciesCacheInitMap() in initPolicesCache() when closed.get()==true, since broker is closed, clean twice is ok.

There is one point should be consider in this pr

  1. When use putIfAbsent, if too many getTopicPolicy() trigger prepareInitPoliciesCacheAsync, it would generate many empty completableFuture. Further more, we can use double check in the code to avoid this object gc.(the code would be ugly).

Besides, this case still exist: if failed to close reader in cleanCacheAndCloseReader(), this closing reader maybe have chance to reconnect and become orphan reader. This is not this pr's work.

Verifying this change

  • Make sure that the change passes the CI checks.

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for the great work @TakaHiR07

@codecov-commenter
Copy link

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 74.26%. Comparing base (6fdb4b9) to head (09e9b0b).
⚠️ Report is 1 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24980      +/-   ##
============================================
- Coverage     74.29%   74.26%   -0.03%     
+ Complexity    34026    34025       -1     
============================================
  Files          1920     1920              
  Lines        150252   150252              
  Branches      17428    17428              
============================================
- Hits         111634   111592      -42     
- Misses        29706    29749      +43     
+ Partials       8912     8911       -1     
Flag Coverage Δ
inttests 26.29% <75.00%> (-0.26%) ⬇️
systests 22.89% <67.85%> (+0.01%) ⬆️
unittests 73.79% <100.00%> (-0.03%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
.../service/SystemTopicBasedTopicPoliciesService.java 78.62% <100.00%> (+0.95%) ⬆️

... and 78 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR fixes issues in SystemTopicBasedTopicPoliciesService.prepareInitPoliciesCacheAsync related to duplicate cleanup execution and recursive update errors when exceptions occur during policy cache initialization.

Key Changes:

  • Replaced computeIfAbsent with putIfAbsent to avoid recursive update errors when modifying the map during computation
  • Consolidated exception handling to ensure cleanPoliciesCacheInitMap is called only once per exception
  • Removed redundant cleanup calls from initPolicesCache method to prevent double cleanup
  • Simplified reader creation logic in newReader by removing special exception handling that is now redundant

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.

File Description
SystemTopicBasedTopicPoliciesService.java Refactored prepareInitPoliciesCacheAsync to use putIfAbsent instead of computeIfAbsent, consolidated exception handling to prevent double cleanup, simplified newReader logic, and changed cleanPoliciesCacheInitMap visibility for testing
SystemTopicBasedTopicPoliciesServiceTest.java Added two comprehensive test cases to verify correct cleanup behavior when exceptions occur during reader creation and policy cache initialization

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 565 to 567
// make sure not do cleanPoliciesCacheInitMap() twice
// totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanCacheAndCloseReader() is 2.
// in previous code, the time would be 3
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment on line 566 says "totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanCacheAndCloseReader() is 2", but the code actually verifies cleanPoliciesCacheInitMap is called 2 times on line 574. The comment should refer to cleanPoliciesCacheInitMap instead of cleanCacheAndCloseReader for clarity.

Suggested change
// make sure not do cleanPoliciesCacheInitMap() twice
// totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanCacheAndCloseReader() is 2.
// in previous code, the time would be 3
// make sure not to call cleanPoliciesCacheInitMap() more than twice
// totally trigger prepareInitPoliciesCacheAsync() twice, so the number of times cleanPoliciesCacheInitMap() is called is 2.
// in previous code, the number would be 3

Copilot uses AI. Check for mistakes.


// make sure not do cleanPoliciesCacheInitMap() twice
// totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanCacheAndCloseReader() is 1.
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment on line 647 says "totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanCacheAndCloseReader() is 1", but the code actually verifies cleanPoliciesCacheInitMap is called 1 time on line 655. The comment should refer to cleanPoliciesCacheInitMap instead of cleanCacheAndCloseReader for clarity.

Suggested change
// totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanCacheAndCloseReader() is 1.
// totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1.

Copilot uses AI. Check for mistakes.
}
};
appender.start();
logger.get().addAppender(appender, null, null);
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 598 calls logger.get() which is unnecessary. The logger variable is already of type Logger, so calling addAppender(appender) directly on line 599 is sufficient. The call on line 598 should be removed as it's redundant and Logger.get() may not be the intended API.

Suggested change
logger.get().addAppender(appender, null, null);

Copilot uses AI. Check for mistakes.
Comment on lines 555 to 562
// sleep 500ms to make sure clean operation finish.
Thread.sleep(500);
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNull(future);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
Assert.assertNull(readerCompletableFuture1);
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test uses Thread.sleep(500) to wait for cleanup operations to finish, which makes the test non-deterministic and slower than necessary. Consider using Awaitility.await() with appropriate conditions (like checking if the future is completed exceptionally or if caches are cleared) to make the test more reliable and potentially faster.

Suggested change
// sleep 500ms to make sure clean operation finish.
Thread.sleep(500);
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNull(future);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
Assert.assertNull(readerCompletableFuture1);
// Await until cleanup operation finishes.
Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
CompletableFuture<Boolean> future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNull(future);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
Assert.assertNull(readerCompletableFuture1);
});

Copilot uses AI. Check for mistakes.
Comment on lines 636 to 643
// sleep 500ms to make sure clean operation finish.
Thread.sleep(500);
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNull(future);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
Assert.assertNull(readerCompletableFuture1);
Copy link

Copilot AI Nov 14, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test uses Thread.sleep(500) to wait for cleanup operations to finish, which makes the test non-deterministic and slower than necessary. Consider using Awaitility.await() with appropriate conditions (like checking if the future is completed exceptionally or if caches are cleared) to make the test more reliable and potentially faster.

Suggested change
// sleep 500ms to make sure clean operation finish.
Thread.sleep(500);
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNull(future);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
Assert.assertNull(readerCompletableFuture1);
// Await cleanup operation to finish.
Awaitility.await().untilAsserted(() -> {
Assert.assertTrue(prepareFuture.isCompletedExceptionally());
CompletableFuture<Boolean> futureCheck = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5));
Assert.assertNull(futureCheck);
CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 =
spyReaderCaches.get(NamespaceName.get(NAMESPACE5));
Assert.assertNull(readerCompletableFuture1);
});

Copilot uses AI. Check for mistakes.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] [broker] Concurrent error in SystemTopicBasedTopicPoliciesService#prepareInitPoliciesCacheAsync

3 participants