From ad9841dfe7d929d45411d07e9cbf9356637b4779 Mon Sep 17 00:00:00 2001 From: ruanwenjun Date: Thu, 7 May 2026 22:58:04 +0800 Subject: [PATCH 1/2] [Fix-18222][JdbcRegistry] Reuse a singleton scheduler executor in JdbcRegistryThreadFactory JdbcRegistryThreadFactory#getDefaultSchedulerThreadExecutor returned a freshly created ScheduledExecutorService on every call, so the four scheduleWithFixedDelay sites in JdbcRegistryServer#start and JdbcRegistryDataManager#start each ran on their own pool, while the shutdown/shutdownNow calls in JdbcRegistry#close and JdbcRegistryServer#close closed yet another empty pool instead of the running ones. Daemon threads kept this from leaking on JVM exit, but the logic was inconsistent and the process held 4x the configured scheduled threads. Cache the executor as a lazy singleton (double-checked locking with volatile) so all callers share one pool and the close calls land on the actually-running pool. --- .../registry/jdbc/JdbcRegistryThreadFactory.java | 16 +++++++++++++--- .../jdbc/JdbcRegistryThreadFactoryTest.java | 7 +++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactory.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactory.java index 7296955d5213..6ab0907e376a 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactory.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactory.java @@ -26,10 +26,20 @@ @NoArgsConstructor(access = lombok.AccessLevel.PRIVATE) public class JdbcRegistryThreadFactory { + private static volatile ScheduledExecutorService defaultSchedulerThreadExecutor; + public static ScheduledExecutorService getDefaultSchedulerThreadExecutor() { - final String threadNameFormat = "ds-jdbc-registry-default-scheduler-thread-%d"; - final int threadSize = Runtime.getRuntime().availableProcessors(); - return ThreadUtils.newDaemonScheduledExecutorService(threadNameFormat, threadSize); + if (defaultSchedulerThreadExecutor == null) { + synchronized (JdbcRegistryThreadFactory.class) { + if (defaultSchedulerThreadExecutor == null) { + final String threadNameFormat = "ds-jdbc-registry-default-scheduler-thread-%d"; + final int threadSize = Runtime.getRuntime().availableProcessors(); + defaultSchedulerThreadExecutor = + ThreadUtils.newDaemonScheduledExecutorService(threadNameFormat, threadSize); + } + } + } + return defaultSchedulerThreadExecutor; } } diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactoryTest.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactoryTest.java index cd5e08594993..782a86856ba7 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactoryTest.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactoryTest.java @@ -40,4 +40,11 @@ void getDefaultSchedulerThreadExecutor() { .atMost(10, TimeUnit.SECONDS) .untilAsserted(() -> assertThat(atomicInteger.get()).isEqualTo(100)); } + + @Test + void getDefaultSchedulerThreadExecutor_returnsSameInstance() { + ScheduledExecutorService first = JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor(); + ScheduledExecutorService second = JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor(); + assertThat(first).isSameInstanceAs(second); + } } From 4d7c498340e450dd1bbc1b9b462735e18f78faf3 Mon Sep 17 00:00:00 2001 From: ruanwenjun Date: Sat, 9 May 2026 22:51:51 +0800 Subject: [PATCH 2/2] [Fix-18222][JdbcRegistry] Bind scheduler executor to JdbcRegistryServer instance MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The JdbcRegistryThreadFactory class held a JVM-global singleton scheduled executor. Multiple JdbcRegistryServer instances (tests, embedded StandaloneServer) shared the same pool, blurring lifecycle ownership and forcing JdbcRegistry#close() to shutdownNow() on a pool that other instances might still be using. Move the executor onto JdbcRegistryServer as an instance field, inject it into JdbcRegistryDataManager via the constructor, and shut it down in JdbcRegistryServer#close(). The state-guard at the top of the periodic tasks short-circuits any task that observes STOPPED, so a graceful shutdown() — combined with daemon threads — is enough to stop the JVM stalling. Delete the now-unused JdbcRegistryThreadFactory class and its test; the test only covered JDK ScheduledExecutorService basics and the static singleton identity that no longer exists. --- .../plugin/registry/jdbc/JdbcRegistry.java | 1 - .../jdbc/JdbcRegistryThreadFactory.java | 45 ----------------- .../jdbc/server/JdbcRegistryDataManager.java | 12 +++-- .../jdbc/server/JdbcRegistryServer.java | 16 ++++-- .../jdbc/JdbcRegistryThreadFactoryTest.java | 50 ------------------- 5 files changed, 19 insertions(+), 105 deletions(-) delete mode 100644 dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactory.java delete mode 100644 dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactoryTest.java diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java index f9614d3c388d..016af846bd27 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java @@ -217,7 +217,6 @@ public boolean releaseLock(String key) { public void close() { log.info("Closing JdbcRegistry..."); // remove the current Ephemeral node, if can connect to jdbc - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow(); try (final JdbcRegistryClient closed1 = jdbcRegistryClient) { // ignore } catch (Exception e) { diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactory.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactory.java deleted file mode 100644 index 6ab0907e376a..000000000000 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactory.java +++ /dev/null @@ -1,45 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.registry.jdbc; - -import org.apache.dolphinscheduler.common.thread.ThreadUtils; - -import java.util.concurrent.ScheduledExecutorService; - -import lombok.NoArgsConstructor; - -@NoArgsConstructor(access = lombok.AccessLevel.PRIVATE) -public class JdbcRegistryThreadFactory { - - private static volatile ScheduledExecutorService defaultSchedulerThreadExecutor; - - public static ScheduledExecutorService getDefaultSchedulerThreadExecutor() { - if (defaultSchedulerThreadExecutor == null) { - synchronized (JdbcRegistryThreadFactory.class) { - if (defaultSchedulerThreadExecutor == null) { - final String threadNameFormat = "ds-jdbc-registry-default-scheduler-thread-%d"; - final int threadSize = Runtime.getRuntime().availableProcessors(); - defaultSchedulerThreadExecutor = - ThreadUtils.newDaemonScheduledExecutorService(threadNameFormat, threadSize); - } - } - } - return defaultSchedulerThreadExecutor; - } - -} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java index 08eb0531760b..b49ebc6b2f2a 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties; -import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryThreadFactory; import org.apache.dolphinscheduler.plugin.registry.jdbc.KeyUtils; import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataChangeEventDTO; @@ -36,6 +35,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -61,6 +61,8 @@ public class JdbcRegistryDataManager private final TransactionTemplate jdbcRegistryTransactionTemplate; + private final ScheduledExecutorService schedulerThreadExecutor; + private final List> registryRowChangeListeners; private long lastDetectedJdbcRegistryDataChangeEventId = -1; @@ -68,11 +70,13 @@ public class JdbcRegistryDataManager public JdbcRegistryDataManager(JdbcRegistryProperties registryProperties, JdbcRegistryDataRepository jdbcRegistryDataRepository, JdbcRegistryDataChangeEventRepository jdbcRegistryDataChangeEventRepository, - TransactionTemplate jdbcRegistryTransactionTemplate) { + TransactionTemplate jdbcRegistryTransactionTemplate, + ScheduledExecutorService schedulerThreadExecutor) { this.registryProperties = registryProperties; this.jdbcRegistryDataChangeEventRepository = jdbcRegistryDataChangeEventRepository; this.jdbcRegistryDataRepository = jdbcRegistryDataRepository; this.jdbcRegistryTransactionTemplate = jdbcRegistryTransactionTemplate; + this.schedulerThreadExecutor = schedulerThreadExecutor; this.registryRowChangeListeners = new CopyOnWriteArrayList<>(); } @@ -80,13 +84,13 @@ public JdbcRegistryDataManager(JdbcRegistryProperties registryProperties, public void start() { this.lastDetectedJdbcRegistryDataChangeEventId = jdbcRegistryDataChangeEventRepository.getMaxJdbcRegistryDataChangeEventId(); - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( + schedulerThreadExecutor.scheduleWithFixedDelay( this::detectJdbcRegistryDataChangeEvent, registryProperties.getHeartbeatRefreshInterval().toMillis(), registryProperties.getHeartbeatRefreshInterval().toMillis(), TimeUnit.MILLISECONDS); - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( + schedulerThreadExecutor.scheduleWithFixedDelay( this::purgeHistoryJdbcRegistryDataChangeEvent, 0, Duration.ofHours(keepJdbcRegistryDataChangeEventHours).toHours(), diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java index e3070bdec81b..3e8b11c81351 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java @@ -19,8 +19,8 @@ import static com.google.common.base.Preconditions.checkNotNull; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties; -import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryThreadFactory; import org.apache.dolphinscheduler.plugin.registry.jdbc.client.IJdbcRegistryClient; import org.apache.dolphinscheduler.plugin.registry.jdbc.client.JdbcRegistryClientIdentify; import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; @@ -43,6 +43,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -78,6 +79,8 @@ public class JdbcRegistryServer implements IJdbcRegistryServer { private final Map jdbcRegistryClientDTOMap = new ConcurrentHashMap<>(); + private final ScheduledExecutorService schedulerThreadExecutor; + private Long lastSuccessHeartbeat; public JdbcRegistryServer(JdbcRegistryDataRepository jdbcRegistryDataRepository, @@ -89,9 +92,12 @@ public JdbcRegistryServer(JdbcRegistryDataRepository jdbcRegistryDataRepository, this.jdbcRegistryLockRepository = checkNotNull(jdbcRegistryLockRepository); this.jdbcRegistryClientRepository = checkNotNull(jdbcRegistryClientRepository); this.jdbcRegistryProperties = checkNotNull(jdbcRegistryProperties); + this.schedulerThreadExecutor = ThreadUtils.newDaemonScheduledExecutorService( + "ds-jdbc-registry-default-scheduler-thread-%d", + Runtime.getRuntime().availableProcessors()); this.jdbcRegistryDataManager = new JdbcRegistryDataManager( jdbcRegistryProperties, jdbcRegistryDataRepository, jdbcRegistryDataChangeEventRepository, - transactionTemplate); + transactionTemplate, schedulerThreadExecutor); this.jdbcRegistryLockManager = new JdbcRegistryLockManager( jdbcRegistryProperties, jdbcRegistryLockRepository); this.jdbcRegistryServerState = JdbcRegistryServerState.INIT; @@ -107,7 +113,7 @@ public void start() { // Start the Purge thread // The Purge thread will clear the invalidated data purgeInvalidJdbcRegistryMetadata(); - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( + schedulerThreadExecutor.scheduleWithFixedDelay( this::purgeInvalidJdbcRegistryMetadata, jdbcRegistryProperties.getSessionTimeout().toMillis(), jdbcRegistryProperties.getSessionTimeout().toMillis(), @@ -115,7 +121,7 @@ public void start() { jdbcRegistryDataManager.start(); jdbcRegistryServerState = JdbcRegistryServerState.STARTED; doTriggerOnConnectedListener(); - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( + schedulerThreadExecutor.scheduleWithFixedDelay( this::refreshClientsHeartbeat, 0, jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), @@ -250,7 +256,7 @@ public void releaseJdbcRegistryLock(Long clientId, String lockKey) { @Override public void close() { jdbcRegistryServerState = JdbcRegistryServerState.STOPPED; - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdown(); + schedulerThreadExecutor.shutdown(); List clientIds = jdbcRegistryClients.stream() .map(IJdbcRegistryClient::getJdbcRegistryClientIdentify) .map(JdbcRegistryClientIdentify::getClientId) diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactoryTest.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactoryTest.java deleted file mode 100644 index 782a86856ba7..000000000000 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactoryTest.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.registry.jdbc; - -import static com.google.common.truth.Truth.assertThat; -import static org.awaitility.Awaitility.await; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.jupiter.api.Test; - -class JdbcRegistryThreadFactoryTest { - - @Test - void getDefaultSchedulerThreadExecutor() { - ScheduledExecutorService schedulerThreadExecutor = - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor(); - AtomicInteger atomicInteger = new AtomicInteger(0); - for (int i = 0; i < 100; i++) { - schedulerThreadExecutor.scheduleWithFixedDelay(atomicInteger::incrementAndGet, 0, 1, TimeUnit.SECONDS); - } - await() - .atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(atomicInteger.get()).isEqualTo(100)); - } - - @Test - void getDefaultSchedulerThreadExecutor_returnsSameInstance() { - ScheduledExecutorService first = JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor(); - ScheduledExecutorService second = JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor(); - assertThat(first).isSameInstanceAs(second); - } -}