Skip to content

Commit 736f027

Browse files
authored
[ISSUE #9870] Ensure metadata provider cache executors are shutdown correctly (#9871)
* [ISSUE #9870] Ensure metadata provider cache executors are shutdown correctly * Improve test comments
1 parent 5892de6 commit 736f027

File tree

4 files changed

+112
-2
lines changed

4 files changed

+112
-2
lines changed

auth/src/main/java/org/apache/rocketmq/auth/authentication/provider/LocalAuthenticationMetadataProvider.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,16 @@ public class LocalAuthenticationMetadataProvider implements AuthenticationMetada
4646

4747
private LoadingCache<String, User> userCache;
4848

49+
protected ThreadPoolExecutor cacheRefreshExecutor;
50+
4951
@Override
5052
public void initialize(AuthConfig authConfig, Supplier<?> metadataService) {
5153
this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "users", false);
5254
if (!this.storage.start()) {
5355
throw new RuntimeException("Failed to load rocksdb for auth_user, please check whether it is occupied");
5456
}
5557

56-
ThreadPoolExecutor cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
58+
this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
5759
1,
5860
1,
5961
1000 * 60,
@@ -144,6 +146,9 @@ public void shutdown() {
144146
if (this.storage != null) {
145147
this.storage.shutdown();
146148
}
149+
if (this.cacheRefreshExecutor != null) {
150+
this.cacheRefreshExecutor.shutdown();
151+
}
147152
}
148153

149154
private static class UserCacheLoader implements CacheLoader<String, User> {

auth/src/main/java/org/apache/rocketmq/auth/authorization/provider/LocalAuthorizationMetadataProvider.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,13 +51,15 @@ public class LocalAuthorizationMetadataProvider implements AuthorizationMetadata
5151

5252
private LoadingCache<String, Acl> aclCache;
5353

54+
protected ThreadPoolExecutor cacheRefreshExecutor;
55+
5456
@Override
5557
public void initialize(AuthConfig authConfig, Supplier<?> metadataService) {
5658
this.storage = ConfigRocksDBStorage.getStore(authConfig.getAuthConfigPath() + File.separator + "acls", false);
5759
if (!this.storage.start()) {
5860
throw new RuntimeException("Failed to load rocksdb for auth_acl, please check whether it is occupied.");
5961
}
60-
ThreadPoolExecutor cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
62+
this.cacheRefreshExecutor = ThreadPoolMonitor.createAndMonitor(
6163
1,
6264
1,
6365
1000 * 60,
@@ -172,6 +174,9 @@ public void shutdown() {
172174
if (this.storage != null) {
173175
this.storage.shutdown();
174176
}
177+
if (this.cacheRefreshExecutor != null) {
178+
this.cacheRefreshExecutor.shutdown();
179+
}
175180
}
176181

177182
private static class AclCacheLoader implements CacheLoader<String, Acl> {
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.auth.authentication.provider;
18+
19+
import org.apache.rocketmq.auth.config.AuthConfig;
20+
import org.apache.rocketmq.auth.helper.AuthTestHelper;
21+
import org.junit.Assert;
22+
import org.junit.Rule;
23+
import org.junit.Test;
24+
import org.junit.rules.TemporaryFolder;
25+
26+
public class LocalAuthenticationMetadataProviderTest {
27+
28+
@Rule
29+
public TemporaryFolder tempFolder = new TemporaryFolder();
30+
31+
@Test
32+
public void testShutdownReleasesCacheExecutor() throws Exception {
33+
AuthConfig authConfig = AuthTestHelper.createDefaultConfig();
34+
authConfig.setAuthConfigPath(tempFolder.newFolder("auth-test").getAbsolutePath());
35+
36+
LocalAuthenticationMetadataProvider provider = new LocalAuthenticationMetadataProvider();
37+
// Initialize provider to create the internal cache refresh executor
38+
provider.initialize(authConfig, () -> null);
39+
40+
// After initialization, the executor should exist and not be shutdown
41+
Assert.assertNotNull(provider.cacheRefreshExecutor);
42+
Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown());
43+
44+
// Shutdown provider should also shutdown its executor to release resources
45+
provider.shutdown();
46+
47+
// Verify that the cache refresh executor has been shutdown
48+
Assert.assertTrue(provider.cacheRefreshExecutor.isShutdown());
49+
}
50+
}
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.rocketmq.auth.authorization.provider;
18+
19+
import org.apache.rocketmq.auth.config.AuthConfig;
20+
import org.apache.rocketmq.auth.helper.AuthTestHelper;
21+
import org.junit.Assert;
22+
import org.junit.Rule;
23+
import org.junit.Test;
24+
import org.junit.rules.TemporaryFolder;
25+
26+
public class LocalAuthorizationMetadataProviderTest {
27+
28+
@Rule
29+
public TemporaryFolder tempFolder = new TemporaryFolder();
30+
31+
@Test
32+
public void testShutdownReleasesCacheExecutor() throws Exception {
33+
AuthConfig authConfig = AuthTestHelper.createDefaultConfig();
34+
authConfig.setAuthConfigPath(tempFolder.newFolder("auth-test").getAbsolutePath());
35+
36+
LocalAuthorizationMetadataProvider provider = new LocalAuthorizationMetadataProvider();
37+
// Initialize provider to create the internal cache refresh executor
38+
provider.initialize(authConfig, () -> null);
39+
40+
// After initialization, the executor should exist and not be shutdown
41+
Assert.assertNotNull(provider.cacheRefreshExecutor);
42+
Assert.assertFalse(provider.cacheRefreshExecutor.isShutdown());
43+
44+
// Shutdown provider should also shutdown its executor to release resources
45+
provider.shutdown();
46+
47+
// Verify that the cache refresh executor has been shutdown
48+
Assert.assertTrue(provider.cacheRefreshExecutor.isShutdown());
49+
}
50+
}

0 commit comments

Comments
 (0)