Skip to content

Commit bfa71e0

Browse files
authored
Active Load: Add cleanup for active load listening directories on DataNode first startup (#16854)
* Add cleanup for active load listening directories on DataNode first startup - Add cleanupListeningDirectories() method in ActiveLoadAgent to clean up all listening directories - Call cleanup method when DataNode starts for the first time - Clean up pending, pipe, and failed directories - Silent execution with minimal logging * update * fix
1 parent d7898c4 commit bfa71e0

File tree

2 files changed

+93
-0
lines changed

2 files changed

+93
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@
111111
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
112112
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
113113
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALMode;
114+
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadAgent;
114115
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
115116
import org.apache.iotdb.db.subscription.agent.SubscriptionAgent;
116117
import org.apache.iotdb.db.trigger.executor.TriggerExecutor;
@@ -245,6 +246,8 @@ protected void start() {
245246
sendRegisterRequestToConfigNode(false);
246247
saveSecretKey();
247248
saveHardwareCode();
249+
// Clean up active load listening directories on first startup
250+
ActiveLoadAgent.cleanupListeningDirectories();
248251
} else {
249252
/* Check encrypt magic string */
250253
try {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadAgent.java

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,21 @@
1919

2020
package org.apache.iotdb.db.storageengine.load.active;
2121

22+
import org.apache.iotdb.commons.utils.FileUtils;
23+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import java.io.File;
29+
import java.util.ArrayList;
30+
import java.util.Arrays;
31+
import java.util.List;
32+
2233
public class ActiveLoadAgent {
2334

35+
private static final Logger LOGGER = LoggerFactory.getLogger(ActiveLoadAgent.class);
36+
2437
private final ActiveLoadTsFileLoader activeLoadTsFileLoader;
2538
private final ActiveLoadDirScanner activeLoadDirScanner;
2639
private final ActiveLoadMetricsCollector activeLoadMetricsCollector;
@@ -48,4 +61,81 @@ public synchronized void start() {
4861
activeLoadDirScanner.start();
4962
activeLoadMetricsCollector.start();
5063
}
64+
65+
/**
66+
* Clean up all listening directories for active load on DataNode first startup. This method will
67+
* clean up all files and subdirectories in the listening directories, including: 1. Pending
68+
* directories (configured by load_active_listening_dirs) 2. Pipe directory (for pipe data sync)
69+
* 3. Failed directory (for failed files)
70+
*
71+
* <p>This method is called during DataNode startup and must not throw any exceptions to ensure
72+
* startup can proceed normally. All exceptions are caught and logged internally.
73+
*/
74+
public static void cleanupListeningDirectories() {
75+
try {
76+
final List<String> dirsToClean = new ArrayList<>();
77+
78+
dirsToClean.addAll(
79+
Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs()));
80+
81+
// Add pipe dir
82+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningPipeDir());
83+
84+
// Add failed dir
85+
dirsToClean.add(IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningFailDir());
86+
87+
// Clean up each directory
88+
for (final String dirPath : dirsToClean) {
89+
try {
90+
if (dirPath == null || dirPath.isEmpty()) {
91+
continue;
92+
}
93+
94+
final File dir = new File(dirPath);
95+
96+
// Check if directory exists and is a directory
97+
// These methods may throw SecurityException if access is denied
98+
try {
99+
if (!dir.exists() || !dir.isDirectory()) {
100+
continue;
101+
}
102+
} catch (Exception e) {
103+
LOGGER.debug("Failed to check directory: {}", dirPath, e);
104+
continue;
105+
}
106+
107+
// Only delete contents inside the directory, not the directory itself
108+
// listFiles() may throw SecurityException if access is denied
109+
File[] files = null;
110+
try {
111+
files = dir.listFiles();
112+
} catch (Exception e) {
113+
LOGGER.warn("Failed to list files in directory: {}", dirPath, e);
114+
continue;
115+
}
116+
117+
if (files != null) {
118+
for (final File file : files) {
119+
// FileUtils.deleteFileOrDirectory internally calls file.isDirectory() and
120+
// file.listFiles() without try-catch, so exceptions may propagate here.
121+
// We need to catch it to prevent one file failure from stopping the cleanup.
122+
try {
123+
FileUtils.deleteFileOrDirectory(file, true);
124+
} catch (Exception e) {
125+
LOGGER.debug("Failed to delete file or directory: {}", file.getAbsolutePath(), e);
126+
}
127+
}
128+
}
129+
} catch (Exception e) {
130+
LOGGER.warn("Failed to cleanup directory: {}", dirPath, e);
131+
}
132+
}
133+
134+
LOGGER.info("Cleaned up active load listening directories");
135+
} catch (Throwable t) {
136+
// Catch all exceptions and errors (including OutOfMemoryError, etc.)
137+
// to ensure startup process is not affected
138+
LOGGER.warn("Unexpected error during cleanup of active load listening directories", t);
139+
}
140+
}
51141
}

0 commit comments

Comments
 (0)