-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][broker] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService #24980
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[fix][broker] fix prepareInitPoliciesCacheAsync in SystemTopicBasedTopicPoliciesService #24980
Conversation
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the great work @TakaHiR07
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## master #24980 +/- ##
============================================
- Coverage 74.29% 74.26% -0.03%
+ Complexity 34026 34025 -1
============================================
Files 1920 1920
Lines 150252 150252
Branches 17428 17428
============================================
- Hits 111634 111592 -42
- Misses 29706 29749 +43
+ Partials 8912 8911 -1
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR fixes issues in SystemTopicBasedTopicPoliciesService.prepareInitPoliciesCacheAsync related to duplicate cleanup execution and recursive update errors when exceptions occur during policy cache initialization.
Key Changes:
- Replaced
computeIfAbsentwithputIfAbsentto avoid recursive update errors when modifying the map during computation - Consolidated exception handling to ensure
cleanPoliciesCacheInitMapis called only once per exception - Removed redundant cleanup calls from
initPolicesCachemethod to prevent double cleanup - Simplified reader creation logic in
newReaderby removing special exception handling that is now redundant
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| SystemTopicBasedTopicPoliciesService.java | Refactored prepareInitPoliciesCacheAsync to use putIfAbsent instead of computeIfAbsent, consolidated exception handling to prevent double cleanup, simplified newReader logic, and changed cleanPoliciesCacheInitMap visibility for testing |
| SystemTopicBasedTopicPoliciesServiceTest.java | Added two comprehensive test cases to verify correct cleanup behavior when exceptions occur during reader creation and policy cache initialization |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // make sure not do cleanPoliciesCacheInitMap() twice | ||
| // totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanCacheAndCloseReader() is 2. | ||
| // in previous code, the time would be 3 |
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment on line 566 says "totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanCacheAndCloseReader() is 2", but the code actually verifies cleanPoliciesCacheInitMap is called 2 times on line 574. The comment should refer to cleanPoliciesCacheInitMap instead of cleanCacheAndCloseReader for clarity.
| // make sure not do cleanPoliciesCacheInitMap() twice | |
| // totally trigger prepareInitPoliciesCacheAsync() twice, so the time of cleanCacheAndCloseReader() is 2. | |
| // in previous code, the time would be 3 | |
| // make sure not to call cleanPoliciesCacheInitMap() more than twice | |
| // totally trigger prepareInitPoliciesCacheAsync() twice, so the number of times cleanPoliciesCacheInitMap() is called is 2. | |
| // in previous code, the number would be 3 |
|
|
||
|
|
||
| // make sure not do cleanPoliciesCacheInitMap() twice | ||
| // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanCacheAndCloseReader() is 1. |
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment on line 647 says "totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanCacheAndCloseReader() is 1", but the code actually verifies cleanPoliciesCacheInitMap is called 1 time on line 655. The comment should refer to cleanPoliciesCacheInitMap instead of cleanCacheAndCloseReader for clarity.
| // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanCacheAndCloseReader() is 1. | |
| // totally trigger prepareInitPoliciesCacheAsync() once, so the time of cleanPoliciesCacheInitMap() is 1. |
| } | ||
| }; | ||
| appender.start(); | ||
| logger.get().addAppender(appender, null, null); |
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Line 598 calls logger.get() which is unnecessary. The logger variable is already of type Logger, so calling addAppender(appender) directly on line 599 is sufficient. The call on line 598 should be removed as it's redundant and Logger.get() may not be the intended API.
| logger.get().addAppender(appender, null, null); |
| // sleep 500ms to make sure clean operation finish. | ||
| Thread.sleep(500); | ||
| Assert.assertTrue(prepareFuture.isCompletedExceptionally()); | ||
| future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); | ||
| Assert.assertNull(future); | ||
| CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 = | ||
| spyReaderCaches.get(NamespaceName.get(NAMESPACE5)); | ||
| Assert.assertNull(readerCompletableFuture1); |
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test uses Thread.sleep(500) to wait for cleanup operations to finish, which makes the test non-deterministic and slower than necessary. Consider using Awaitility.await() with appropriate conditions (like checking if the future is completed exceptionally or if caches are cleared) to make the test more reliable and potentially faster.
| // sleep 500ms to make sure clean operation finish. | |
| Thread.sleep(500); | |
| Assert.assertTrue(prepareFuture.isCompletedExceptionally()); | |
| future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); | |
| Assert.assertNull(future); | |
| CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 = | |
| spyReaderCaches.get(NamespaceName.get(NAMESPACE5)); | |
| Assert.assertNull(readerCompletableFuture1); | |
| // Await until cleanup operation finishes. | |
| Awaitility.await().untilAsserted(() -> { | |
| Assert.assertTrue(prepareFuture.isCompletedExceptionally()); | |
| CompletableFuture<Boolean> future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); | |
| Assert.assertNull(future); | |
| CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 = | |
| spyReaderCaches.get(NamespaceName.get(NAMESPACE5)); | |
| Assert.assertNull(readerCompletableFuture1); | |
| }); |
| // sleep 500ms to make sure clean operation finish. | ||
| Thread.sleep(500); | ||
| Assert.assertTrue(prepareFuture.isCompletedExceptionally()); | ||
| future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); | ||
| Assert.assertNull(future); | ||
| CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 = | ||
| spyReaderCaches.get(NamespaceName.get(NAMESPACE5)); | ||
| Assert.assertNull(readerCompletableFuture1); |
Copilot
AI
Nov 14, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The test uses Thread.sleep(500) to wait for cleanup operations to finish, which makes the test non-deterministic and slower than necessary. Consider using Awaitility.await() with appropriate conditions (like checking if the future is completed exceptionally or if caches are cleared) to make the test more reliable and potentially faster.
| // sleep 500ms to make sure clean operation finish. | |
| Thread.sleep(500); | |
| Assert.assertTrue(prepareFuture.isCompletedExceptionally()); | |
| future = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); | |
| Assert.assertNull(future); | |
| CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 = | |
| spyReaderCaches.get(NamespaceName.get(NAMESPACE5)); | |
| Assert.assertNull(readerCompletableFuture1); | |
| // Await cleanup operation to finish. | |
| Awaitility.await().untilAsserted(() -> { | |
| Assert.assertTrue(prepareFuture.isCompletedExceptionally()); | |
| CompletableFuture<Boolean> futureCheck = spyService.getPoliciesCacheInit(NamespaceName.get(NAMESPACE5)); | |
| Assert.assertNull(futureCheck); | |
| CompletableFuture<SystemTopicClient.Reader<PulsarEvent>> readerCompletableFuture1 = | |
| spyReaderCaches.get(NamespaceName.get(NAMESPACE5)); | |
| Assert.assertNull(readerCompletableFuture1); | |
| }); |
Fixes #24977
Motivation
As shown in the issue, fix two problem: 1. cleanCacheAndCloseReader() executed twice cause concurrent error, which result in too many orphan reader remain in SystemTopicBasedTopicPoliciesService 2. double update in policyCacheInitMap cause recursive update error
Modifications
There is one point should be consider in this pr
Besides, this case still exist: if failed to close reader in cleanCacheAndCloseReader(), this closing reader maybe have chance to reconnect and become orphan reader. This is not this pr's work.
Verifying this change
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-complete