resourceMap = getResourceMap(pipeName);
+ return resourceMap.size();
+ }
+
+ /////////////////////////////// Static Utility Methods ///////////////////////////////
+
+ /**
+ * Get Object file hard link path in pipe directory
+ *
+ * This method mimics the implementation of {@link
+ * org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager#getHardlinkOrCopiedFileInPipeDir}
+ *
+ * @param objectFile original Object file (relative to TSFile directory)
+ * @param tsFileResource TSFile resource, used to determine which TSFile the Object file belongs
+ * to
+ * @param pipeName pipe name, if null means it's assigner's public resource
+ * @return Object file hard link path in pipe directory
+ * @throws IOException when path cannot be determined
+ */
+ public static File getObjectFileHardlinkInPipeDir(
+ final File objectFile, final TsFileResource tsFileResource, final @Nullable String pipeName)
+ throws IOException {
+ try {
+ final File tsFile = tsFileResource.getTsFile();
+ if (tsFile == null) {
+ throw new IOException("TSFile is null in TsFileResource");
+ }
+
+ // Get Object file relative path (relative to TSFile directory)
+ final String relativePath = getObjectFileRelativePath(objectFile, tsFile);
+
+ // Get Object file directory
+ final File objectFileDir = getPipeObjectFileDir(tsFile, pipeName);
+
+ // Return hard link path
+ return new File(objectFileDir, relativePath);
+ } catch (final Exception e) {
+ throw new IOException(
+ String.format(
+ "failed to get object file hardlink in pipe dir " + "for object file %s, tsfile: %s",
+ objectFile.getPath(),
+ tsFileResource.getTsFile() != null ? tsFileResource.getTsFile().getPath() : "null"),
+ e);
+ }
+ }
+
+ /**
+ * Get Object file relative path relative to TSFile directory
+ *
+ * @param objectFile Object file
+ * @param tsFile TSFile file
+ * @return Object file relative path relative to TSFile directory
+ * @throws IOException when relative path cannot be calculated
+ */
+ private static String getObjectFileRelativePath(final File objectFile, final File tsFile)
+ throws IOException {
+ final File tsFileDir = tsFile.getParentFile();
+ if (tsFileDir == null) {
+ throw new IOException("TSFile has no parent directory: " + tsFile.getPath());
+ }
+
+ final String objectFilePath = objectFile.getCanonicalPath();
+ final String tsFileDirPath = tsFileDir.getCanonicalPath();
+
+ if (!objectFilePath.startsWith(tsFileDirPath)) {
+ throw new IOException(
+ String.format(
+ "Object file %s is not under TSFile directory %s", objectFilePath, tsFileDirPath));
+ }
+
+ // Calculate relative path
+ final String relativePath = objectFilePath.substring(tsFileDirPath.length());
+ // Remove leading path separator
+ return relativePath.startsWith(File.separator) || relativePath.startsWith("/")
+ ? relativePath.substring(1)
+ : relativePath;
+ }
+
+ /**
+ * Get Object file directory in pipe directory
+ *
+ *
Directory structure description:
+ *
+ *
+ * data/
+ * sequence/
+ * database/
+ * dataRegionId/
+ * timePartitionId/
+ * tsfile.tsfile
+ * pipe/
+ * tsfile/ # TSFile hardlink directory
+ * [pipeName]/ # If pipeName exists, under this subdirectory
+ * tsfile.tsfile # TSFile hardlink
+ * object/ # Object file hardlink directory (same level as tsfile)
+ * [pipeName]/ # If pipeName exists, under this subdirectory
+ * [tsfilename]/ # Directory named after TSFile, containing all Object files for this TSFile
+ * object1.obj # Object file hardlink
+ * object2.obj
+ *
+ *
+ * @param tsFile TSFile file (original file, not hardlink)
+ * @param pipeName pipe name, if null means it's assigner's public resource
+ * @return Object file directory
+ * @throws IOException when path cannot be determined
+ */
+ private static File getPipeObjectFileDir(final File tsFile, final @Nullable String pipeName)
+ throws IOException {
+ // Traverse up from TSFile path until finding sequence/unsequence directory
+ // Mimic implementation of PipeTsFileResourceManager.getPipeTsFileDirPath
+ File file = tsFile;
+ while (!file.getName().equals(IoTDBConstant.SEQUENCE_FOLDER_NAME)
+ && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FOLDER_NAME)
+ && !file.getName().equals(PipeConfig.getInstance().getPipeHardlinkBaseDirName())) {
+ file = file.getParentFile();
+ if (file == null) {
+ throw new IOException("Cannot find sequence/unsequence folder for: " + tsFile.getPath());
+ }
+ }
+
+ // Build object directory path
+ // Structure: data/pipe/object/[pipeName]/[tsfilename]/
+ // tsfilename is obtained from tsFile.getName()
+ File objectDir =
+ new File(
+ file.getParentFile().getCanonicalPath()
+ + File.separator
+ + PipeConfig.getInstance().getPipeHardlinkBaseDirName()
+ + File.separator
+ + "object");
+
+ if (Objects.nonNull(pipeName)) {
+ objectDir = new File(objectDir, pipeName);
+ }
+
+ // Add TSFile name as subdirectory
+ objectDir = new File(objectDir, tsFile.getName());
+
+ return objectDir.getCanonicalFile();
+ }
+
+ /**
+ * Get Object directory for specified Pipe
+ *
+ * Use cached object/ root directory to build Pipe's Object directory
+ *
+ * @param pipeName pipe name
+ * @return Pipe's Object directory path: data/pipe/object/[pipeName]/, or null if not cached
+ */
+ private File getPipeObjectDirForPipe(final @Nullable String pipeName) {
+ if (objectBaseDirCache == null) {
+ LOGGER.warn(
+ "Object base directory not cached yet, cannot get pipe object directory for pipe: {}",
+ pipeName);
+ return null;
+ }
+
+ // Build Pipe's directory from cached root directory
+ // objectBaseDirCache: data/pipe/object/
+ // Return: data/pipe/object/[pipeName]/
+ File pipeObjectDir = objectBaseDirCache;
+ if (pipeName != null) {
+ pipeObjectDir = new File(pipeObjectDir, pipeName);
+ }
+
+ return pipeObjectDir;
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 275bc694397d..649db8ebe44b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -25,6 +25,7 @@
import org.apache.iotdb.db.pipe.sink.util.builder.PipeTableModelTsFileBuilderV2;
import org.apache.iotdb.db.pipe.sink.util.builder.PipeTreeModelTsFileBuilderV2;
import org.apache.iotdb.db.pipe.sink.util.builder.PipeTsFileBuilder;
+import org.apache.iotdb.db.pipe.sink.util.builder.PipeTsFileIdGenerator;
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
@@ -50,7 +51,18 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch {
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventTsFileBatch.class);
private static final AtomicLong BATCH_ID_GENERATOR = new AtomicLong(0);
- private final AtomicLong currentBatchId = new AtomicLong(BATCH_ID_GENERATOR.incrementAndGet());
+ private final AtomicLong currentBatchId = new AtomicLong(PipeTsFileIdGenerator.getNextBatchId());
+
+ /**
+ * Get the next BatchID
+ *
+ * @return the next BatchID
+ * @deprecated Use {@link PipeTsFileIdGenerator#getNextBatchId()} instead
+ */
+ @Deprecated
+ public static long getNextBatchId() {
+ return PipeTsFileIdGenerator.getNextBatchId();
+ }
private final PipeTsFileBuilder treeModeTsFileBuilder;
private final PipeTsFileBuilder tableModeTsFileBuilder;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java
new file mode 100644
index 000000000000..849cec1be7cf
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.tsfile;
+
+import java.io.File;
+
+/**
+ * FileTransfer - File transfer interface
+ *
+ *
Used to abstract different file transfer methods, such as local copy, SCP transfer, etc.
+ */
+public interface FileTransfer extends AutoCloseable {
+
+ /**
+ * Test connection (if needed)
+ *
+ * @throws Exception throws exception when connection fails
+ */
+ void testConnection() throws Exception;
+
+ /**
+ * Transfer file
+ *
+ * @param localFile local file
+ * @throws Exception throws exception when transfer fails
+ */
+ void transferFile(final File localFile) throws Exception;
+
+ /**
+ * Transfer directory
+ *
+ * @param localDir local directory
+ * @throws Exception throws exception when transfer fails
+ */
+ void transferDirectory(final File localDir) throws Exception;
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java
new file mode 100644
index 000000000000..07b58ab866c6
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.tsfile;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * LocalFileTransfer - Local file transfer implementation
+ *
+ *
Use local file system for file copying
+ */
+public class LocalFileTransfer implements FileTransfer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(LocalFileTransfer.class);
+
+ private final String targetPath;
+
+ public LocalFileTransfer(final String targetPath) {
+ this.targetPath = targetPath;
+ }
+
+ @Override
+ public void testConnection() throws Exception {
+ // Local mode does not need to test connection, just check if target directory exists
+ final File targetDir = new File(targetPath);
+ if (!targetDir.exists()) {
+ if (!targetDir.mkdirs()) {
+ throw new PipeException("Failed to create target directory: " + targetPath);
+ }
+ LOGGER.info("Created target directory: {}", targetPath);
+ } else if (!targetDir.isDirectory()) {
+ throw new PipeException("Target path is not a directory: " + targetPath);
+ }
+ LOGGER.debug("Local file transfer target directory verified: {}", targetPath);
+ }
+
+ @Override
+ public void transferFile(final File localFile) throws Exception {
+ if (!localFile.exists()) {
+ throw new PipeException("Local file does not exist: " + localFile);
+ }
+
+ if (localFile.isDirectory()) {
+ transferDirectory(localFile);
+ return;
+ }
+
+ // Build target file path
+ final File targetFile = new File(targetPath, localFile.getName());
+
+ // Ensure target directory exists
+ final File targetDir = targetFile.getParentFile();
+ if (targetDir != null && !targetDir.exists()) {
+ if (!targetDir.mkdirs()) {
+ throw new PipeException("Failed to create target directory: " + targetDir);
+ }
+ }
+
+ // Copy file
+ try {
+ Files.copy(localFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ LOGGER.info("Copied file {} to {}", localFile, targetFile);
+ } catch (final IOException e) {
+ throw new PipeException(
+ String.format("Failed to copy file %s to %s: %s", localFile, targetFile, e.getMessage()),
+ e);
+ }
+ }
+
+ @Override
+ public void transferDirectory(final File localDir) throws Exception {
+ if (!localDir.exists() || !localDir.isDirectory()) {
+ throw new PipeException("Local directory does not exist or is not a directory: " + localDir);
+ }
+
+ // Build target directory path
+ final File targetDir = new File(targetPath, localDir.getName());
+
+ // Ensure target parent directory exists
+ final File targetParentDir = targetDir.getParentFile();
+ if (targetParentDir != null && !targetParentDir.exists()) {
+ if (!targetParentDir.mkdirs()) {
+ throw new PipeException("Failed to create target parent directory: " + targetParentDir);
+ }
+ }
+
+ // Recursively copy directory
+ try {
+ copyDirectoryRecursive(localDir, targetDir);
+ LOGGER.info("Copied directory {} to {}", localDir, targetDir);
+ } catch (final IOException e) {
+ throw new PipeException(
+ String.format(
+ "Failed to copy directory %s to %s: %s", localDir, targetDir, e.getMessage()),
+ e);
+ }
+ }
+
+ /**
+ * Recursively copy directory
+ *
+ * @param sourceDir source directory
+ * @param targetDir target directory
+ * @throws IOException if copy fails
+ */
+ private void copyDirectoryRecursive(final File sourceDir, final File targetDir)
+ throws IOException {
+ if (!targetDir.exists() && !targetDir.mkdirs()) {
+ throw new IOException("Failed to create target directory: " + targetDir);
+ }
+
+ final File[] files = sourceDir.listFiles();
+ if (files != null) {
+ for (final File file : files) {
+ final File targetFile = new File(targetDir, file.getName());
+ if (file.isDirectory()) {
+ copyDirectoryRecursive(file, targetFile);
+ } else {
+ Files.copy(file.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ // Local mode does not need to close connection
+ LOGGER.debug("LocalFileTransfer closed");
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/ScpFileTransfer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/ScpFileTransfer.java
new file mode 100644
index 000000000000..9b4961b441eb
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/ScpFileTransfer.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.tsfile;
+
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import com.jcraft.jsch.Channel;
+import com.jcraft.jsch.ChannelExec;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Properties;
+
+/** ScpFileTransfer - Transfer files to remote server via SCP using JSch library */
+public class ScpFileTransfer implements FileTransfer {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ScpFileTransfer.class);
+
+ private final String host;
+ private final int port;
+ private final String user;
+ private final String password;
+ private final String remotePath;
+
+ private JSch jsch;
+ private Session session;
+
+ public ScpFileTransfer(
+ final String host,
+ final int port,
+ final String user,
+ final String password,
+ final String remotePath) {
+ this.host = host;
+ this.port = port;
+ this.user = user;
+ this.password = password;
+ this.remotePath = remotePath;
+ this.jsch = new JSch();
+ }
+
+ /** Get or create SSH session */
+ private Session getSession() throws JSchException {
+ if (session == null || !session.isConnected()) {
+ session = jsch.getSession(user, host, port);
+ session.setPassword(password);
+
+ // Set SSH configuration
+ final Properties config = new Properties();
+ config.put("StrictHostKeyChecking", "no");
+ config.put("PreferredAuthentications", "password");
+ session.setConfig(config);
+
+ // Set connection timeout
+ session.setTimeout(10000);
+
+ session.connect();
+ LOGGER.debug("SSH session connected to {}@{}:{}", user, host, port);
+ }
+ return session;
+ }
+
+ /** Test SCP connection */
+ public void testConnection() throws Exception {
+ try {
+ final Session testSession = getSession();
+ if (testSession.isConnected()) {
+ LOGGER.info("SCP connection test successful: {}@{}:{}", user, host, port);
+ } else {
+ throw new PipeException(String.format("Failed to connect to %s@%s:%d", user, host, port));
+ }
+ } catch (final JSchException e) {
+ throw new PipeException(
+ String.format(
+ "Failed to test SCP connection to %s@%s:%d: %s", user, host, port, e.getMessage()),
+ e);
+ }
+ }
+
+ /**
+ * Transfer file to remote server
+ *
+ * @param localFile local file
+ * @throws Exception throws exception when transfer fails
+ */
+ public void transferFile(final File localFile) throws Exception {
+ if (!localFile.exists()) {
+ throw new PipeException("Local file does not exist: " + localFile);
+ }
+
+ if (localFile.isDirectory()) {
+ transferDirectory(localFile);
+ return;
+ }
+
+ final Session currentSession = getSession();
+ Channel channel = null;
+ FileInputStream fileInputStream = null;
+
+ try {
+ // Create SCP channel
+ final String command = String.format("scp -t %s", remotePath);
+ channel = currentSession.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
+
+ // Get input/output streams
+ final InputStream inputStream = channel.getInputStream();
+ final java.io.OutputStream outputStream = channel.getOutputStream();
+
+ channel.connect();
+
+ // Check if remote command succeeded
+ checkAck(inputStream);
+
+ // Send file information
+ final long fileSize = localFile.length();
+ final String fileCommand = String.format("C0644 %d %s\n", fileSize, localFile.getName());
+ outputStream.write(fileCommand.getBytes());
+ outputStream.flush();
+ checkAck(inputStream);
+
+ // Transfer file content
+ fileInputStream = new FileInputStream(localFile);
+ final byte[] buffer = new byte[1024];
+ long totalBytes = 0;
+ while (totalBytes < fileSize) {
+ final int bytesRead =
+ fileInputStream.read(buffer, 0, (int) Math.min(buffer.length, fileSize - totalBytes));
+ if (bytesRead < 0) {
+ break;
+ }
+ outputStream.write(buffer, 0, bytesRead);
+ totalBytes += bytesRead;
+ }
+
+ // Send end marker
+ outputStream.write(0);
+ outputStream.flush();
+ checkAck(inputStream);
+
+ LOGGER.info(
+ "Successfully transferred file {} to {}@{}:{}", localFile, user, host, remotePath);
+ } catch (final Exception e) {
+ throw new PipeException(
+ String.format(
+ "Failed to transfer file %s to %s@%s:%s: %s",
+ localFile, user, host, remotePath, e.getMessage()),
+ e);
+ } finally {
+ if (fileInputStream != null) {
+ try {
+ fileInputStream.close();
+ } catch (final IOException e) {
+ LOGGER.warn("Failed to close file input stream", e);
+ }
+ }
+ if (channel != null) {
+ channel.disconnect();
+ }
+ }
+ }
+
+ /**
+ * Transfer directory to remote server
+ *
+ * @param localDir local directory
+ * @throws Exception throws exception when transfer fails
+ */
+ public void transferDirectory(final File localDir) throws Exception {
+ if (!localDir.exists() || !localDir.isDirectory()) {
+ throw new PipeException("Local directory does not exist or is not a directory: " + localDir);
+ }
+
+ final Session currentSession = getSession();
+ Channel channel = null;
+
+ try {
+ // Create SCP channel for directory transfer
+ final String command = String.format("scp -r -t %s", remotePath);
+ channel = currentSession.openChannel("exec");
+ ((ChannelExec) channel).setCommand(command);
+
+ final InputStream inputStream = channel.getInputStream();
+ final java.io.OutputStream outputStream = channel.getOutputStream();
+
+ channel.connect();
+ checkAck(inputStream);
+
+ // Recursively transfer directory
+ transferDirectoryRecursive(localDir, localDir.getName(), outputStream, inputStream);
+
+ LOGGER.info(
+ "Successfully transferred directory {} to {}@{}:{}", localDir, user, host, remotePath);
+ } catch (final Exception e) {
+ throw new PipeException(
+ String.format(
+ "Failed to transfer directory %s to %s@%s:%s: %s",
+ localDir, user, host, remotePath, e.getMessage()),
+ e);
+ } finally {
+ if (channel != null) {
+ channel.disconnect();
+ }
+ }
+ }
+
+ /** Recursively transfer directory */
+ private void transferDirectoryRecursive(
+ final File localDir,
+ final String remoteDirName,
+ final java.io.OutputStream outputStream,
+ final InputStream inputStream)
+ throws Exception {
+ // Create remote directory
+ final String dirCommand = String.format("D0755 0 %s\n", remoteDirName);
+ outputStream.write(dirCommand.getBytes());
+ outputStream.flush();
+ checkAck(inputStream);
+
+ // Transfer files in directory
+ final File[] files = localDir.listFiles();
+ if (files != null) {
+ for (final File file : files) {
+ if (file.isFile()) {
+ transferFileInDirectory(file, file.getName(), outputStream, inputStream);
+ } else if (file.isDirectory()) {
+ transferDirectoryRecursive(file, file.getName(), outputStream, inputStream);
+ }
+ }
+ }
+
+ // End directory
+ outputStream.write("E\n".getBytes());
+ outputStream.flush();
+ checkAck(inputStream);
+ }
+
+ /** Transfer single file in directory */
+ private void transferFileInDirectory(
+ final File localFile,
+ final String remoteFileName,
+ final java.io.OutputStream outputStream,
+ final InputStream inputStream)
+ throws Exception {
+ FileInputStream fileInputStream = null;
+ try {
+ final long fileSize = localFile.length();
+ final String fileCommand = String.format("C0644 %d %s\n", fileSize, remoteFileName);
+ outputStream.write(fileCommand.getBytes());
+ outputStream.flush();
+ checkAck(inputStream);
+
+ fileInputStream = new FileInputStream(localFile);
+ final byte[] buffer = new byte[1024];
+ long totalBytes = 0;
+ while (totalBytes < fileSize) {
+ final int bytesRead =
+ fileInputStream.read(buffer, 0, (int) Math.min(buffer.length, fileSize - totalBytes));
+ if (bytesRead < 0) {
+ break;
+ }
+ outputStream.write(buffer, 0, bytesRead);
+ totalBytes += bytesRead;
+ }
+
+ outputStream.write(0);
+ outputStream.flush();
+ checkAck(inputStream);
+ } finally {
+ if (fileInputStream != null) {
+ try {
+ fileInputStream.close();
+ } catch (final IOException e) {
+ LOGGER.warn("Failed to close file input stream", e);
+ }
+ }
+ }
+ }
+
+ /** Check SCP command response */
+ private void checkAck(final InputStream inputStream) throws IOException {
+ final int b = inputStream.read();
+ if (b == 0) {
+ return; // Success
+ }
+ if (b == -1) {
+ throw new IOException("Unexpected end of stream");
+ }
+ if (b == 1 || b == 2) {
+ // Error or warning
+ final StringBuilder errorMsg = new StringBuilder();
+ int c;
+ while ((c = inputStream.read()) != '\n') {
+ errorMsg.append((char) c);
+ }
+ if (b == 1) {
+ throw new IOException("SCP error: " + errorMsg.toString());
+ } else {
+ LOGGER.warn("SCP warning: {}", errorMsg.toString());
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (session != null && session.isConnected()) {
+ session.disconnect();
+ LOGGER.debug("SSH session disconnected");
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSink.java
new file mode 100644
index 000000000000..6267dfa4010a
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSink.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.tsfile;
+
+import org.apache.iotdb.pipe.api.PipeConnector;
+import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
+import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
+import org.apache.iotdb.pipe.api.event.Event;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * TsFileSink - Export Pipe data as TSFile format
+ *
+ *
Supports two modes:
+ *
+ *
+ * - Local mode (local): Write directly to local directory
+ *
- SCP mode (scp): Write to local temporary directory first, then transfer to remote server
+ * via SCP
+ *
+ *
+ * Supports two event types:
+ *
+ *
+ * - TabletInsertionEvent: Build new TSFile, write InsertNode data
+ *
- TsFileInsertionEvent: Copy existing TSFile to target directory
+ *
+ *
+ * Object file handling:
+ *
+ *
+ * - Scan Object data paths in events
+ *
- Hard link Object files to ${tsfilename}/ subdirectory in target directory
+ *
- Maintain original relative path structure
+ *
+ */
+public class TsFileSink implements PipeConnector {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TsFileSink.class);
+
+ // Configuration parameter keys
+ private static final String SINK_FILE_MODE_KEY = "sink.file-mode";
+ private static final String SINK_TARGET_PATH_KEY = "sink.target-path";
+ private static final String SINK_SCP_REMOTE_PATH_KEY = "sink.scp.remote-path";
+ private static final String SINK_SCP_HOST_KEY = "sink.scp.host";
+ private static final String SINK_SCP_PORT_KEY = "sink.scp.port";
+ private static final String SINK_SCP_USER_KEY = "sink.scp.user";
+ private static final String SINK_SCP_PASSWORD_KEY = "sink.scp.password";
+
+ // Default values
+ private static final String FILE_MODE_LOCAL = "local";
+ private static final String FILE_MODE_SCP = "scp";
+ private static final int DEFAULT_SCP_PORT = 22;
+
+ // Runtime configuration
+ private String fileMode;
+ private String
+ targetPath; // Target path for local mode, or temporary path for SCP mode (Writer's target
+ // directory)
+
+ // SCP configuration
+ private String scpRemotePath;
+ private String scpHost;
+ private int scpPort;
+ private String scpUser;
+ private String scpPassword;
+
+ // Runtime environment
+ private String pipeName;
+ private long creationTime;
+
+ // TSFile builder
+ private TsFileSinkWriter tsFileWriter;
+
+ // File transfer
+ private FileTransfer fileTransfer;
+
+ // SCP file transfer (for type conversion)
+ private ScpFileTransfer scpTransfer;
+
+ @Override
+ public void validate(final PipeParameterValidator validator) throws Exception {
+ final PipeParameters parameters = validator.getParameters();
+
+ // Validate file-mode
+ final String mode =
+ parameters
+ .getStringOrDefault(Arrays.asList(SINK_FILE_MODE_KEY), FILE_MODE_LOCAL)
+ .toLowerCase();
+
+ if (!FILE_MODE_LOCAL.equals(mode) && !FILE_MODE_SCP.equals(mode)) {
+ throw new PipeException(
+ String.format("Invalid file-mode '%s'. Must be 'local' or 'scp'.", mode));
+ }
+
+ // Validate target path (required for both local and SCP modes)
+ validator.validateRequiredAttribute(SINK_TARGET_PATH_KEY);
+
+ // Validate SCP mode parameters
+ if (FILE_MODE_SCP.equals(mode)) {
+ validator.validateRequiredAttribute(SINK_SCP_REMOTE_PATH_KEY);
+ validator.validateRequiredAttribute(SINK_SCP_HOST_KEY);
+ validator.validateRequiredAttribute(SINK_SCP_USER_KEY);
+ validator.validateRequiredAttribute(SINK_SCP_PASSWORD_KEY);
+ }
+ }
+
+ @Override
+ public void customize(
+ final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration)
+ throws Exception {
+ this.pipeName = configuration.getRuntimeEnvironment().getPipeName();
+ this.creationTime = configuration.getRuntimeEnvironment().getCreationTime();
+
+ // Read configuration
+ this.fileMode =
+ parameters
+ .getStringOrDefault(Arrays.asList(SINK_FILE_MODE_KEY), FILE_MODE_LOCAL)
+ .toLowerCase();
+
+ // Read target path (target path for local mode, or temporary path for SCP mode)
+ this.targetPath = parameters.getString(SINK_TARGET_PATH_KEY);
+ ensureDirectoryExists(targetPath);
+
+ if (FILE_MODE_LOCAL.equals(fileMode)) {
+ // Local mode
+ LOGGER.info("TsFileSink configured in LOCAL mode, target path: {}", targetPath);
+ } else {
+ // SCP mode
+ this.scpRemotePath = parameters.getString(SINK_SCP_REMOTE_PATH_KEY);
+ this.scpHost = parameters.getString(SINK_SCP_HOST_KEY);
+ this.scpPort = parameters.getIntOrDefault(Arrays.asList(SINK_SCP_PORT_KEY), DEFAULT_SCP_PORT);
+ this.scpUser = parameters.getString(SINK_SCP_USER_KEY);
+ this.scpPassword = parameters.getString(SINK_SCP_PASSWORD_KEY);
+
+ LOGGER.info(
+ "TsFileSink configured in SCP mode, local tmp: {}, remote: {}@{}:{}",
+ targetPath,
+ scpUser,
+ scpHost,
+ scpRemotePath);
+ }
+
+ // Initialize TSFile writer (use targetPath as target directory)
+ this.tsFileWriter = new TsFileSinkWriter(targetPath, pipeName, creationTime);
+
+ // Initialize file transfer
+ if (FILE_MODE_LOCAL.equals(fileMode)) {
+ // Local mode: use local file transfer
+ this.fileTransfer = new LocalFileTransfer(targetPath);
+ } else {
+ // SCP mode: use SCP transfer
+ this.scpTransfer = new ScpFileTransfer(scpHost, scpPort, scpUser, scpPassword, scpRemotePath);
+ this.fileTransfer = scpTransfer;
+ }
+ }
+
+ @Override
+ public void handshake() throws Exception {
+ // Test file transfer connection
+ if (fileTransfer != null) {
+ fileTransfer.testConnection();
+ LOGGER.info("File transfer connection test successful");
+ }
+ }
+
+ @Override
+ public void heartbeat() throws Exception {
+ // Check file transfer connection
+ if (fileTransfer != null) {
+ fileTransfer.testConnection();
+ }
+ }
+
+ @Override
+ public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception {
+ // Handle TabletInsertionEvent: build new TSFile
+ tsFileWriter.writeTablet(tabletInsertionEvent);
+
+ // Check if flush is needed and transfer files
+ final List generatedFiles = tsFileWriter.flushTabletsIfNeeded();
+ if (!generatedFiles.isEmpty() && fileTransfer != null) {
+ transferFiles(generatedFiles);
+ }
+ }
+
+ @Override
+ public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exception {
+ // Handle TsFileInsertionEvent: copy TSFile
+ final File copiedFile = tsFileWriter.copyTsFile(tsFileInsertionEvent);
+
+ // Transfer files
+ if (copiedFile != null && fileTransfer != null) {
+ transferFiles(Collections.singletonList(copiedFile));
+ }
+ }
+
+ /** Transfer files (including TSFile and related Object files) */
+ private void transferFiles(final List tsFiles) throws Exception {
+ for (final File tsFile : tsFiles) {
+ // Transfer TSFile
+ fileTransfer.transferFile(tsFile);
+
+ // Transfer Tablet Object directory (if exists)
+ final File tabletObjectDir = new File(tsFileWriter.getTargetDirectory(), "tablet_objects");
+ if (tabletObjectDir.exists() && tabletObjectDir.isDirectory()) {
+ fileTransfer.transferDirectory(tabletObjectDir);
+ LOGGER.info("Transferred tablet object directory");
+ }
+
+ // Transfer TsFile Object directory (if exists)
+ final String tsFileName = tsFile.getName();
+ final String tsFileNameWithoutSuffix = tsFileName.replace(".tsfile", "");
+ final File tsFileObjectDir =
+ new File(tsFileWriter.getTargetDirectory(), tsFileNameWithoutSuffix);
+
+ if (tsFileObjectDir.exists() && tsFileObjectDir.isDirectory()) {
+ fileTransfer.transferDirectory(tsFileObjectDir);
+ LOGGER.info("Transferred tsfile object directory {}", tsFileObjectDir.getName());
+ }
+ }
+ }
+
+ @Override
+ public void transfer(final Event event) throws Exception {
+ if (event instanceof TabletInsertionEvent) {
+ transfer((TabletInsertionEvent) event);
+ } else if (event instanceof TsFileInsertionEvent) {
+ transfer((TsFileInsertionEvent) event);
+ } else {
+ LOGGER.warn("Unsupported event type: {}", event.getClass().getName());
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ try {
+ // Flush remaining Tablets and transfer
+ if (tsFileWriter != null) {
+ final List remainingFiles = tsFileWriter.flushTablets();
+ if (!remainingFiles.isEmpty() && fileTransfer != null) {
+ transferFiles(remainingFiles);
+ }
+ tsFileWriter.close();
+ }
+ } finally {
+ // Close file transfer
+ if (fileTransfer != null) {
+ fileTransfer.close();
+ }
+ }
+
+ LOGGER.info("TsFileSink closed for pipe: {}", pipeName);
+ }
+
+ /** Ensure directory exists */
+ private void ensureDirectoryExists(final String path) throws PipeException {
+ final File dir = new File(path);
+ if (!dir.exists()) {
+ if (!dir.mkdirs()) {
+ throw new PipeException("Failed to create directory: " + path);
+ }
+ }
+ if (!dir.isDirectory()) {
+ throw new PipeException("Path is not a directory: " + path);
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSinkWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSinkWriter.java
new file mode 100644
index 000000000000..1e216db4d774
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSinkWriter.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.protocol.tsfile;
+
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
+import org.apache.iotdb.db.pipe.sink.util.builder.PipeTableModelTsFileBuilderV2;
+import org.apache.iotdb.db.pipe.sink.util.builder.PipeTreeModelTsFileBuilderV2;
+import org.apache.iotdb.db.pipe.sink.util.builder.PipeTsFileBuilder;
+import org.apache.iotdb.db.pipe.sink.util.builder.PipeTsFileIdGenerator;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
+import org.apache.iotdb.pipe.api.exception.PipeException;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.StandardCopyOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * TsFileSinkWriter - Responsible for writing event data to TSFile and handling Object files
+ *
+ * Features:
+ *
+ *
+ * - Use PipeTsFileBuilder to cache TabletInsertionEvent and batch write to TSFile
+ *
- Copy TSFile corresponding to TsFileInsertionEvent
+ *
- Handle hard links for Object files
+ *
+ */
+public class TsFileSinkWriter implements AutoCloseable {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(TsFileSinkWriter.class);
+
+ private static final int TABLET_BUFFER_SIZE = 1000; // Number of Tablets to buffer before flushing
+
+ private final String targetDirectory;
+ private final String pipeName;
+ private final long creationTime;
+
+ // TSFile Builders
+ private final AtomicLong currentBatchId = new AtomicLong(PipeTsFileIdGenerator.getNextBatchId());
+ private final AtomicLong tsFileIdGenerator = new AtomicLong(0);
+ private PipeTsFileBuilder treeModelBuilder;
+ private PipeTsFileBuilder tableModelBuilder;
+
+ private int bufferedTabletCount = 0;
+
+ public TsFileSinkWriter(
+ final String targetDirectory, final String pipeName, final long creationTime) {
+ this.targetDirectory = targetDirectory;
+ this.pipeName = pipeName;
+ this.creationTime = creationTime;
+
+ // Initialize Builders
+ this.treeModelBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator);
+ this.tableModelBuilder = new PipeTableModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator);
+ }
+
+ /** Write TabletInsertionEvent */
+ public synchronized void writeTablet(final TabletInsertionEvent event) throws Exception {
+ if (event instanceof PipeInsertNodeTabletInsertionEvent) {
+ writeInsertNodeTablet((PipeInsertNodeTabletInsertionEvent) event);
+ } else if (event instanceof PipeRawTabletInsertionEvent) {
+ writeRawTablet((PipeRawTabletInsertionEvent) event);
+ } else {
+ LOGGER.warn("Unsupported TabletInsertionEvent type: {}", event.getClass().getName());
+ }
+ }
+
+ /** Write PipeInsertNodeTabletInsertionEvent */
+ private void writeInsertNodeTablet(final PipeInsertNodeTabletInsertionEvent event)
+ throws Exception {
+ // Immediately handle Object file hard links
+ handleObjectFilesForTabletEvent(event);
+
+ // Convert to Tablet list
+ final List tablets = event.convertToTablets();
+ if (tablets == null || tablets.isEmpty()) {
+ return;
+ }
+
+ // Process each Tablet
+ for (int i = 0; i < tablets.size(); i++) {
+ final Tablet tablet = tablets.get(i);
+ if (tablet == null || tablet.getRowSize() == 0) {
+ continue;
+ }
+
+ // Select Builder based on event type
+ if (event.isTableModelEvent()) {
+ // Table model
+ tableModelBuilder.bufferTableModelTablet(
+ event.getSourceDatabaseNameFromDataRegion(), tablet);
+ } else {
+ // Tree model
+ treeModelBuilder.bufferTreeModelTablet(tablet, event.isAligned(i));
+ }
+
+ bufferedTabletCount++;
+ }
+ }
+
+ /** Write PipeRawTabletInsertionEvent */
+ private void writeRawTablet(final PipeRawTabletInsertionEvent event) throws Exception {
+ // Immediately handle Object file hard links
+ handleObjectFilesForTabletEvent(event);
+
+ // Get Tablet from event
+ final Tablet tablet = event.convertToTablet();
+ if (tablet == null || tablet.getRowSize() == 0) {
+ return;
+ }
+
+ // Select Builder based on event type
+ if (event.isTableModelEvent()) {
+ // Table model
+ tableModelBuilder.bufferTableModelTablet(event.getSourceDatabaseNameFromDataRegion(), tablet);
+ } else {
+ // Tree model
+ treeModelBuilder.bufferTreeModelTablet(tablet, event.isAligned());
+ }
+
+ bufferedTabletCount++;
+ }
+
+ /**
+ * Flush buffered Tablets to TSFile
+ *
+ * @return list of generated TSFiles
+ */
+ public List flushTablets() throws Exception {
+ final List generatedFiles = new ArrayList<>();
+
+ if (bufferedTabletCount == 0) {
+ return generatedFiles;
+ }
+
+ try {
+ // Flush Tree model Tablets
+ if (!treeModelBuilder.isEmpty()) {
+ final List> sealedFiles =
+ treeModelBuilder.convertTabletToTsFileWithDBInfo();
+
+ for (final Pair pair : sealedFiles) {
+ final File tsFile = pair.getRight();
+ // Generate final file name (using global generator to ensure uniqueness)
+ final String finalFileName = generateFinalTsFileName(tsFile.getName());
+ final File targetFile = new File(targetDirectory, finalFileName);
+
+ // If target file exists, generate new file name with sequence number
+ File actualTargetFile = targetFile;
+ long sequence = 0;
+ while (actualTargetFile.exists()) {
+ sequence++;
+ final String nameWithoutSuffix =
+ finalFileName.replace(TsFileConstant.TSFILE_SUFFIX, "");
+ final String newFileName =
+ nameWithoutSuffix + "_" + sequence + TsFileConstant.TSFILE_SUFFIX;
+ actualTargetFile = new File(targetDirectory, newFileName);
+ }
+
+ // Move file to target directory
+ Files.move(
+ tsFile.toPath(), actualTargetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+
+ LOGGER.info("Created TSFile: {}", actualTargetFile.getAbsolutePath());
+ generatedFiles.add(actualTargetFile);
+ }
+
+ treeModelBuilder.onSuccess();
+ }
+
+ // Flush Table model Tablets
+ if (!tableModelBuilder.isEmpty()) {
+ final List> sealedFiles =
+ tableModelBuilder.convertTabletToTsFileWithDBInfo();
+
+ for (final Pair pair : sealedFiles) {
+ final File tsFile = pair.getRight();
+ // Generate final file name (using global generator to ensure uniqueness)
+ final String finalFileName = generateFinalTsFileName(tsFile.getName());
+ final File targetFile = new File(targetDirectory, finalFileName);
+
+ // If target file exists, generate new file name with sequence number
+ File actualTargetFile = targetFile;
+ long sequence = 0;
+ while (actualTargetFile.exists()) {
+ sequence++;
+ final String nameWithoutSuffix =
+ finalFileName.replace(TsFileConstant.TSFILE_SUFFIX, "");
+ final String newFileName =
+ nameWithoutSuffix + "_" + sequence + TsFileConstant.TSFILE_SUFFIX;
+ actualTargetFile = new File(targetDirectory, newFileName);
+ }
+
+ // Move file to target directory
+ Files.move(
+ tsFile.toPath(), actualTargetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+
+ LOGGER.info("Created TSFile: {}", actualTargetFile.getAbsolutePath());
+ generatedFiles.add(actualTargetFile);
+ }
+
+ tableModelBuilder.onSuccess();
+ }
+
+ } finally {
+ // Reset counter
+ bufferedTabletCount = 0;
+
+ // Recreate Builders (using new BatchID)
+ currentBatchId.set(PipeTsFileIdGenerator.getNextBatchId());
+ treeModelBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator);
+ tableModelBuilder = new PipeTableModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator);
+ }
+
+ return generatedFiles;
+ }
+
+ /**
+ * Check if flush is needed, flush if needed and return generated file list
+ *
+ * @return list of generated files, empty list if flush is not needed
+ * @throws Exception if flush fails
+ */
+ public List flushTabletsIfNeeded() throws Exception {
+ if (bufferedTabletCount >= TABLET_BUFFER_SIZE) {
+ return flushTablets();
+ }
+ return Collections.emptyList();
+ }
+
+ /**
+ * Handle Object file hard links for TabletInsertionEvent Process immediately when data is
+ * captured
+ */
+ private void handleObjectFilesForTabletEvent(final EnrichedEvent event) throws Exception {
+ // First check if there is Object data
+ if (!event.hasObjectData()) {
+ return;
+ }
+
+ // Scan Object data
+ event.scanForObjectData();
+
+ final String[] objectPaths = event.getObjectPaths();
+ if (objectPaths == null || objectPaths.length == 0) {
+ return;
+ }
+
+ // Get TsFileResource
+ final TsFileResource tsFileResource = (TsFileResource) event.getTsFileResource();
+ if (tsFileResource == null || tsFileResource.getTsFile() == null) {
+ LOGGER.warn("TsFileResource is null, cannot process object files");
+ return;
+ }
+
+ // Create Object directory: ${targetDirectory}/tablet_objects/
+ // Use unified directory to store Tablet Object files
+ final File objectDir = new File(targetDirectory, "tablet_objects");
+ if (!objectDir.exists() && !objectDir.mkdirs()) {
+ throw new PipeException("Failed to create object directory: " + objectDir);
+ }
+
+ // Get pipeName (from event)
+ final String pipeName = event.getPipeName();
+
+ // Create hard links for each Object file
+ int linkedCount = 0;
+ for (final String relativePath : objectPaths) {
+ if (relativePath == null || relativePath.isEmpty()) {
+ continue;
+ }
+
+ try {
+ // Get Object file hard link File object through PipeObjectResourceManager
+ final File hardlinkSourceFile =
+ PipeDataNodeResourceManager.object()
+ .getObjectFileHardlink(tsFileResource, relativePath, pipeName);
+
+ if (hardlinkSourceFile == null || !hardlinkSourceFile.exists()) {
+ LOGGER.warn("Hardlink source file does not exist for relative path: {}", relativePath);
+ continue;
+ }
+
+ // Target Object file path (maintain relative path)
+ final File targetObjectFile = new File(objectDir, relativePath);
+
+ // Ensure parent directory exists
+ final File targetObjectFileParent = targetObjectFile.getParentFile();
+ if (targetObjectFileParent != null && !targetObjectFileParent.exists()) {
+ targetObjectFileParent.mkdirs();
+ }
+
+ // Create hard link (skip if already exists)
+ if (!targetObjectFile.exists()) {
+ FileUtils.createHardLink(hardlinkSourceFile, targetObjectFile);
+ linkedCount++;
+ LOGGER.debug("Linked object file: {} -> {}", hardlinkSourceFile, targetObjectFile);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to link object file {}: {}", relativePath, e.getMessage(), e);
+ }
+ }
+
+ if (linkedCount > 0) {
+ LOGGER.info(
+ "Linked {} object files for TabletInsertionEvent to directory: {}",
+ linkedCount,
+ objectDir.getAbsolutePath());
+ }
+ }
+
+ /**
+ * Copy TSFile corresponding to TsFileInsertionEvent
+ *
+ * @return copied TSFile, or null if failed
+ */
+ public synchronized File copyTsFile(final TsFileInsertionEvent event) throws Exception {
+ if (!(event instanceof PipeTsFileInsertionEvent)) {
+ LOGGER.warn("Unsupported TsFileInsertionEvent type: {}", event.getClass().getName());
+ return null;
+ }
+
+ final PipeTsFileInsertionEvent tsFileEvent = (PipeTsFileInsertionEvent) event;
+ final File sourceTsFile = tsFileEvent.getTsFile();
+
+ if (sourceTsFile == null || !sourceTsFile.exists()) {
+ LOGGER.warn("Source TSFile does not exist: {}", sourceTsFile);
+ return null;
+ }
+
+ // First flush currently buffered Tablets
+ if (bufferedTabletCount > 0) {
+ flushTablets();
+ }
+
+ // Generate target file name
+ final String targetFileName = sourceTsFile.getName();
+ final File targetTsFile = new File(targetDirectory, targetFileName);
+
+ // Copy TSFile
+ Files.copy(sourceTsFile.toPath(), targetTsFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
+
+ LOGGER.info("Copied TSFile from {} to {}", sourceTsFile, targetTsFile);
+
+ // Handle Object files
+ handleObjectFilesForTsFile(tsFileEvent, targetTsFile);
+
+ return targetTsFile;
+ }
+
+ /** Get target directory */
+ public String getTargetDirectory() {
+ return targetDirectory;
+ }
+
+ /**
+ * Generate final TSFile file name
+ *
+ * If original file name is a temporary file name (tb_*), keep original file name Otherwise use
+ * original file name
+ *
+ * @param originalFileName original file name
+ * @return final file name
+ */
+ private String generateFinalTsFileName(final String originalFileName) {
+ // If already in temporary file name format, keep as is
+ if (PipeTsFileIdGenerator.isValidTempFileName(originalFileName)) {
+ return originalFileName;
+ }
+ // Otherwise keep original file name
+ return originalFileName;
+ }
+
+ /** Handle Object file hard links for TSFile */
+ private void handleObjectFilesForTsFile(
+ final PipeTsFileInsertionEvent event, final File targetTsFile) throws Exception {
+ // Scan Object data
+ event.scanForObjectData();
+
+ final String[] objectPaths = event.getObjectPaths();
+ if (objectPaths == null || objectPaths.length == 0) {
+ LOGGER.debug("No object files to link for TSFile: {}", targetTsFile.getName());
+ return;
+ }
+
+ // Create Object directory: ${targetDirectory}/${tsfilename}/
+ final String tsFileName = targetTsFile.getName();
+ final String tsFileNameWithoutSuffix = tsFileName.replace(TsFileConstant.TSFILE_SUFFIX, "");
+ final File objectDir = new File(targetDirectory, tsFileNameWithoutSuffix);
+
+ if (!objectDir.exists() && !objectDir.mkdirs()) {
+ throw new PipeException("Failed to create object directory: " + objectDir);
+ }
+
+ // Get source TSFile directory
+ final File sourceTsFile = event.getTsFile();
+ final File sourceTsFileDir = sourceTsFile.getParentFile();
+
+ // Create hard links for each Object file
+ int linkedCount = 0;
+ for (final String relativePath : objectPaths) {
+ if (relativePath == null || relativePath.isEmpty()) {
+ continue;
+ }
+
+ try {
+ // Source Object file path (relative to source TSFile directory)
+ final File sourceObjectFile = new File(sourceTsFileDir, relativePath);
+ if (!sourceObjectFile.exists()) {
+ LOGGER.warn("Source object file does not exist: {}", sourceObjectFile);
+ continue;
+ }
+
+ // Target Object file path (maintain relative path)
+ final File targetObjectFile = new File(objectDir, relativePath);
+
+ // Ensure parent directory exists
+ final File targetObjectFileParent = targetObjectFile.getParentFile();
+ if (targetObjectFileParent != null && !targetObjectFileParent.exists()) {
+ targetObjectFileParent.mkdirs();
+ }
+
+ // Create hard link
+ FileUtils.createHardLink(sourceObjectFile, targetObjectFile);
+ linkedCount++;
+
+ LOGGER.debug("Linked object file: {} -> {}", sourceObjectFile, targetObjectFile);
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to link object file {}: {}", relativePath, e.getMessage(), e);
+ }
+ }
+
+ LOGGER.info(
+ "Linked {} object files for TSFile: {} to directory: {}",
+ linkedCount,
+ tsFileName,
+ objectDir.getAbsolutePath());
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ try {
+ // Flush remaining Tablets
+ if (bufferedTabletCount > 0) {
+ flushTablets();
+ }
+ } finally {
+ // Close Builders
+ if (treeModelBuilder != null) {
+ treeModelBuilder.close();
+ }
+ if (tableModelBuilder != null) {
+ tableModelBuilder.close();
+ }
+ }
+
+ LOGGER.info("TsFileSinkWriter closed");
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileIdGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileIdGenerator.java
new file mode 100644
index 000000000000..079fb05beb38
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileIdGenerator.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink.util.builder;
+
+import org.apache.tsfile.common.constant.TsFileConstant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * PipeTsFileIdGenerator - Global TSFile ID Generator
+ *
+ *
Provides unified BatchID and TSFile ID allocation, as well as file name generation rules
+ */
+public class PipeTsFileIdGenerator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileIdGenerator.class);
+
+ // Global BatchID generator
+ private static final AtomicLong GLOBAL_BATCH_ID_GENERATOR = new AtomicLong(0);
+
+ // Global TSFile ID generator
+ private static final AtomicLong GLOBAL_TSFILE_ID_GENERATOR = new AtomicLong(0);
+
+ // TSFile file name prefix
+ private static final String TS_FILE_PREFIX = "tb"; // tb means tablet batch
+
+ /**
+ * Get the next BatchID
+ *
+ * @return the next BatchID
+ */
+ public static long getNextBatchId() {
+ return GLOBAL_BATCH_ID_GENERATOR.incrementAndGet();
+ }
+
+ /**
+ * Get the next TSFile ID
+ *
+ * @return the next TSFile ID
+ */
+ public static long getNextTsFileId() {
+ return GLOBAL_TSFILE_ID_GENERATOR.incrementAndGet();
+ }
+
+ /**
+ * Generate temporary TSFile file name
+ *
+ *
File name format: tb_{dataNodeId}_{batchId}_{tsFileId}.tsfile
+ *
+ * @param dataNodeId DataNode ID
+ * @param batchId Batch ID
+ * @param tsFileId TSFile ID
+ * @return temporary TSFile file name
+ */
+ public static String generateTempTsFileName(
+ final int dataNodeId, final long batchId, final long tsFileId) {
+ return TS_FILE_PREFIX
+ + "_"
+ + dataNodeId
+ + "_"
+ + batchId
+ + "_"
+ + tsFileId
+ + TsFileConstant.TSFILE_SUFFIX;
+ }
+
+ /**
+ * Generate temporary TSFile file path
+ *
+ * @param baseDir base directory
+ * @param dataNodeId DataNode ID
+ * @param batchId Batch ID
+ * @param tsFileId TSFile ID
+ * @return temporary TSFile file path
+ */
+ public static File generateTempTsFilePath(
+ final File baseDir, final int dataNodeId, final long batchId, final long tsFileId) {
+ return new File(baseDir, generateTempTsFileName(dataNodeId, batchId, tsFileId));
+ }
+
+ /**
+ * Parse BatchID from temporary file name
+ *
+ * @param fileName temporary file name (format: tb_{dataNodeId}_{batchId}_{tsFileId}.tsfile)
+ * @return BatchID, or -1 if parsing fails
+ */
+ public static long parseBatchIdFromTempFileName(final String fileName) {
+ try {
+ if (fileName == null || !fileName.startsWith(TS_FILE_PREFIX + "_")) {
+ return -1;
+ }
+ final String[] parts = fileName.substring(TS_FILE_PREFIX.length() + 1).split("_");
+ if (parts.length >= 2) {
+ return Long.parseLong(parts[1]);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to parse batch ID from file name: {}", fileName, e);
+ }
+ return -1;
+ }
+
+ /**
+ * Parse TSFile ID from temporary file name
+ *
+ * @param fileName temporary file name (format: tb_{dataNodeId}_{batchId}_{tsFileId}.tsfile)
+ * @return TSFile ID, or -1 if parsing fails
+ */
+ public static long parseTsFileIdFromTempFileName(final String fileName) {
+ try {
+ if (fileName == null || !fileName.startsWith(TS_FILE_PREFIX + "_")) {
+ return -1;
+ }
+ final String[] parts = fileName.substring(TS_FILE_PREFIX.length() + 1).split("_");
+ if (parts.length >= 3) {
+ final String tsFileIdPart = parts[2];
+ // Remove .tsfile suffix
+ final int suffixIndex = tsFileIdPart.indexOf(TsFileConstant.TSFILE_SUFFIX);
+ if (suffixIndex > 0) {
+ return Long.parseLong(tsFileIdPart.substring(0, suffixIndex));
+ }
+ return Long.parseLong(tsFileIdPart);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("Failed to parse TSFile ID from file name: {}", fileName, e);
+ }
+ return -1;
+ }
+
+ /**
+ * Generate final TSFile file name (for renaming)
+ *
+ *
Can customize naming rules as needed, for example: - Keep original file name - Use
+ * timestamp: {timestamp}.tsfile - Use sequence number: tsfile_{sequence}.tsfile
+ *
+ * @param originalFileName original temporary file name
+ * @param targetDirectory target directory
+ * @param sequenceNumber sequence number (optional, for generating unique file names)
+ * @return final TSFile file name
+ */
+ public static String generateFinalTsFileName(
+ final String originalFileName, final File targetDirectory, final long sequenceNumber) {
+ // Default to keep original file name, add sequence number if file already exists in target
+ // directory
+ final String baseName = originalFileName != null ? originalFileName : "tsfile";
+ final File targetFile = new File(targetDirectory, baseName);
+
+ if (targetFile.exists() && sequenceNumber > 0) {
+ // If file exists, add sequence number
+ final String nameWithoutSuffix = baseName.replace(TsFileConstant.TSFILE_SUFFIX, "");
+ return nameWithoutSuffix + "_" + sequenceNumber + TsFileConstant.TSFILE_SUFFIX;
+ }
+
+ return baseName;
+ }
+
+ /**
+ * Validate if file name conforms to temporary file naming rules
+ *
+ * @param fileName file name
+ * @return true if conforms to rules, false otherwise
+ */
+ public static boolean isValidTempFileName(final String fileName) {
+ if (fileName == null || !fileName.endsWith(TsFileConstant.TSFILE_SUFFIX)) {
+ return false;
+ }
+ return fileName.startsWith(TS_FILE_PREFIX + "_");
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 8ea973fbf0a3..ac3bc7851cec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -52,6 +52,7 @@
import java.io.Closeable;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
public class PipeDataRegionAssigner implements Closeable {
@@ -72,6 +73,9 @@ public class PipeDataRegionAssigner implements Closeable {
private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter();
+ /** Track whether this data region has Object type write operations */
+ public final AtomicBoolean hasObjectData = new AtomicBoolean(false);
+
public String getDataRegionId() {
return dataRegionId;
}
@@ -162,6 +166,9 @@ private void assignToExtractor(
return;
}
+ event.setHasObject(hasObjectData.get());
+ event.scanForObjectData();
+
final PipeRealtimeEvent copiedEvent =
event.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
extractor.getPipeName(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 8c49dd0a3dd8..a4f2b0cd71a2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -34,6 +34,7 @@
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
@@ -54,13 +55,25 @@ public class PipeInsertionDataNodeListener {
private final AtomicInteger listenToTsFileExtractorCount = new AtomicInteger(0);
private final AtomicInteger listenToInsertNodeExtractorCount = new AtomicInteger(0);
+ // Independent tracking for Object type write operations
+ // This map has independent lifecycle from assigner
+ private final ConcurrentMap dataRegionId2HasObjectWrite =
+ new ConcurrentHashMap<>();
+
//////////////////////////// start & stop ////////////////////////////
public synchronized void startListenAndAssign(
final String dataRegionId, final PipeRealtimeDataRegionSource extractor) {
- dataRegionId2Assigner
- .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner(dataRegionId))
- .startAssignTo(extractor);
+ final PipeDataRegionAssigner assigner =
+ dataRegionId2Assigner.computeIfAbsent(
+ dataRegionId, k -> new PipeDataRegionAssigner(dataRegionId));
+ assigner.startAssignTo(extractor);
+
+ // Sync Object write flag from independent tracking to assigner
+ final AtomicBoolean hasObjectWrite = dataRegionId2HasObjectWrite.get(dataRegionId);
+ if (hasObjectWrite != null && hasObjectWrite.get()) {
+ assigner.hasObjectData.set(true);
+ }
if (extractor.isNeedListenToTsFile()) {
listenToTsFileExtractorCount.incrementAndGet();
@@ -117,6 +130,25 @@ public void listenToTsFile(
assigner.isTableModel(), databaseName, tsFileResource, isLoaded));
}
+ /**
+ * Listen to Object type write operations. Mark that this data region has Object type write
+ * operations. Use independent map to track Object write even before assigner is created.
+ *
+ * @param dataRegionId the data region id
+ */
+ public void listenToObjectNode(final String dataRegionId) {
+ // Track Object write independently
+ dataRegionId2HasObjectWrite
+ .computeIfAbsent(dataRegionId, k -> new AtomicBoolean(false))
+ .set(true);
+
+ // Also mark assigner if it exists
+ final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId);
+ if (assigner != null) {
+ assigner.hasObjectData.set(true);
+ }
+ }
+
public void listenToInsertNode(
final String dataRegionId,
final String databaseName,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 1124e33a7df9..e2983f320248 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3612,6 +3612,7 @@ public void writeObject(ObjectNode objectNode) throws Exception {
}
FileMetrics.getInstance().increaseObjectFileNum(1);
FileMetrics.getInstance().increaseObjectFileSize(objectFile.length());
+ PipeInsertionDataNodeListener.getInstance().listenToObjectNode(dataRegionIdString);
}
getWALNode()
.ifPresent(walNode -> walNode.log(TsFileProcessor.MEMTABLE_NOT_EXIST, objectNode));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
index a011490ca497..75b5f83ffe7d 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java
@@ -85,6 +85,7 @@ public Optional visitLoadTsFile(
null,
"root",
null,
+ true,
true)) {
for (final TabletInsertionEvent tabletInsertionEvent : parser.toTabletInsertionEvents()) {
if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index 226966454aaa..f26a28f8903c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -97,6 +97,7 @@ public Optional visitLoadFile(
Long.MAX_VALUE,
null,
null,
+ true,
true)) {
for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) {
final PipeTransferTabletRawReq tabletRawReq =
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 7bfde3b158d5..bc466cfc31f5 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -584,7 +584,7 @@ private void testTsFilePointNum(
? new TsFileInsertionEventQueryParser(
tsFile, pattern, startTime, endTime, tsFileInsertionEvent)
: new TsFileInsertionEventScanParser(
- tsFile, pattern, startTime, endTime, null, tsFileInsertionEvent, false)) {
+ tsFile, pattern, startTime, endTime, null, tsFileInsertionEvent, false, true)) {
final AtomicInteger count1 = new AtomicInteger(0);
final AtomicInteger count2 = new AtomicInteger(0);
final AtomicInteger count3 = new AtomicInteger(0);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
index 74bc0d9815aa..242499e68235 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java
@@ -104,6 +104,9 @@ public enum BuiltinPipePlugin {
WRITE_BACK_SINK("write-back-sink", WriteBackSink.class),
SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class),
PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", PipeConsensusAsyncSink.class),
+
+ // TsFile export sink (datanode only, placeholder class here)
+ TSFILE_SINK("tsfile-sink", DoNothingSink.class),
;
private final String pipePluginName;
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
index afe0ebe22df5..f90a119ce227 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java
@@ -488,6 +488,70 @@ public boolean isReleased() {
return isReleased.get();
}
+ /**
+ * Scan and detect if the event contains any Object type (BLOB or STRING) data. This method should
+ * be called before {@link #hasObjectData()} and {@link #getObjectPaths()}. Default implementation
+ * does nothing. Only events that may contain Object types need to override this.
+ */
+ public void scanForObjectData() {
+ // Default implementation does nothing
+ }
+
+ /**
+ * Set whether the event has Object type data manually. This can be used to manually mark Object
+ * data without scanning. Default implementation does nothing. Only events that may contain Object
+ * types need to override this.
+ *
+ * @param hasObject whether the event has Object type data
+ */
+ public void setHasObject(boolean hasObject) {
+ // Default implementation does nothing
+ }
+
+ /**
+ * Check if the event contains any Object type (BLOB or STRING) data. Default implementation
+ * returns false. Only events that may contain Object types need to override this.
+ *
+ * @return true if the event contains Object type data, false otherwise
+ */
+ public boolean hasObjectData() {
+ return false;
+ }
+
+ /**
+ * Get the paths of measurements that contain Object type (BLOB or STRING) data. Default
+ * implementation returns an empty array. Only events that may contain Object types need to
+ * override this.
+ *
+ * @return array of measurement names that contain Object type data, empty array if none
+ */
+ public String[] getObjectPaths() {
+ return new String[0];
+ }
+
+ /////////////////////////// Object Resource Management ///////////////////////////
+
+ /**
+ * Get the TsFileResource associated with this event. Default implementation returns null. Only
+ * events that are associated with a TsFile need to override this.
+ *
+ * @return TsFileResource if this event is associated with a TsFile, null otherwise
+ */
+ public Object getTsFileResource() {
+ // Default implementation returns null
+ return null;
+ }
+
+ /**
+ * Set the TsFileResource for this event. Default implementation does nothing. Only events that
+ * need to store TsFileResource need to override this.
+ *
+ * @param tsFileResource the TsFileResource to set
+ */
+ public void setTsFileResource(Object tsFileResource) {
+ // Default implementation does nothing
+ }
+
/**
* Used for pipeConsensus. In PipeConsensus, we only need committerKey, commitId and rebootTimes
* to uniquely identify an event