diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java index f9614d3c388d..016af846bd27 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistry.java @@ -217,7 +217,6 @@ public boolean releaseLock(String key) { public void close() { log.info("Closing JdbcRegistry..."); // remove the current Ephemeral node, if can connect to jdbc - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdownNow(); try (final JdbcRegistryClient closed1 = jdbcRegistryClient) { // ignore } catch (Exception e) { diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactory.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactory.java deleted file mode 100644 index 7296955d5213..000000000000 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.registry.jdbc; - -import org.apache.dolphinscheduler.common.thread.ThreadUtils; - -import java.util.concurrent.ScheduledExecutorService; - -import lombok.NoArgsConstructor; - -@NoArgsConstructor(access = lombok.AccessLevel.PRIVATE) -public class JdbcRegistryThreadFactory { - - public static ScheduledExecutorService getDefaultSchedulerThreadExecutor() { - final String threadNameFormat = "ds-jdbc-registry-default-scheduler-thread-%d"; - final int threadSize = Runtime.getRuntime().availableProcessors(); - return ThreadUtils.newDaemonScheduledExecutorService(threadNameFormat, threadSize); - } - -} diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java index 08eb0531760b..b49ebc6b2f2a 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryDataManager.java @@ -20,7 +20,6 @@ import static com.google.common.base.Preconditions.checkNotNull; import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties; -import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryThreadFactory; import org.apache.dolphinscheduler.plugin.registry.jdbc.KeyUtils; import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.JdbcRegistryDataChangeEventDTO; @@ -36,6 +35,7 @@ import java.util.List; import java.util.Optional; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -61,6 +61,8 @@ public class JdbcRegistryDataManager private final TransactionTemplate jdbcRegistryTransactionTemplate; + private final ScheduledExecutorService schedulerThreadExecutor; + private final List> registryRowChangeListeners; private long lastDetectedJdbcRegistryDataChangeEventId = -1; @@ -68,11 +70,13 @@ public class JdbcRegistryDataManager public JdbcRegistryDataManager(JdbcRegistryProperties registryProperties, JdbcRegistryDataRepository jdbcRegistryDataRepository, JdbcRegistryDataChangeEventRepository jdbcRegistryDataChangeEventRepository, - TransactionTemplate jdbcRegistryTransactionTemplate) { + TransactionTemplate jdbcRegistryTransactionTemplate, + ScheduledExecutorService schedulerThreadExecutor) { this.registryProperties = registryProperties; this.jdbcRegistryDataChangeEventRepository = jdbcRegistryDataChangeEventRepository; this.jdbcRegistryDataRepository = jdbcRegistryDataRepository; this.jdbcRegistryTransactionTemplate = jdbcRegistryTransactionTemplate; + this.schedulerThreadExecutor = schedulerThreadExecutor; this.registryRowChangeListeners = new CopyOnWriteArrayList<>(); } @@ -80,13 +84,13 @@ public JdbcRegistryDataManager(JdbcRegistryProperties registryProperties, public void start() { this.lastDetectedJdbcRegistryDataChangeEventId = jdbcRegistryDataChangeEventRepository.getMaxJdbcRegistryDataChangeEventId(); - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( + schedulerThreadExecutor.scheduleWithFixedDelay( this::detectJdbcRegistryDataChangeEvent, registryProperties.getHeartbeatRefreshInterval().toMillis(), registryProperties.getHeartbeatRefreshInterval().toMillis(), TimeUnit.MILLISECONDS); - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( + schedulerThreadExecutor.scheduleWithFixedDelay( this::purgeHistoryJdbcRegistryDataChangeEvent, 0, Duration.ofHours(keepJdbcRegistryDataChangeEventHours).toHours(), diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java index e3070bdec81b..3e8b11c81351 100644 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java +++ b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryServer.java @@ -19,8 +19,8 @@ import static com.google.common.base.Preconditions.checkNotNull; +import org.apache.dolphinscheduler.common.thread.ThreadUtils; import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryProperties; -import org.apache.dolphinscheduler.plugin.registry.jdbc.JdbcRegistryThreadFactory; import org.apache.dolphinscheduler.plugin.registry.jdbc.client.IJdbcRegistryClient; import org.apache.dolphinscheduler.plugin.registry.jdbc.client.JdbcRegistryClientIdentify; import org.apache.dolphinscheduler.plugin.registry.jdbc.model.DTO.DataType; @@ -43,6 +43,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; @@ -78,6 +79,8 @@ public class JdbcRegistryServer implements IJdbcRegistryServer { private final Map jdbcRegistryClientDTOMap = new ConcurrentHashMap<>(); + private final ScheduledExecutorService schedulerThreadExecutor; + private Long lastSuccessHeartbeat; public JdbcRegistryServer(JdbcRegistryDataRepository jdbcRegistryDataRepository, @@ -89,9 +92,12 @@ public JdbcRegistryServer(JdbcRegistryDataRepository jdbcRegistryDataRepository, this.jdbcRegistryLockRepository = checkNotNull(jdbcRegistryLockRepository); this.jdbcRegistryClientRepository = checkNotNull(jdbcRegistryClientRepository); this.jdbcRegistryProperties = checkNotNull(jdbcRegistryProperties); + this.schedulerThreadExecutor = ThreadUtils.newDaemonScheduledExecutorService( + "ds-jdbc-registry-default-scheduler-thread-%d", + Runtime.getRuntime().availableProcessors()); this.jdbcRegistryDataManager = new JdbcRegistryDataManager( jdbcRegistryProperties, jdbcRegistryDataRepository, jdbcRegistryDataChangeEventRepository, - transactionTemplate); + transactionTemplate, schedulerThreadExecutor); this.jdbcRegistryLockManager = new JdbcRegistryLockManager( jdbcRegistryProperties, jdbcRegistryLockRepository); this.jdbcRegistryServerState = JdbcRegistryServerState.INIT; @@ -107,7 +113,7 @@ public void start() { // Start the Purge thread // The Purge thread will clear the invalidated data purgeInvalidJdbcRegistryMetadata(); - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( + schedulerThreadExecutor.scheduleWithFixedDelay( this::purgeInvalidJdbcRegistryMetadata, jdbcRegistryProperties.getSessionTimeout().toMillis(), jdbcRegistryProperties.getSessionTimeout().toMillis(), @@ -115,7 +121,7 @@ public void start() { jdbcRegistryDataManager.start(); jdbcRegistryServerState = JdbcRegistryServerState.STARTED; doTriggerOnConnectedListener(); - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().scheduleWithFixedDelay( + schedulerThreadExecutor.scheduleWithFixedDelay( this::refreshClientsHeartbeat, 0, jdbcRegistryProperties.getHeartbeatRefreshInterval().toMillis(), @@ -250,7 +256,7 @@ public void releaseJdbcRegistryLock(Long clientId, String lockKey) { @Override public void close() { jdbcRegistryServerState = JdbcRegistryServerState.STOPPED; - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor().shutdown(); + schedulerThreadExecutor.shutdown(); List clientIds = jdbcRegistryClients.stream() .map(IJdbcRegistryClient::getJdbcRegistryClientIdentify) .map(JdbcRegistryClientIdentify::getClientId) diff --git a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactoryTest.java b/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactoryTest.java deleted file mode 100644 index cd5e08594993..000000000000 --- a/dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/test/java/org/apache/dolphinscheduler/plugin/registry/jdbc/JdbcRegistryThreadFactoryTest.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.dolphinscheduler.plugin.registry.jdbc; - -import static com.google.common.truth.Truth.assertThat; -import static org.awaitility.Awaitility.await; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.jupiter.api.Test; - -class JdbcRegistryThreadFactoryTest { - - @Test - void getDefaultSchedulerThreadExecutor() { - ScheduledExecutorService schedulerThreadExecutor = - JdbcRegistryThreadFactory.getDefaultSchedulerThreadExecutor(); - AtomicInteger atomicInteger = new AtomicInteger(0); - for (int i = 0; i < 100; i++) { - schedulerThreadExecutor.scheduleWithFixedDelay(atomicInteger::incrementAndGet, 0, 1, TimeUnit.SECONDS); - } - await() - .atMost(10, TimeUnit.SECONDS) - .untilAsserted(() -> assertThat(atomicInteger.get()).isEqualTo(100)); - } -}