diff --git a/iotdb-core/datanode/pom.xml b/iotdb-core/datanode/pom.xml index 80bebe90f779..4ea372b735db 100644 --- a/iotdb-core/datanode/pom.xml +++ b/iotdb-core/datanode/pom.xml @@ -292,6 +292,11 @@ at.yawk.lz4 lz4-java + + com.jcraft + jsch + 0.1.55 + junit junit diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java index 536cf71cdb8d..d41a61d06910 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java @@ -30,6 +30,7 @@ import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.PipeConsensusAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.sink.protocol.thrift.sync.IoTDBDataRegionSyncSink; +import org.apache.iotdb.db.pipe.sink.protocol.tsfile.TsFileSink; import org.apache.iotdb.db.pipe.sink.protocol.websocket.WebSocketSink; import org.apache.iotdb.db.pipe.sink.protocol.writeback.WriteBackSink; @@ -97,5 +98,6 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName(), PipeConsensusAsyncSink::new); + pluginConstructors.put(BuiltinPipePlugin.TSFILE_SINK.getPipePluginName(), TsFileSink::new); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/ObjectDataParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/ObjectDataParser.java new file mode 100644 index 000000000000..63e00aa80267 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/ObjectDataParser.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.event.common.tablet; + +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; +import org.apache.tsfile.utils.Pair; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; + +/** + * Utility class for parsing Object type data (BLOB). Object data storage format: - First 8 bytes + * (Long): file offset + content length - Following bytes: file relative path (UTF-8 encoded) + */ +public class ObjectDataParser { + + /** Long type byte length */ + private static final int LONG_BYTES = Long.BYTES; + + /** + * Parse Binary object to extract Object data information + * + * @param binary Binary object + * @return Pair(offsetPlusLength, relativePath), null if parse failed + */ + public static Pair parse(Binary binary) { + if (Objects.isNull(binary)) { + return null; + } + + byte[] bytes = binary.getValues(); + return parse(bytes); + } + + /** + * Parse byte array to extract Object data information + * + * @param bytes byte array + * @return Pair(offsetPlusLength, relativePath), null if parse failed + */ + public static Pair parse(byte[] bytes) { + if (Objects.isNull(bytes) || bytes.length < LONG_BYTES) { + return null; + } + + try { + // Parse first 8 bytes: length + long offsetPlusLength = BytesUtils.bytesToLong(bytes, 0); + + // Parse following bytes: file path + int pathLength = bytes.length - LONG_BYTES; + if (pathLength <= 0) { + return null; + } + + byte[] pathBytes = new byte[pathLength]; + System.arraycopy(bytes, LONG_BYTES, pathBytes, 0, pathLength); + String relativePath = new String(pathBytes, StandardCharsets.UTF_8); + + return new Pair<>(offsetPlusLength, relativePath); + } catch (Exception e) { + return null; + } + } + + /** + * Extract file length information from byte array + * + * @param bytes byte array + * @return offset + content.length, -1 if extraction failed + */ + public static long extractLength(byte[] bytes) { + if (Objects.isNull(bytes) || bytes.length < LONG_BYTES) { + return -1; + } + + try { + return BytesUtils.bytesToLong(bytes, 0); + } catch (Exception e) { + return -1; + } + } + + /** + * Extract file length information from Binary object + * + * @param binary Binary object + * @return offset + content.length, -1 if extraction failed + */ + public static long extractLength(Binary binary) { + if (Objects.isNull(binary)) { + return -1; + } + return extractLength(binary.getValues()); + } + + /** + * Extract file relative path from byte array + * + * @param bytes byte array + * @return file relative path, null if extraction failed + */ + public static String extractPath(byte[] bytes) { + if (Objects.isNull(bytes) || bytes.length <= LONG_BYTES) { + return null; + } + + try { + int pathLength = bytes.length - LONG_BYTES; + byte[] pathBytes = new byte[pathLength]; + System.arraycopy(bytes, LONG_BYTES, pathBytes, 0, pathLength); + return new String(pathBytes, StandardCharsets.UTF_8); + } catch (Exception e) { + return null; + } + } + + /** + * Extract file relative path from Binary object + * + * @param binary Binary object + * @return file relative path, null if extraction failed + */ + public static String extractPath(Binary binary) { + if (Objects.isNull(binary)) { + return null; + } + return extractPath(binary.getValues()); + } + + /** + * Validate if byte array is valid Object data format + * + * @param bytes byte array + * @return true if format is valid, false otherwise + */ + public static boolean isValidObjectData(byte[] bytes) { + if (Objects.isNull(bytes) || bytes.length <= LONG_BYTES) { + return false; + } + + try { + // Try to parse length + long length = BytesUtils.bytesToLong(bytes, 0); + if (length < 0) { + return false; + } + + // Try to parse path + int pathLength = bytes.length - LONG_BYTES; + byte[] pathBytes = new byte[pathLength]; + System.arraycopy(bytes, LONG_BYTES, pathBytes, 0, pathLength); + String path = new String(pathBytes, StandardCharsets.UTF_8); + + return path != null && !path.isEmpty(); + } catch (Exception e) { + return false; + } + } + + /** + * Validate if Binary object is valid Object data format + * + * @param binary Binary object + * @return true if format is valid, false otherwise + */ + public static boolean isValidObjectData(Binary binary) { + if (Objects.isNull(binary)) { + return false; + } + return isValidObjectData(binary.getValues()); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java index f5ae873bbac3..bf9f6a08dae9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java @@ -49,12 +49,14 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.exception.PipeException; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Accountable; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.write.UnSupportedDataTypeException; @@ -64,6 +66,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.Objects; @@ -96,10 +99,19 @@ public class PipeInsertNodeTabletInsertionEvent extends PipeInsertionEvent private long extractTime = 0; + // Object type scanning fields + // objectDataPaths == null means not scanned, != null means scanned + // hasObjectData defaults to true to ensure scanning will be performed + private boolean hasObjectData = true; + private String[] objectDataPaths = null; + + private TsFileResource tsFileResource; + public PipeInsertNodeTabletInsertionEvent( final Boolean isTableModel, final String databaseNameFromDataRegion, - final InsertNode insertNode) { + final InsertNode insertNode, + final TsFileResource resource) { this( isTableModel, databaseNameFromDataRegion, @@ -114,7 +126,8 @@ public PipeInsertNodeTabletInsertionEvent( null, true, Long.MIN_VALUE, - Long.MAX_VALUE); + Long.MAX_VALUE, + resource); } private PipeInsertNodeTabletInsertionEvent( @@ -131,7 +144,8 @@ private PipeInsertNodeTabletInsertionEvent( final String cliHostname, final boolean skipIfNoPrivileges, final long startTime, - final long endTime) { + final long endTime, + final TsFileResource tsFileResource) { super( pipeName, creationTime, @@ -150,6 +164,7 @@ private PipeInsertNodeTabletInsertionEvent( this.progressIndex = insertNode.getProgressIndex(); this.allocatedMemoryBlock = new AtomicReference<>(); + this.tsFileResource = tsFileResource; } public InsertNode getInsertNode() { @@ -177,6 +192,166 @@ public long getExtractTime() { return extractTime; } + @Override + public void scanForObjectData() { + // If already scanned (objectDataPaths != null), return directly + if (objectDataPaths != null) { + return; + } + + // If flag is false (no Object), do not scan, set to empty directly + if (!hasObjectData) { + objectDataPaths = new String[0]; + return; + } + + if (Objects.isNull(insertNode)) { + hasObjectData = false; + objectDataPaths = new String[0]; + return; + } + + try { + final TSDataType[] types = insertNode.getDataTypes(); + final String[] measurements = insertNode.getMeasurements(); + + if (Objects.isNull(types) || Objects.isNull(measurements)) { + hasObjectData = false; + objectDataPaths = new String[0]; + return; + } + + final List objectPaths = new ArrayList<>(); + + // Scan actual data content, not just types + if (insertNode instanceof InsertRowNode) { + final InsertRowNode rowNode = (InsertRowNode) insertNode; + final Object[] values = rowNode.getValues(); + if (values != null) { + final int maxIndex = Math.min(types.length, Math.min(values.length, measurements.length)); + for (int i = 0; i < maxIndex; i++) { + if (types[i] == TSDataType.OBJECT && values[i] != null && measurements[i] != null) { + // Parse Binary to extract Object file path + if (values[i] instanceof org.apache.tsfile.utils.Binary) { + org.apache.tsfile.utils.Pair result = + ObjectDataParser.parse((org.apache.tsfile.utils.Binary) values[i]); + if (result != null && result.getRight() != null) { + objectPaths.add(result.getRight()); + } + } + } + } + } + } else if (insertNode instanceof InsertTabletNode) { + final InsertTabletNode tabletNode = (InsertTabletNode) insertNode; + final Object[] columns = tabletNode.getColumns(); + final org.apache.tsfile.utils.BitMap[] bitMaps = tabletNode.getBitMaps(); + if (columns != null) { + for (int i = 0; i < types.length; i++) { + if (types[i] == TSDataType.OBJECT && columns[i] != null && measurements[i] != null) { + // Extract all Object paths from this column (considering BitMap) + org.apache.tsfile.utils.BitMap bitMap = + (bitMaps != null && i < bitMaps.length) ? bitMaps[i] : null; + final List columnObjectPaths = + extractObjectPaths(columns[i], bitMap, tabletNode.getRowCount()); + if (!columnObjectPaths.isEmpty()) { + objectPaths.addAll(columnObjectPaths); + } + } + } + } + } else if (insertNode instanceof InsertRowsNode) { + final List rowNodes = ((InsertRowsNode) insertNode).getInsertRowNodeList(); + if (rowNodes != null) { + for (InsertRowNode rowNode : rowNodes) { + final Object[] values = rowNode.getValues(); + final TSDataType[] rowTypes = rowNode.getDataTypes(); + final String[] rowMeasurements = rowNode.getMeasurements(); + if (values != null && rowTypes != null && rowMeasurements != null) { + for (int i = 0; i < rowTypes.length; i++) { + if (rowTypes[i] == TSDataType.OBJECT + && values[i] != null + && rowMeasurements[i] != null) { + // Parse Binary to extract Object file path + if (values[i] instanceof org.apache.tsfile.utils.Binary) { + org.apache.tsfile.utils.Pair result = + ObjectDataParser.parse((org.apache.tsfile.utils.Binary) values[i]); + if (result != null && result.getRight() != null) { + objectPaths.add(result.getRight()); + } + } + } + } + } + } + } + } + + hasObjectData = !objectPaths.isEmpty(); + objectDataPaths = objectPaths.toArray(new String[0]); + } catch (final Exception e) { + LOGGER.warn( + "Exception occurred when scanning for object data in PipeInsertNodeTabletInsertionEvent: {}", + this, + e); + hasObjectData = false; + objectDataPaths = new String[0]; + } + } + + /** + * Extract all Object file paths from column data (considering BitMap) + * + * @param columnData column data (Binary array) + * @param bitMap BitMap for marking null values + * @param rowCount row count + * @return List of Object file paths + */ + private List extractObjectPaths( + Object columnData, org.apache.tsfile.utils.BitMap bitMap, int rowCount) { + final List paths = new ArrayList<>(); + if (columnData == null) { + return paths; + } + + // Binary array (BLOB type) + if (columnData instanceof org.apache.tsfile.utils.Binary[]) { + org.apache.tsfile.utils.Binary[] binaries = (org.apache.tsfile.utils.Binary[]) columnData; + for (int i = 0; i < Math.min(binaries.length, rowCount); i++) { + // Check if Binary is not null and not marked as null in BitMap + if (binaries[i] != null) { + // If no BitMap, or position not marked as null in BitMap + if (bitMap == null || !bitMap.isMarked(i)) { + // Parse Binary to extract Object file path + org.apache.tsfile.utils.Pair result = ObjectDataParser.parse(binaries[i]); + if (result != null && result.getRight() != null) { + paths.add(result.getRight()); + } + } + } + } + } + + return paths; + } + + /////////////////////////// Object Related Methods /////////////////////////// + + @Override + public boolean hasObjectData() { + return hasObjectData; + } + + @Override + public void setHasObject(boolean hasObject) { + this.hasObjectData = hasObject; + } + + @Override + public String[] getObjectPaths() { + return objectDataPaths != null ? objectDataPaths : new String[0]; + } + /////////////////////////// EnrichedEvent /////////////////////////// @Override @@ -184,6 +359,12 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa extractTime = System.nanoTime(); try { if (Objects.nonNull(pipeName)) { + scanForObjectData(); + if (hasObjectData) { + PipeDataNodeResourceManager.object() + .linkObjectFiles(tsFileResource, Arrays.asList(objectDataPaths), pipeName); + PipeDataNodeResourceManager.object().increaseReference(tsFileResource, pipeName); + } PipeDataNodeSinglePipeMetrics.getInstance() .increaseInsertNodeEventCount(pipeName, creationTime); PipeDataNodeAgent.task() @@ -200,6 +381,10 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa @Override public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { try { + if (hasObjectData) { + PipeDataNodeResourceManager.object().decreaseReference(tsFileResource, pipeName); + } + // release the parsers' memory and close memory block if (eventParsers != null) { eventParsers.clear(); @@ -252,21 +437,29 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP if (Objects.isNull(node)) { throw new PipeException("InsertNode has been released"); } - return new PipeInsertNodeTabletInsertionEvent( - getRawIsTableModelEvent(), - getSourceDatabaseNameFromDataRegion(), - node, - pipeName, - creationTime, - pipeTaskMeta, - treePattern, - tablePattern, - userId, - userName, - cliHostname, - skipIfNoPrivileges, - startTime, - endTime); + final PipeInsertNodeTabletInsertionEvent copiedEvent = + new PipeInsertNodeTabletInsertionEvent( + getRawIsTableModelEvent(), + getSourceDatabaseNameFromDataRegion(), + node, + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + userId, + userName, + cliHostname, + skipIfNoPrivileges, + startTime, + endTime, + tsFileResource); + + // Copy Object-related state + copiedEvent.hasObjectData = this.hasObjectData; + copiedEvent.objectDataPaths = this.objectDataPaths; + + return copiedEvent; } @Override @@ -518,19 +711,24 @@ public List toRawTabletInsertionEvents() { final List events = initEventParsers().stream() .map( - container -> - new PipeRawTabletInsertionEvent( - getRawIsTableModelEvent(), - getSourceDatabaseNameFromDataRegion(), - getRawTableModelDataBase(), - getRawTreeModelDataBase(), - container.convertToTablet(), - container.isAligned(), - pipeName, - creationTime, - pipeTaskMeta, - this, - false)) + container -> { + final PipeRawTabletInsertionEvent event = + new PipeRawTabletInsertionEvent( + getRawIsTableModelEvent(), + getSourceDatabaseNameFromDataRegion(), + getRawTableModelDataBase(), + getRawTreeModelDataBase(), + container.convertToTablet(), + container.isAligned(), + pipeName, + creationTime, + pipeTaskMeta, + this, + false); + event.setHasObject(hasObjectData); + event.setTsFileResource(tsFileResource); + return event; + }) .filter(event -> !event.hasNoNeedParsingAndIsEmpty()) .collect(Collectors.toList()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java index adcef5128f52..0407217ef927 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java @@ -37,13 +37,20 @@ import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.resource.memory.PipeTabletMemoryBlock; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.pipe.api.access.Row; import org.apache.iotdb.pipe.api.collector.RowCollector; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.RamUsageEstimator; import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -52,6 +59,8 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent implements TabletInsertionEvent, ReferenceTrackableEvent, AutoCloseable { + private static final Logger LOGGER = LoggerFactory.getLogger(PipeRawTabletInsertionEvent.class); + // For better calculation private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(PipeRawTabletInsertionEvent.class); @@ -68,6 +77,14 @@ public class PipeRawTabletInsertionEvent extends PipeInsertionEvent private volatile ProgressIndex overridingProgressIndex; + // Object type scanning fields + // objectDataPaths == null means not scanned, != null means scanned + private boolean hasObjectData = true; + private String[] objectDataPaths = null; + + // TSFile resource, used for Object file management + private TsFileResource tsFileResource; + private PipeRawTabletInsertionEvent( final Boolean isTableModelEvent, final String databaseName, @@ -251,19 +268,36 @@ public PipeRawTabletInsertionEvent( @Override public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { - PipeDataNodeResourceManager.memory() - .forceResize( - allocatedMemoryBlock, - PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + INSTANCE_SIZE); - if (Objects.nonNull(pipeName)) { - PipeDataNodeSinglePipeMetrics.getInstance() - .increaseRawTabletEventCount(pipeName, creationTime); + try { + if (hasObjectData && tsFileResource != null) { + // Only increase reference count, do not link files + PipeDataNodeResourceManager.object().increaseReference(tsFileResource, pipeName); + } + + PipeDataNodeResourceManager.memory() + .forceResize( + allocatedMemoryBlock, + PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + INSTANCE_SIZE); + if (Objects.nonNull(pipeName)) { + PipeDataNodeSinglePipeMetrics.getInstance() + .increaseRawTabletEventCount(pipeName, creationTime); + } + return true; + } catch (final Exception e) { + LOGGER.warn( + "Failed to increase resource reference count for tablet event. Holder Message: {}", + holderMessage, + e); + return false; } - return true; } @Override public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { + if (hasObjectData && tsFileResource != null) { + PipeDataNodeResourceManager.object().decreaseReference(tsFileResource, pipeName); + } + if (Objects.nonNull(pipeName)) { PipeDataNodeSinglePipeMetrics.getInstance() .decreaseRawTabletEventCount(pipeName, creationTime); @@ -341,26 +375,34 @@ public EnrichedEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( final boolean skipIfNoPrivileges, final long startTime, final long endTime) { - return new PipeRawTabletInsertionEvent( - getRawIsTableModelEvent(), - getSourceDatabaseNameFromDataRegion(), - getRawTableModelDataBase(), - getRawTreeModelDataBase(), - tablet, - isAligned, - sourceEvent, - needToReport, - pipeName, - creationTime, - pipeTaskMeta, - treePattern, - tablePattern, - userId, - userName, - cliHostname, - skipIfNoPrivileges, - startTime, - endTime); + final PipeRawTabletInsertionEvent copiedEvent = + new PipeRawTabletInsertionEvent( + getRawIsTableModelEvent(), + getSourceDatabaseNameFromDataRegion(), + getRawTableModelDataBase(), + getRawTreeModelDataBase(), + tablet, + isAligned, + sourceEvent, + needToReport, + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + userId, + userName, + cliHostname, + skipIfNoPrivileges, + startTime, + endTime); + + // Copy Object-related state + copiedEvent.setTsFileResource(this.tsFileResource); + copiedEvent.hasObjectData = this.hasObjectData; + copiedEvent.objectDataPaths = this.objectDataPaths; + + return copiedEvent; } @Override @@ -401,6 +443,136 @@ public EnrichedEvent getSourceEvent() { return sourceEvent; } + @Override + public void scanForObjectData() { + // If already scanned (objectDataPaths != null), return directly + if (objectDataPaths != null) { + return; + } + + // If flag is false (no Object), do not scan, set to empty directly + if (!hasObjectData) { + objectDataPaths = new String[0]; + return; + } + + final Tablet tablet = this.tablet; + if (Objects.isNull(tablet)) { + hasObjectData = false; + objectDataPaths = new String[0]; + return; + } + + try { + final List schemas = tablet.getSchemas(); + final Object[] values = tablet.getValues(); + final org.apache.tsfile.utils.BitMap[] bitMaps = tablet.getBitMaps(); + + if (Objects.isNull(schemas) || schemas.isEmpty() || Objects.isNull(values)) { + hasObjectData = false; + objectDataPaths = new String[0]; + return; + } + + final List objectPaths = new ArrayList<>(); + + // Scan actual values data, not just schema types + for (int i = 0; i < schemas.size() && i < values.length; i++) { + final IMeasurementSchema schema = schemas.get(i); + if (schema == null) { + continue; + } + final TSDataType dataType = schema.getType(); + + // Only OBJECT type with actual non-null data + if (dataType == TSDataType.OBJECT && values[i] != null) { + // Extract all Object paths from this column (considering BitMap) + org.apache.tsfile.utils.BitMap bitMap = + (bitMaps != null && i < bitMaps.length) ? bitMaps[i] : null; + final List columnObjectPaths = + extractObjectPaths(values[i], bitMap, tablet.getRowSize()); + if (!columnObjectPaths.isEmpty()) { + objectPaths.addAll(columnObjectPaths); + } + } + } + + hasObjectData = !objectPaths.isEmpty(); + objectDataPaths = objectPaths.toArray(new String[0]); + } catch (final Exception e) { + LOGGER.warn( + "Exception occurred when scanning for object data in PipeRawTabletInsertionEvent: {}", + this, + e); + hasObjectData = false; + objectDataPaths = new String[0]; + } + } + + /** + * Extract all Object file paths from column data (considering BitMap) + * + * @param columnData column data (Binary array) + * @param bitMap BitMap for marking null values + * @param rowSize row count + * @return List of Object file paths + */ + private List extractObjectPaths( + Object columnData, org.apache.tsfile.utils.BitMap bitMap, int rowSize) { + final List paths = new ArrayList<>(); + if (columnData == null) { + return paths; + } + + // Binary array (BLOB type) + if (columnData instanceof org.apache.tsfile.utils.Binary[]) { + org.apache.tsfile.utils.Binary[] binaries = (org.apache.tsfile.utils.Binary[]) columnData; + final int maxIndex = Math.min(binaries.length, rowSize); + for (int i = 0; i < maxIndex; i++) { + // Check if Binary is not null and not marked as null in BitMap + if (binaries[i] != null) { + // If no BitMap, or position not marked as null in BitMap + if (bitMap == null || !bitMap.isMarked(i)) { + // Parse Binary to extract Object file path + org.apache.tsfile.utils.Pair result = ObjectDataParser.parse(binaries[i]); + if (result != null && result.getRight() != null) { + paths.add(result.getRight()); + } + } + } + } + } + + return paths; + } + + /////////////////////////// Object Related Methods /////////////////////////// + + @Override + public void setHasObject(boolean hasObject) { + this.hasObjectData = hasObject; + } + + @Override + public boolean hasObjectData() { + return hasObjectData; + } + + @Override + public String[] getObjectPaths() { + return objectDataPaths != null ? objectDataPaths : new String[0]; + } + + @Override + public TsFileResource getTsFileResource() { + return tsFileResource; + } + + @Override + public void setTsFileResource(Object tsFileResource) { + this.tsFileResource = (TsFileResource) tsFileResource; + } + /////////////////////////// TabletInsertionEvent /////////////////////////// @Override @@ -450,18 +622,24 @@ public long count() { /////////////////////////// parsePatternOrTime /////////////////////////// public PipeRawTabletInsertionEvent parseEventWithPatternOrTime() { - return new PipeRawTabletInsertionEvent( - getRawIsTableModelEvent(), - getSourceDatabaseNameFromDataRegion(), - getRawTableModelDataBase(), - getRawTreeModelDataBase(), - convertToTablet(), - isAligned, - pipeName, - creationTime, - pipeTaskMeta, - this, - needToReport); + final PipeRawTabletInsertionEvent event = + new PipeRawTabletInsertionEvent( + getRawIsTableModelEvent(), + getSourceDatabaseNameFromDataRegion(), + getRawTableModelDataBase(), + getRawTreeModelDataBase(), + convertToTablet(), + isAligned, + pipeName, + creationTime, + pipeTaskMeta, + this, + needToReport); + + // Set tsFileResource + event.setTsFileResource(tsFileResource); + + return event; } public boolean hasNoNeedParsingAndIsEmpty() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java index e10051325b31..389f1a61a77d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java @@ -559,6 +559,7 @@ private static Object filterValueColumnsByRowIndexList( case TEXT: case BLOB: case STRING: + case OBJECT: { final Binary[] binaryValueColumns = isSingleOriginValueColumn @@ -618,6 +619,7 @@ private void fillNullValue( case TEXT: case BLOB: case STRING: + case OBJECT: final Binary[] columns = new Binary[rowSize]; Arrays.fill(columns, Binary.EMPTY_VALUE); valueColumns[columnIndex] = columns; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 9db3cb57e6a0..e0b81b4baa96 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -54,8 +54,10 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; @@ -93,6 +95,12 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent protected volatile ProgressIndex overridingProgressIndex; private Set tableNames; + // Object type scanning fields + // objectDataPaths == null means not scanned, != null means scanned + // hasObjectData defaults to true to ensure scanning will be performed + private volatile boolean hasObjectData = true; + private volatile String[] objectDataPaths = null; + public PipeTsFileInsertionEvent( final Boolean isTableModelEvent, final String databaseNameFromDataRegion, @@ -307,12 +315,99 @@ public long getExtractTime() { return extractTime; } + /////////////////////////// Object Related Methods /////////////////////////// + + @Override + public void scanForObjectData() { + // If already scanned (objectDataPaths != null), return directly + if (objectDataPaths != null) { + return; + } + + // If flag is false (no Object), do not scan, set to empty directly + if (!hasObjectData) { + objectDataPaths = new String[0]; + return; + } + + // TsFile data scanning is complex and expensive, return default values for now + // To scan actual data content precisely, need to iterate all chunk data in TsFile + // If flag is true but actual scan finds no data, set to false + hasObjectData = false; + objectDataPaths = new String[0]; + } + + @Override + public void setHasObject(boolean hasObject) { + this.hasObjectData = hasObject; + } + + @Override + public boolean hasObjectData() { + return hasObjectData; + } + + @Override + public String[] getObjectPaths() { + return objectDataPaths != null ? objectDataPaths : new String[0]; + } + + public boolean linkObjectFiles() { + // First scan Object data internally + scanForObjectData(); + + // If no Object data, return success directly + if (!hasObjectData()) { + return true; + } + + final String[] objectPaths = getObjectPaths(); + if (objectPaths == null || objectPaths.length == 0) { + return true; + } + + try { + // Convert String[] to List + final List objectPathList = new ArrayList<>(); + for (final String path : objectPaths) { + if (path != null && !path.isEmpty()) { + objectPathList.add(path); + } + } + + // Call PipeObjectResourceManager to link Object files + return PipeDataNodeResourceManager.object() + .linkObjectFiles(resource, objectPathList, pipeName); + } catch (final Exception e) { + LOGGER.warn( + "Failed to link object files for TsFile {}: {}", + tsFile != null ? tsFile.getPath() : "null", + e.getMessage(), + e); + return false; + } + } + + public TsFileResource getResource() { + return resource; + } + + @Override + public TsFileResource getTsFileResource() { + return resource; + } + /////////////////////////// EnrichedEvent /////////////////////////// @Override public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { extractTime = System.nanoTime(); try { + scanForObjectData(); + if (Objects.nonNull(pipeName) && hasObjectData) { + PipeDataNodeResourceManager.object().setTsFileClosed(resource, pipeName); + PipeDataNodeResourceManager.object().increaseReference(resource, pipeName); + } tsFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, pipeName); if (isWithMod) { modFile = @@ -337,6 +432,11 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa @Override public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { try { + if (hasObjectData) { + // Object files will be cleaned up automatically when TSFile reference count reaches 0 + // No need to manually unpin here + PipeDataNodeResourceManager.object().decreaseReference(resource, pipeName); + } PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile, pipeName); if (isWithMod) { PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile, pipeName); @@ -411,26 +511,33 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep final boolean skipIfNoPrivileges, final long startTime, final long endTime) { - return new PipeTsFileInsertionEvent( - getRawIsTableModelEvent(), - getSourceDatabaseNameFromDataRegion(), - resource, - tsFile, - isWithMod, - isLoaded, - isGeneratedByHistoricalExtractor, - tableNames, - pipeName, - creationTime, - pipeTaskMeta, - treePattern, - tablePattern, - userId, - userName, - cliHostname, - skipIfNoPrivileges, - startTime, - endTime); + final PipeTsFileInsertionEvent copiedEvent = + new PipeTsFileInsertionEvent( + getRawIsTableModelEvent(), + getSourceDatabaseNameFromDataRegion(), + resource, + tsFile, + isWithMod, + isLoaded, + isGeneratedByHistoricalExtractor, + tableNames, + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + userId, + userName, + cliHostname, + skipIfNoPrivileges, + startTime, + endTime); + + // Copy Object-related state + copiedEvent.hasObjectData = this.hasObjectData; + copiedEvent.objectDataPaths = this.objectDataPaths; + + return copiedEvent; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java index 04b2a3bfd81c..c079d38171c1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParser.java @@ -30,17 +30,22 @@ import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.utils.datastructure.PatternTreeMapFactory; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.expression.impl.GlobalTimeExpression; import org.apache.tsfile.read.filter.factory.TimeFilterApi; +import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.Iterator; public abstract class TsFileInsertionEventParser implements AutoCloseable { @@ -58,6 +63,12 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { protected final PipeTaskMeta pipeTaskMeta; // used to report progress protected final PipeInsertionEvent sourceEvent; // used to report progress + // TsFileResource, used for Object file management + protected final TsFileResource tsFileResource; + // Flag indicating whether Tablet has Object data, defaults to true to ensure scanning will be + // performed + protected boolean hasObjectData = true; + // mods entry protected PipeMemoryBlock allocatedMemoryBlockForModifications; protected PatternTreeMap currentModifications; @@ -72,6 +83,8 @@ public abstract class TsFileInsertionEventParser implements AutoCloseable { protected Iterable tabletInsertionIterable; + protected final boolean notOnlyNeedObject; + protected TsFileInsertionEventParser( final String pipeName, final long creationTime, @@ -80,7 +93,9 @@ protected TsFileInsertionEventParser( final long startTime, final long endTime, final PipeTaskMeta pipeTaskMeta, - final PipeInsertionEvent sourceEvent) { + final PipeInsertionEvent sourceEvent, + final TsFileResource tsFileResource, + final boolean notOnlyNeedObject) { this.pipeName = pipeName; this.creationTime = creationTime; @@ -96,10 +111,19 @@ protected TsFileInsertionEventParser( this.pipeTaskMeta = pipeTaskMeta; this.sourceEvent = sourceEvent; + // Get TsFileResource and hasObjectData from sourceEvent + this.tsFileResource = + tsFileResource != null + ? tsFileResource + : (sourceEvent != null ? (TsFileResource) sourceEvent.getTsFileResource() : null); + this.hasObjectData = sourceEvent != null && sourceEvent.hasObjectData(); + this.allocatedMemoryBlockForTablet = PipeDataNodeResourceManager.memory() .forceAllocateForTabletWithRetry( PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes()); + + this.notOnlyNeedObject = notOnlyNeedObject; } /** @@ -107,6 +131,133 @@ protected TsFileInsertionEventParser( */ public abstract Iterable toTabletInsertionEvents(); + public abstract Iterable getObjectTypeData(); + + /** + * Template method for creating ObjectTypeData iterator. Subclasses should implement + * createObjectTypeDataIteratorState() to provide data source specific state management. + */ + protected Iterable createObjectTypeDataIterator() { + return () -> { + final ObjectTypeDataIteratorState state = createObjectTypeDataIteratorState(); + return new Iterator() { + private Tablet tablet = null; + private int rowIndex = 0; + private int columnIndex = 0; + + @Override + public boolean hasNext() { + while (true) { + // Process current Tablet + if (tablet != null) { + final Object[] values = tablet.getValues(); + if (values == null || columnIndex >= values.length) { + resetTablet(); + continue; + } + + // Check if current column is Binary[] type + if (!(values[columnIndex] instanceof Binary[])) { + columnIndex++; + continue; + } + + final int rowSize = tablet.getRowSize(); + + while (rowIndex < rowSize) { + if (!isRowNull(tablet, columnIndex, rowIndex)) { + return true; + } + rowIndex++; + } + + columnIndex++; + rowIndex = 0; + continue; + } + + // Check if there are more data sources + if (!state.hasMoreDataSources()) { + return false; + } + + // Get next Tablet from data source + try { + tablet = state.getNextTablet(); + } catch (final Exception e) { + close(); + throw new PipeException("failed to read next Tablet", e); + } + + rowIndex = 0; + columnIndex = 0; + + // If the fetched Tablet has no data, continue to fetch the next one + if (tablet == null || tablet.getRowSize() == 0) { + tablet = null; + continue; + } + + final Object[] values = tablet.getValues(); + if (values == null || values.length == 0) { + tablet = null; + continue; + } + } + } + + private void resetTablet() { + tablet = null; + columnIndex = 0; + rowIndex = 0; + } + + private boolean isRowNull(Tablet tablet, int colIndex, int rowIdx) { + final BitMap[] bitMaps = tablet.getBitMaps(); + return bitMaps != null + && colIndex < bitMaps.length + && bitMaps[colIndex] != null + && bitMaps[colIndex].isMarked(rowIdx); + } + + @Override + public Binary next() { + final Binary[] column = (Binary[]) tablet.getValues()[columnIndex]; + return column[rowIndex++]; + } + }; + }; + } + + /** + * Create state object for ObjectTypeData iterator. Should be implemented by subclasses to provide + * data source specific state management. + * + * @return the state object for managing data source iteration + */ + protected abstract ObjectTypeDataIteratorState createObjectTypeDataIteratorState(); + + /** + * State interface for ObjectTypeData iterator. Subclasses should implement this to manage their + * specific data source iteration state. + */ + protected interface ObjectTypeDataIteratorState { + /** + * Check if there are more data sources to read from. + * + * @return true if there are more data sources, false otherwise + */ + boolean hasMoreDataSources(); + + /** + * Get the next Tablet from the data source. + * + * @return the next Tablet, or null if no more data + * @throws Exception if an error occurs while reading + */ + Tablet getNextTablet() throws Exception; + } + /** * Record parse start time when hasNext() is called for the first time and returns true. Should be * called in Iterator.hasNext() when it's the first call. diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java index 5632ab812260..e4e938015dfc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/TsFileInsertionEventParserProvider.java @@ -95,7 +95,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce pipeTaskMeta, userName, sourceEvent, - isWithMod); + isWithMod, + true); } // Use scan container to save memory @@ -111,7 +112,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce endTime, pipeTaskMeta, sourceEvent, - isWithMod); + isWithMod, + true); } if (treePattern instanceof IoTDBTreePatternOperations @@ -131,7 +133,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce endTime, pipeTaskMeta, sourceEvent, - isWithMod); + isWithMod, + true); } final Map deviceIsAlignedMap = @@ -148,7 +151,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce endTime, pipeTaskMeta, sourceEvent, - isWithMod); + isWithMod, + true); } final int originalSize = deviceIsAlignedMap.size(); @@ -166,7 +170,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce endTime, pipeTaskMeta, sourceEvent, - isWithMod) + isWithMod, + true) : new TsFileInsertionEventQueryParser( pipeName, creationTime, @@ -177,7 +182,8 @@ public TsFileInsertionEventParser provide(final boolean isWithMod) throws IOExce pipeTaskMeta, sourceEvent, filteredDeviceIsAlignedMap, - isWithMod); + isWithMod, + true); } private Map filterDeviceIsAlignedMapByPattern( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java index 22f1cffdd0dd..b724cfee5da6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java @@ -41,6 +41,7 @@ import org.apache.tsfile.read.TsFileDeviceIterator; import org.apache.tsfile.read.TsFileReader; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.Pair; import org.apache.tsfile.write.record.Tablet; import org.slf4j.Logger; @@ -78,7 +79,7 @@ public TsFileInsertionEventQueryParser( final long endTime, final PipeInsertionEvent sourceEvent) throws IOException { - this(null, 0, tsFile, pattern, startTime, endTime, null, sourceEvent, false); + this(null, 0, tsFile, pattern, startTime, endTime, null, sourceEvent, false, true); } public TsFileInsertionEventQueryParser( @@ -90,7 +91,8 @@ public TsFileInsertionEventQueryParser( final long endTime, final PipeTaskMeta pipeTaskMeta, final PipeInsertionEvent sourceEvent, - final boolean isWithMod) + final boolean isWithMod, + final boolean notOnlyNeedObject) throws IOException { this( pipeName, @@ -102,7 +104,8 @@ public TsFileInsertionEventQueryParser( pipeTaskMeta, sourceEvent, null, - isWithMod); + isWithMod, + notOnlyNeedObject); } public TsFileInsertionEventQueryParser( @@ -115,9 +118,20 @@ public TsFileInsertionEventQueryParser( final PipeTaskMeta pipeTaskMeta, final PipeInsertionEvent sourceEvent, final Map deviceIsAlignedMap, - final boolean isWithMod) + final boolean isWithMod, + final boolean notOnlyNeedObject) throws IOException { - super(pipeName, creationTime, pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent); + super( + pipeName, + creationTime, + pattern, + null, + startTime, + endTime, + pipeTaskMeta, + sourceEvent, + null, + notOnlyNeedObject); // tsFileResource will be obtained from sourceEvent try { currentModifications = @@ -330,9 +344,10 @@ private Map> readFilteredDeviceMeasurementsMap( final Map> result = new HashMap<>(); for (final IDeviceID device : devices) { - tsFileSequenceReader - .readDeviceMetadata(device) - .values() + tsFileSequenceReader.readDeviceMetadata(device).values().stream() + .filter( + timeseriesMetadata -> + notOnlyNeedObject || timeseriesMetadata.getTsDataType() == TSDataType.OBJECT) .forEach( timeseriesMetadata -> result @@ -409,7 +424,7 @@ public TabletInsertionEvent next() { final TabletInsertionEvent next; if (!hasNext()) { - next = + final PipeRawTabletInsertionEvent event = sourceEvent == null ? new PipeRawTabletInsertionEvent( null, @@ -435,9 +450,14 @@ public TabletInsertionEvent next() { pipeTaskMeta, sourceEvent, true); + + // Set tsFileResource and hasObjectData + event.setTsFileResource(tsFileResource); + event.setHasObject(hasObjectData); + next = event; close(); } else { - next = + final PipeRawTabletInsertionEvent event = sourceEvent == null ? new PipeRawTabletInsertionEvent( null, @@ -463,6 +483,11 @@ public TabletInsertionEvent next() { pipeTaskMeta, sourceEvent, false); + + // Set tsFileResource and hasObjectData + event.setTsFileResource(tsFileResource); + event.setHasObject(hasObjectData); + next = event; } return next; } @@ -472,6 +497,50 @@ public TabletInsertionEvent next() { return tabletInsertionIterable; } + @Override + public Iterable getObjectTypeData() { + return createObjectTypeDataIterator(); + } + + @Override + protected ObjectTypeDataIteratorState createObjectTypeDataIteratorState() { + return new ObjectTypeDataIteratorState() { + private TsFileInsertionEventQueryParserTabletIterator tabletIterator = null; + + @Override + public boolean hasMoreDataSources() { + while (tabletIterator == null || !tabletIterator.hasNext()) { + if (!deviceMeasurementsMapIterator.hasNext()) { + return false; + } + + final Map.Entry> entry = deviceMeasurementsMapIterator.next(); + + try { + tabletIterator = + new TsFileInsertionEventQueryParserTabletIterator( + tsFileReader, + measurementDataTypeMap, + entry.getKey(), + entry.getValue(), + timeFilterExpression, + allocatedMemoryBlockForTablet, + currentModifications); + } catch (final Exception e) { + close(); + throw new PipeException("failed to create TsFileInsertionDataTabletIterator", e); + } + } + return true; + } + + @Override + public Tablet getNextTablet() throws Exception { + return tabletIterator.next(); + } + }; + } + @Override public void close() { try { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java index 80382bf82b52..c7bec721cf4f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java @@ -104,9 +104,20 @@ public TsFileInsertionEventScanParser( final long endTime, final PipeTaskMeta pipeTaskMeta, final PipeInsertionEvent sourceEvent, - final boolean isWithMod) + final boolean isWithMod, + final boolean notOnlyNeedObject) throws IOException { - super(pipeName, creationTime, pattern, null, startTime, endTime, pipeTaskMeta, sourceEvent); + super( + pipeName, + creationTime, + pattern, + null, + startTime, + endTime, + pipeTaskMeta, + sourceEvent, + null, + notOnlyNeedObject); // tsFileResource will be obtained from sourceEvent this.startTime = startTime; this.endTime = endTime; @@ -150,9 +161,20 @@ public TsFileInsertionEventScanParser( final long endTime, final PipeTaskMeta pipeTaskMeta, final PipeInsertionEvent sourceEvent, - final boolean isWithMod) + final boolean isWithMod, + final boolean notOnlyNeedObject) throws IOException { - this(null, 0, tsFile, pattern, startTime, endTime, pipeTaskMeta, sourceEvent, isWithMod); + this( + null, + 0, + tsFile, + pattern, + startTime, + endTime, + pipeTaskMeta, + sourceEvent, + isWithMod, + notOnlyNeedObject); } @Override @@ -194,31 +216,38 @@ public TabletInsertionEvent next() { recordTabletMetrics(tablet); final boolean hasNext = hasNext(); try { - return sourceEvent == null - ? new PipeRawTabletInsertionEvent( - null, - null, - null, - null, - tablet, - isAligned, - null, - 0, - pipeTaskMeta, - sourceEvent, - !hasNext) - : new PipeRawTabletInsertionEvent( - sourceEvent.getRawIsTableModelEvent(), - sourceEvent.getSourceDatabaseNameFromDataRegion(), - sourceEvent.getRawTableModelDataBase(), - sourceEvent.getRawTreeModelDataBase(), - tablet, - isAligned, - sourceEvent.getPipeName(), - sourceEvent.getCreationTime(), - pipeTaskMeta, - sourceEvent, - !hasNext); + final PipeRawTabletInsertionEvent event = + sourceEvent == null + ? new PipeRawTabletInsertionEvent( + null, + null, + null, + null, + tablet, + isAligned, + null, + 0, + pipeTaskMeta, + sourceEvent, + !hasNext) + : new PipeRawTabletInsertionEvent( + sourceEvent.getRawIsTableModelEvent(), + sourceEvent.getSourceDatabaseNameFromDataRegion(), + sourceEvent.getRawTableModelDataBase(), + sourceEvent.getRawTreeModelDataBase(), + tablet, + isAligned, + sourceEvent.getPipeName(), + sourceEvent.getCreationTime(), + pipeTaskMeta, + sourceEvent, + !hasNext); + + // Set tsFileResource and hasObjectData + event.setTsFileResource(tsFileResource); + event.setHasObject(hasObjectData); + + return event; } finally { if (!hasNext) { close(); @@ -326,6 +355,26 @@ private Tablet getNextTablet() { } } + @Override + public Iterable getObjectTypeData() { + return createObjectTypeDataIterator(); + } + + @Override + protected ObjectTypeDataIteratorState createObjectTypeDataIteratorState() { + return new ObjectTypeDataIteratorState() { + @Override + public boolean hasMoreDataSources() { + return !Objects.isNull(chunkReader); + } + + @Override + public Tablet getNextTablet() throws Exception { + return getNextTablet(); + } + }; + } + private void prepareData() throws IOException { do { do { @@ -358,6 +407,7 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin case TEXT: case BLOB: case STRING: + case OBJECT: tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues()); } tablet.getBitMaps()[i].mark(rowIndex); @@ -388,6 +438,7 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin case TEXT: case BLOB: case STRING: + case OBJECT: tablet.addValue(rowIndex, i, primitiveType.getBinary().getValues()); break; default: @@ -419,6 +470,7 @@ private boolean putValueToColumns(final BatchData data, final Tablet tablet, fin case TEXT: case BLOB: case STRING: + case OBJECT: tablet.addValue(rowIndex, 0, data.getBinary().getValues()); break; default: @@ -478,6 +530,11 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { break; } + if (!notOnlyNeedObject && chunkHeader.getDataType() != TSDataType.OBJECT) { + tsFileSequenceReader.position(nextMarkerOffset); + break; + } + // Skip the chunk if it is fully deleted by mods if (!currentModifications.isEmpty()) { final Statistics statistics = @@ -536,6 +593,11 @@ private void moveToNextChunkReader() throws IOException, IllegalStateException { break; } + if (!notOnlyNeedObject && chunkHeader.getDataType() != TSDataType.OBJECT) { + tsFileSequenceReader.position(nextMarkerOffset); + break; + } + if (!currentModifications.isEmpty()) { // Skip the chunk if it is fully deleted by mods final Statistics statistics = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java index 8ec8106a4963..b9fae47b0786 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParser.java @@ -37,6 +37,7 @@ import org.apache.iotdb.pipe.api.exception.PipeException; import org.apache.tsfile.read.TsFileSequenceReader; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.write.record.Tablet; import java.io.File; @@ -68,9 +69,20 @@ public TsFileInsertionEventTableParser( final PipeTaskMeta pipeTaskMeta, final String userName, final PipeInsertionEvent sourceEvent, - final boolean isWithMod) + final boolean isWithMod, + final boolean notOnlyNeedObject) throws IOException { - super(pipeName, creationTime, null, pattern, startTime, endTime, pipeTaskMeta, sourceEvent); + super( + pipeName, + creationTime, + null, + pattern, + startTime, + endTime, + pipeTaskMeta, + sourceEvent, + null, + notOnlyNeedObject); // tsFileResource will be obtained from sourceEvent this.isWithMod = isWithMod; try { @@ -119,7 +131,8 @@ public TsFileInsertionEventTableParser( final PipeTaskMeta pipeTaskMeta, final String userName, final PipeInsertionEvent sourceEvent, - final boolean isWithMod) + final boolean isWithMod, + final boolean notOnlyNeedObject) throws IOException { this( null, @@ -131,7 +144,8 @@ public TsFileInsertionEventTableParser( pipeTaskMeta, userName, sourceEvent, - isWithMod); + isWithMod, + notOnlyNeedObject); } @Override @@ -161,7 +175,8 @@ && hasTablePrivilege(entry.getKey()), allocatedMemoryBlockForTableSchemas, currentModifications, startTime, - endTime); + endTime, + notOnlyNeedObject); } final boolean hasNext = tabletIterator.hasNext(); if (hasNext && !parseStartTimeRecorded) { @@ -206,7 +221,7 @@ public TabletInsertionEvent next() { final TabletInsertionEvent next; if (!hasNext()) { - next = + final PipeRawTabletInsertionEvent event = sourceEvent == null ? new PipeRawTabletInsertionEvent( Boolean.TRUE, @@ -232,9 +247,14 @@ public TabletInsertionEvent next() { pipeTaskMeta, sourceEvent, true); + + // Set tsFileResource and hasObjectData + event.setTsFileResource(tsFileResource); + event.setHasObject(hasObjectData); + next = event; close(); } else { - next = + final PipeRawTabletInsertionEvent event = sourceEvent == null ? new PipeRawTabletInsertionEvent( Boolean.TRUE, @@ -260,6 +280,11 @@ public TabletInsertionEvent next() { pipeTaskMeta, sourceEvent, false); + + // Set tsFileResource and hasObjectData + event.setTsFileResource(tsFileResource); + event.setHasObject(hasObjectData); + next = event; } return next; } @@ -269,6 +294,61 @@ public TabletInsertionEvent next() { return tabletInsertionIterable; } + @Override + public Iterable getObjectTypeData() { + return createObjectTypeDataIterator(); + } + + @Override + protected ObjectTypeDataIteratorState createObjectTypeDataIteratorState() { + return new ObjectTypeDataIteratorState() { + private TsFileInsertionEventTableParserTabletIterator tabletIterator = null; + + @Override + public boolean hasMoreDataSources() { + try { + if (tabletIterator == null) { + tabletIterator = + new TsFileInsertionEventTableParserTabletIterator( + tsFileSequenceReader, + entry -> + (Objects.isNull(tablePattern) || tablePattern.matchesTable(entry.getKey())) + && hasTablePrivilege(entry.getKey()), + allocatedMemoryBlockForTablet, + allocatedMemoryBlockForBatchData, + allocatedMemoryBlockForChunk, + allocatedMemoryBlockForChunkMeta, + allocatedMemoryBlockForTableSchemas, + currentModifications, + startTime, + endTime, + notOnlyNeedObject); + } + return tabletIterator.hasNext(); + } catch (Exception e) { + close(); + throw new PipeException("Error while parsing tsfile insertion event", e); + } + } + + @Override + public Tablet getNextTablet() throws Exception { + return tabletIterator.next(); + } + + private boolean hasTablePrivilege(final String tableName) { + return Objects.isNull(userName) + || Objects.isNull(sourceEvent) + || Objects.isNull(sourceEvent.getTableModelDatabaseName()) + || AuthorityChecker.getAccessControl() + .checkCanSelectFromTable4Pipe( + userName, + new QualifiedObjectName(sourceEvent.getTableModelDatabaseName(), tableName), + new UserEntity(-1, userName, "")); + } + }; + } + @Override public void close() { super.close(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java index f05cf872c798..1adb05a2ebd4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java @@ -79,6 +79,8 @@ public class TsFileInsertionEventTableParserTabletIterator implements Iterator modifications; @@ -120,7 +122,8 @@ public TsFileInsertionEventTableParserTabletIterator( final PipeMemoryBlock allocatedMemoryBlockForTableSchema, final PatternTreeMap modifications, final long startTime, - final long endTime) + final long endTime, + final boolean notOnlyNeedObject) throws IOException { this.startTime = startTime; @@ -141,6 +144,8 @@ public TsFileInsertionEventTableParserTabletIterator( this.allocatedMemoryBlockForChunkMeta = allocatedMemoryBlockForChunkMeta; this.allocatedMemoryBlockForTableSchema = allocatedMemoryBlockForTableSchema; + this.notOnlyNeedObject = notOnlyNeedObject; + long tableSchemaSize = fileMetadata.getBloomFilter().getRetainedSizeInBytes(); for (Map.Entry tableSchemaEntry : tableSchemaList) { tableSchemaSize += @@ -222,6 +227,11 @@ public boolean hasNext() { continue; } + if (!notOnlyNeedObject && iChunkMetadata.getDataType() == TSDataType.OBJECT) { + iChunkMetadataIterator.remove(); + continue; + } + if (!modifications.isEmpty() && ModsOperationUtil.isAllDeletedByMods( pair.getLeft(), @@ -440,6 +450,7 @@ private boolean fillMeasurementValueColumns( case TEXT: case BLOB: case STRING: + case OBJECT: tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues()); } tablet.getBitMaps()[i].mark(rowIndex); @@ -470,6 +481,7 @@ private boolean fillMeasurementValueColumns( case TEXT: case BLOB: case STRING: + case OBJECT: Binary binary = primitiveType.getBinary(); tablet.addValue( rowIndex, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java index 9ee89189e138..6b47bd83ce47 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java @@ -235,6 +235,26 @@ public PipeRealtimeEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport( endTime); } + @Override + public void scanForObjectData() { + event.scanForObjectData(); + } + + @Override + public void setHasObject(boolean hasObject) { + event.setHasObject(hasObject); + } + + @Override + public boolean hasObjectData() { + return event.hasObjectData(); + } + + @Override + public String[] getObjectPaths() { + return event.getObjectPaths(); + } + @Override public boolean isGeneratedByPipe() { return event.isGeneratedByPipe(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java index acc62f7e7a4d..4cf808054cbd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java @@ -50,7 +50,7 @@ public static PipeRealtimeEvent createRealtimeEvent( final TsFileResource resource) { final PipeInsertNodeTabletInsertionEvent insertionEvent = new PipeInsertNodeTabletInsertionEvent( - isTableModel, databaseNameFromDataRegion, insertNode); + isTableModel, databaseNameFromDataRegion, insertNode, resource); return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent( insertionEvent, insertNode, resource); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java index d79b05f2fafe..8d58157aff8e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTableStatementDataTypeConvertExecutionVisitor.java @@ -134,6 +134,7 @@ public Optional visitLoadFile( null, "root", null, + true, true)) { for (final TabletInsertionEvent tabletInsertionEvent : parser.toTabletInsertionEvents()) { if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java index 282b378a2d26..19da440ce23a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeTreeStatementDataTypeConvertExecutionVisitor.java @@ -102,7 +102,14 @@ public Optional visitLoadFile( for (final File file : loadTsFileStatement.getTsFiles()) { try (final TsFileInsertionEventScanParser parser = new TsFileInsertionEventScanParser( - file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, true)) { + file, + new IoTDBTreePattern(null), + Long.MIN_VALUE, + Long.MAX_VALUE, + null, + null, + true, + true)) { for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { final PipeConvertedInsertTabletStatement statement = new PipeConvertedInsertTabletStatement( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java index aaf9eff454b8..3a6abcfae6ce 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/PipeDataNodeResourceManager.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager; import org.apache.iotdb.commons.pipe.resource.snapshot.PipeSnapshotResourceManager; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager; +import org.apache.iotdb.db.pipe.resource.object.PipeObjectResourceManager; import org.apache.iotdb.db.pipe.resource.ref.PipeDataNodePhantomReferenceManager; import org.apache.iotdb.db.pipe.resource.snapshot.PipeDataNodeSnapshotResourceManager; import org.apache.iotdb.db.pipe.resource.tsfile.PipeCompactionManager; @@ -36,6 +37,7 @@ public class PipeDataNodeResourceManager { private final PipeMemoryManager pipeMemoryManager; private final PipeLogManager pipeLogManager; private final PipePhantomReferenceManager pipePhantomReferenceManager; + private final PipeObjectResourceManager pipeObjectResourceManager; public static PipeTsFileResourceManager tsfile() { return PipeResourceManagerHolder.INSTANCE.pipeTsFileResourceManager; @@ -61,6 +63,10 @@ public static PipePhantomReferenceManager ref() { return PipeResourceManagerHolder.INSTANCE.pipePhantomReferenceManager; } + public static PipeObjectResourceManager object() { + return PipeResourceManagerHolder.INSTANCE.pipeObjectResourceManager; + } + ///////////////////////////// SINGLETON ///////////////////////////// private PipeDataNodeResourceManager() { @@ -70,6 +76,7 @@ private PipeDataNodeResourceManager() { pipeMemoryManager = new PipeMemoryManager(); pipeLogManager = new PipeLogManager(); pipePhantomReferenceManager = new PipeDataNodePhantomReferenceManager(); + pipeObjectResourceManager = new PipeObjectResourceManager(); } private static class PipeResourceManagerHolder { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResource.java new file mode 100644 index 000000000000..4bdbdacb34e7 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResource.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.object; + +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Manage Object file resources under TSFile - Know the absolute path of Object files - Track + * whether TSFile is closed - Count how many Object files are linked - Provide hard link + * functionality - Provide methods to get events and iterators + */ +public class PipeObjectResource implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeObjectResource.class); + + // Object file directory + private final File objectFileDir; + + // Flag indicating whether Object resource is closed + private final AtomicBoolean isObjectResourceClosed = new AtomicBoolean(false); + + // Flag indicating whether TSFile is closed + private final AtomicBoolean isTsFileClosed = new AtomicBoolean(false); + + // Count of linked Object files + private final AtomicInteger linkedObjectFileCount = new AtomicInteger(0); + + // Reference count, used to track how many places are using this Object resource + private final AtomicInteger referenceCount = new AtomicInteger(0); + + // Associated TSFile resource + private final TsFileResource tsFileResource; + + public PipeObjectResource(final TsFileResource tsFileResource, final File objectFileDir) + throws IOException { + this.tsFileResource = tsFileResource; + + // Check if File exists and its type + if (objectFileDir.exists()) { + if (objectFileDir.isFile()) { + throw new IOException( + String.format( + "Object file directory cannot be a file: %s", objectFileDir.getAbsolutePath())); + } + // If exists and is a directory, use it normally + this.objectFileDir = objectFileDir; + } else { + // If does not exist, create as directory + if (!objectFileDir.mkdirs()) { + throw new IOException( + String.format( + "Failed to create object file directory: %s", objectFileDir.getAbsolutePath())); + } + this.objectFileDir = objectFileDir; + } + } + + /** + * Get Object file directory + * + * @return Object file directory + */ + public File getObjectFileDir() { + return objectFileDir; + } + + /** + * Get absolute path of Object file directory + * + * @return absolute path of Object file directory + */ + public String getObjectFileDirAbsolutePath() { + return objectFileDir.getAbsolutePath(); + } + + /** + * Get associated TSFile resource + * + * @return TSFile resource + */ + public TsFileResource getTsFileResource() { + return tsFileResource; + } + + /** + * Get whether Object resource is closed + * + * @return true if closed, false if not closed + */ + public boolean isObjectResourceClosed() { + return isObjectResourceClosed.get(); + } + + /** + * Get flag indicating whether TSFile is closed + * + * @return true if TSFile is closed, false if not closed + */ + public boolean isTsFileClosed() { + return isTsFileClosed.get(); + } + + /** Set TSFile closed flag */ + public void setTsFileClosed() { + isTsFileClosed.set(true); + } + + /** + * Get count of linked Object files + * + * @return Object file count + */ + public int getLinkedObjectFileCount() { + return linkedObjectFileCount.get(); + } + + /** + * Get reference count + * + * @return reference count + */ + public int getReferenceCount() { + return referenceCount.get(); + } + + /** Increase reference count. Called when a new place needs to use this Object resource */ + public void increaseReferenceCount() { + final int count = referenceCount.incrementAndGet(); + LOGGER.debug( + "Increased reference count for object resource of TSFile {}: {}", + tsFileResource.getTsFile().getPath(), + count); + } + + /** + * Decrease reference count. Called when a place no longer uses this Object resource + * + * @return true if reference count is 0, can be cleaned up; false if there are still other places + * using it + */ + public boolean decreaseReferenceCount() { + final int count = referenceCount.decrementAndGet(); + LOGGER.debug( + "Decreased reference count for object resource of TSFile {}: {}", + tsFileResource.getTsFile().getPath(), + count); + + if (count < 0) { + LOGGER.warn( + "Reference count for object resource of TSFile {} is decreased to below 0: {}", + tsFileResource.getTsFile().getPath(), + count); + } + + return count == 0 && isTsFileClosed.get(); + } + + /** + * Convert each relative path to Object absolute path and create hard links + * + * @param relativePaths list of Object file relative paths (relative to TSFile directory) + * @return true if successfully linked, false if failed + * @throws IOException when hard link creation fails + */ + public boolean linkObjectFiles(final List relativePaths) throws IOException { + if (relativePaths == null || relativePaths.isEmpty()) { + return true; // Empty list is considered success + } + + // Check if Object resource is closed + if (isObjectResourceClosed.get()) { + LOGGER.warn( + "Cannot link object files: Object resource is closed for TSFile {}", + tsFileResource.getTsFile().getPath()); + return false; + } + + // Check if TSFile is closed + if (isTsFileClosed.get()) { + LOGGER.warn( + "Cannot link object files for closed TSFile: {}", tsFileResource.getTsFile().getPath()); + return false; + } + + final File tsFile = tsFileResource.getTsFile(); + final File tsFileDir = tsFile.getParentFile(); + boolean allSuccess = true; + + for (final String relativePath : relativePaths) { + if (relativePath == null || relativePath.isEmpty()) { + continue; + } + + try { + // Build complete path of original Object file (relative to TSFile directory) + final File originalObjectFile = new File(tsFileDir, relativePath); + if (!originalObjectFile.exists()) { + LOGGER.warn( + "Object file does not exist: {} (relative to TSFile: {})", + relativePath, + tsFile.getPath()); + allSuccess = false; + continue; + } + + // Build hard link target path (in pipe directory) + final File hardlinkTargetFile = new File(objectFileDir, relativePath); + hardlinkTargetFile.getParentFile().mkdirs(); + + // Create hard link + FileUtils.createHardLink(originalObjectFile, hardlinkTargetFile); + + linkedObjectFileCount.incrementAndGet(); + + LOGGER.debug( + "Created hardlink for object file: {} -> {}", originalObjectFile, hardlinkTargetFile); + } catch (final Exception e) { + LOGGER.warn( + "Failed to create hardlink for object file {} (relative to TSFile {}): {}", + relativePath, + tsFile.getPath(), + e.getMessage(), + e); + allSuccess = false; + } + } + + return allSuccess; + } + + /** + * Get Object file hard link File object based on relative path + * + * @param relativePath Object file relative path (relative to TSFile directory) + * @return Object file hard link File object, or null if file does not exist + */ + public File getObjectFileHardlink(final String relativePath) { + if (relativePath == null || relativePath.isEmpty()) { + return null; + } + + // Check if Object resource is closed + if (isObjectResourceClosed.get()) { + LOGGER.warn( + "Cannot get object file hardlink: Object resource is closed for TSFile {}", + tsFileResource.getTsFile().getPath()); + return null; + } + + // Get Object file from hard link directory + final File hardlinkFile = new File(objectFileDir, relativePath); + + if (!hardlinkFile.exists()) { + LOGGER.debug( + "Object file hardlink does not exist: {} (relative to TSFile: {})", + relativePath, + tsFileResource.getTsFile().getPath()); + return null; + } + + return hardlinkFile; + } + + /** + * Get ObjectNode iterator. Note: This method needs to be implemented according to actual + * requirements + * + * @return ObjectNode iterator + */ + public Iterator getObjectNodeIterator() { + // TODO: Implement according to actual requirements, parse ObjectNode from Object files and + // return iterator + return new Iterator() { + @Override + public boolean hasNext() { + return false; + } + + @Override + public Object next() { + return null; + } + }; + } + + /** + * Clean up all Object file hard links. Delete the entire Object file directory, including the + * directory itself + */ + public void cleanup() { + // Mark resource as closed to prevent subsequent operations + isObjectResourceClosed.set(true); + + LOGGER.info( + "Cleaning up {} object files for TSFile: {}", + linkedObjectFileCount.get(), + tsFileResource.getTsFile().getPath()); + + // Delete the entire Object file directory, including the directory itself + // Directory structure: pipe/object/[pipeName]/[tsfilename]/ + // Need to delete [tsfilename] directory and all files under it + if (objectFileDir.exists()) { + try { + FileUtils.deleteFileOrDirectory(objectFileDir, true); + LOGGER.info( + "Successfully deleted object file directory: {}", objectFileDir.getAbsolutePath()); + } catch (final Exception e) { + LOGGER.error( + "Failed to delete object file directory: {}", objectFileDir.getAbsolutePath(), e); + } + } + + linkedObjectFileCount.set(0); + referenceCount.set(0); + } + + /** + * Implement AutoCloseable interface. Automatically called when resource is no longer used, clean + * up all Object files + */ + @Override + public void close() { + if (referenceCount.get() > 0) { + LOGGER.warn( + "Closing PipeObjectResource with non-zero reference count: {} for TSFile: {}", + referenceCount.get(), + tsFileResource.getTsFile().getPath()); + } + cleanup(); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManager.java new file mode 100644 index 000000000000..0eadbdaf02eb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/object/PipeObjectResourceManager.java @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.resource.object; + +import org.apache.iotdb.commons.conf.IoTDBConstant; +import org.apache.iotdb.commons.pipe.config.PipeConfig; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceSegmentLock; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Manage Object file resources under TSFile. Each TSFile corresponds to one PipeObjectResource + * instance + */ +public class PipeObjectResourceManager { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeObjectResourceManager.class); + + // PipeName -> (tsFileName -> PipeObjectResource) + // Used to manage Object files for each TSFile under each Pipe + private final Map> pipeToTsFileToObjectResourceMap = + new ConcurrentHashMap<>(); + + // Cache absolute path of object/ directory: data/pipe/object/ + // Cached on first link, used to clean up entire Pipe directory + private volatile File objectBaseDirCache = null; + + private final PipeTsFileResourceSegmentLock segmentLock = new PipeTsFileResourceSegmentLock(); + + /////////////////////////////// Core Function Methods /////////////////////////////// + + /** + * Create hard links for Object files under TSFile + * + * @param tsFileResource TSFile resource, used to get TSFile information + * @param objectFilePaths list of Object file paths (relative paths) + * @param pipeName pipe name + * @return true if successfully linked, false if failed + * @throws IOException when hard link creation fails + */ + public boolean linkObjectFiles( + final TsFileResource tsFileResource, + final List objectFilePaths, + final @Nullable String pipeName) + throws IOException { + final PipeObjectResource objectResource = getOrCreateObjectResource(tsFileResource, pipeName); + + // On first link, cache absolute path of object/ root directory + if (objectBaseDirCache == null) { + synchronized (this) { + if (objectBaseDirCache == null) { + final File objectFileDir = objectResource.getObjectFileDir(); + // objectFileDir format: data/pipe/object/[pipeName]/[tsfilename]/ + // Go up two levels to get: data/pipe/object/ + File baseDir = objectFileDir.getParentFile(); // data/pipe/object/[pipeName]/ + if (pipeName != null) { + baseDir = baseDir.getParentFile(); // data/pipe/object/ + } + objectBaseDirCache = baseDir; + LOGGER.info("Cached object base directory: {}", objectBaseDirCache.getAbsolutePath()); + } + } + } + + // Link Object files + final boolean success = objectResource.linkObjectFiles(objectFilePaths); + + LOGGER.debug( + "Linked object files for TSFile: {} (pipe: {}, linked count: {})", + tsFileResource.getTsFile().getName(), + pipeName, + objectResource.getLinkedObjectFileCount()); + + return success; + } + + /** + * Increase reference count of Object resource + * + * @param tsFileResource TSFile resource + * @param pipeName pipe name + */ + public void increaseReference( + final TsFileResource tsFileResource, final @Nullable String pipeName) { + if (tsFileResource == null || tsFileResource.getTsFile() == null) { + LOGGER.warn("Cannot increase reference: TSFileResource or TSFile is null"); + return; + } + + final String tsFileName = tsFileResource.getTsFile().getName(); + final PipeObjectResource objectResource = getResourceMap(pipeName).get(tsFileName); + + if (objectResource == null) { + LOGGER.warn( + "Cannot increase reference for non-existent object resource: {} (pipe: {})", + tsFileName, + pipeName); + return; + } + + objectResource.increaseReferenceCount(); + + LOGGER.debug( + "Increased reference for object resource: {} (pipe: {}, ref count: {})", + tsFileName, + pipeName, + objectResource.getReferenceCount()); + } + + /** + * Decrease reference count of Object resource. When reference count reaches 0, clean up all + * Object files under this TSFile + * + * @param tsFileResource TSFile resource + * @param pipeName pipe name + */ + public void decreaseReference( + final TsFileResource tsFileResource, final @Nullable String pipeName) { + if (tsFileResource == null || tsFileResource.getTsFile() == null) { + return; + } + + final String tsFileName = tsFileResource.getTsFile().getName(); + final PipeObjectResource objectResource = getResourceMap(pipeName).get(tsFileName); + + if (objectResource == null) { + LOGGER.warn( + "Cannot decrease reference for non-existent object resource: {} (pipe: {})", + tsFileName, + pipeName); + return; + } + + final boolean shouldCleanup = objectResource.decreaseReferenceCount(); + + LOGGER.debug( + "Decreased reference for object resource: {} (pipe: {}, remaining: {})", + tsFileName, + pipeName, + objectResource.getReferenceCount()); + + if (shouldCleanup) { + // Reference count is 0, clean up resource + cleanupObjectFilesForTsFile(pipeName, tsFileName); + } + } + + /** + * Called when TSFile is closed, set PipeObjectResource's tsFileClosed flag to true Note: Will not + * immediately clean up Object files, cleanup is managed by reference count + * + * @param tsFileResource TSFile resource + * @param pipeName pipe name + */ + public void setTsFileClosed( + final TsFileResource tsFileResource, final @Nullable String pipeName) { + if (tsFileResource == null || tsFileResource.getTsFile() == null) { + return; + } + + final String tsFileName = tsFileResource.getTsFile().getName(); + final PipeObjectResource objectResource = getResourceMap(pipeName).get(tsFileName); + if (objectResource != null) { + objectResource.setTsFileClosed(); + LOGGER.debug( + "Set tsFileClosed flag for object resource: {} (pipe: {})", tsFileName, pipeName); + } + } + + /** + * Clean up all Object files for specified Pipe. Delete the entire Pipe's Object directory + * + * @param pipeName pipe name + */ + public void cleanupPipe(final @Nullable String pipeName) { + final String pipeKey = pipeName != null ? pipeName : "null"; + final Map resourceMap = + pipeToTsFileToObjectResourceMap.remove(pipeKey); + + if (resourceMap == null || resourceMap.isEmpty()) { + LOGGER.info("No object resources to clean up for pipe: {}", pipeName); + return; + } + + LOGGER.info("Cleaning up {} object resources for pipe: {}", resourceMap.size(), pipeName); + + // Clean up Object resources for all TSFiles + for (final Map.Entry entry : resourceMap.entrySet()) { + final String tsFileName = entry.getKey(); + final PipeObjectResource objectResource = entry.getValue(); + try { + objectResource.setTsFileClosed(); + objectResource.cleanup(); + LOGGER.debug("Cleaned up object resource for TSFile: {} (pipe: {})", tsFileName, pipeName); + } catch (final Exception e) { + LOGGER.error( + "Failed to cleanup object resource for TSFile: {} (pipe: {})", tsFileName, pipeName, e); + } + } + + // Delete entire Pipe's Object directory: pipe/object/[pipeName]/ + final File pipeObjectDir = getPipeObjectDirForPipe(pipeName); + if (pipeObjectDir != null && pipeObjectDir.exists()) { + try { + FileUtils.deleteFileOrDirectory(pipeObjectDir, true); + LOGGER.info("Deleted pipe object directory: {}", pipeObjectDir.getAbsolutePath()); + } catch (final Exception e) { + LOGGER.error("Failed to delete pipe object directory for pipe: {}", pipeName, e); + } + } else if (pipeObjectDir == null) { + LOGGER.warn( + "Cannot get pipe object directory for pipe: {}, skipping directory deletion", pipeName); + } + } + + /////////////////////////////// Helper Methods /////////////////////////////// + + /** + * Get or create PipeObjectResource corresponding to TSFile + * + * @param tsFileResource TSFile resource, used to get TSFile information and file name + * @param pipeName pipe name, if null means it's assigner's public resource + * @return PipeObjectResource instance + * @throws IOException when directory creation fails + */ + private PipeObjectResource getOrCreateObjectResource( + final TsFileResource tsFileResource, final @Nullable String pipeName) throws IOException { + final File tsFile = tsFileResource.getTsFile(); + if (tsFile == null) { + throw new IOException("TSFile is null in TsFileResource"); + } + final String tsFileName = tsFile.getName(); + + segmentLock.lock(tsFile); + try { + return getResourceMap(pipeName) + .computeIfAbsent( + tsFileName, + k -> { + try { + final File objectFileDir = getPipeObjectFileDir(tsFile, pipeName); + return new PipeObjectResource(tsFileResource, objectFileDir); + } catch (final IOException e) { + LOGGER.error( + "Failed to create PipeObjectResource for TSFile {}: {}", + tsFile.getPath(), + e.getMessage(), + e); + throw new RuntimeException(e); + } + }); + } finally { + segmentLock.unlock(tsFile); + } + } + + /** + * Get resource Map for specified Pipe + * + * @param pipeName pipe name + * @return resource Map for this Pipe + */ + private Map getResourceMap(final @Nullable String pipeName) { + return pipeToTsFileToObjectResourceMap.computeIfAbsent( + pipeName != null ? pipeName : "null", k -> new ConcurrentHashMap<>()); + } + + /** + * Clean up all Object files under specified TSFile + * + * @param pipeName pipe name + * @param tsFileName TSFile file name + */ + private void cleanupObjectFilesForTsFile( + final @Nullable String pipeName, final String tsFileName) { + final Map resourceMap = getResourceMap(pipeName); + final PipeObjectResource objectResource = resourceMap.remove(tsFileName); + + if (objectResource == null) { + return; + } + + LOGGER.info( + "Cleaning up object files for TSFile: {} (pipe: {}, linked {} files)", + tsFileName, + pipeName, + objectResource.getLinkedObjectFileCount()); + + // Mark TSFile as closed + objectResource.setTsFileClosed(); + + // Clean up all Object files + objectResource.cleanup(); + } + + /////////////////////////////// Query Methods /////////////////////////////// + + /** + * Get PipeObjectResource for specified TSFile + * + * @param tsFileResource TSFile resource + * @param pipeName pipe name + * @return PipeObjectResource instance, or null if does not exist + */ + @Nullable + public PipeObjectResource getObjectResource( + final TsFileResource tsFileResource, final @Nullable String pipeName) { + if (tsFileResource == null || tsFileResource.getTsFile() == null) { + return null; + } + final String tsFileName = tsFileResource.getTsFile().getName(); + return getResourceMap(pipeName).get(tsFileName); + } + + /** + * Get Object file hard link File object based on relative path This is a convenience method that + * first gets PipeObjectResource, then calls its getObjectFileHardlink method + * + * @param tsFileResource TSFile resource + * @param relativePath Object file relative path (relative to TSFile directory) + * @param pipeName pipe name + * @return Object file hard link File object, or null if does not exist + */ + @Nullable + public File getObjectFileHardlink( + final TsFileResource tsFileResource, + final String relativePath, + final @Nullable String pipeName) { + final PipeObjectResource objectResource = getObjectResource(tsFileResource, pipeName); + if (objectResource == null) { + return null; + } + return objectResource.getObjectFileHardlink(relativePath); + } + + /** + * Get Object file count under specified TSFile + * + * @param tsFileResource TSFile resource + * @param pipeName pipe name + * @return Object file count + */ + @TestOnly + public int getObjectFileCount( + final TsFileResource tsFileResource, final @Nullable String pipeName) { + final PipeObjectResource objectResource = getObjectResource(tsFileResource, pipeName); + return objectResource == null ? 0 : objectResource.getLinkedObjectFileCount(); + } + + /** + * Get Object resource count for specified Pipe + * + * @param pipeName pipe name + * @return Object resource count + */ + @TestOnly + public int getPipeObjectResourceCount(final @Nullable String pipeName) { + final Map resourceMap = getResourceMap(pipeName); + return resourceMap.size(); + } + + /////////////////////////////// Static Utility Methods /////////////////////////////// + + /** + * Get Object file hard link path in pipe directory + * + *

This method mimics the implementation of {@link + * org.apache.iotdb.db.pipe.resource.tsfile.PipeTsFileResourceManager#getHardlinkOrCopiedFileInPipeDir} + * + * @param objectFile original Object file (relative to TSFile directory) + * @param tsFileResource TSFile resource, used to determine which TSFile the Object file belongs + * to + * @param pipeName pipe name, if null means it's assigner's public resource + * @return Object file hard link path in pipe directory + * @throws IOException when path cannot be determined + */ + public static File getObjectFileHardlinkInPipeDir( + final File objectFile, final TsFileResource tsFileResource, final @Nullable String pipeName) + throws IOException { + try { + final File tsFile = tsFileResource.getTsFile(); + if (tsFile == null) { + throw new IOException("TSFile is null in TsFileResource"); + } + + // Get Object file relative path (relative to TSFile directory) + final String relativePath = getObjectFileRelativePath(objectFile, tsFile); + + // Get Object file directory + final File objectFileDir = getPipeObjectFileDir(tsFile, pipeName); + + // Return hard link path + return new File(objectFileDir, relativePath); + } catch (final Exception e) { + throw new IOException( + String.format( + "failed to get object file hardlink in pipe dir " + "for object file %s, tsfile: %s", + objectFile.getPath(), + tsFileResource.getTsFile() != null ? tsFileResource.getTsFile().getPath() : "null"), + e); + } + } + + /** + * Get Object file relative path relative to TSFile directory + * + * @param objectFile Object file + * @param tsFile TSFile file + * @return Object file relative path relative to TSFile directory + * @throws IOException when relative path cannot be calculated + */ + private static String getObjectFileRelativePath(final File objectFile, final File tsFile) + throws IOException { + final File tsFileDir = tsFile.getParentFile(); + if (tsFileDir == null) { + throw new IOException("TSFile has no parent directory: " + tsFile.getPath()); + } + + final String objectFilePath = objectFile.getCanonicalPath(); + final String tsFileDirPath = tsFileDir.getCanonicalPath(); + + if (!objectFilePath.startsWith(tsFileDirPath)) { + throw new IOException( + String.format( + "Object file %s is not under TSFile directory %s", objectFilePath, tsFileDirPath)); + } + + // Calculate relative path + final String relativePath = objectFilePath.substring(tsFileDirPath.length()); + // Remove leading path separator + return relativePath.startsWith(File.separator) || relativePath.startsWith("/") + ? relativePath.substring(1) + : relativePath; + } + + /** + * Get Object file directory in pipe directory + * + *

Directory structure description: + * + *

+   * data/
+   *   sequence/
+   *     database/
+   *       dataRegionId/
+   *         timePartitionId/
+   *           tsfile.tsfile
+   *   pipe/
+   *     tsfile/              # TSFile hardlink directory
+   *       [pipeName]/        # If pipeName exists, under this subdirectory
+   *         tsfile.tsfile    # TSFile hardlink
+   *     object/              # Object file hardlink directory (same level as tsfile)
+   *       [pipeName]/        # If pipeName exists, under this subdirectory
+   *         [tsfilename]/    # Directory named after TSFile, containing all Object files for this TSFile
+   *           object1.obj    # Object file hardlink
+   *           object2.obj
+   * 
+ * + * @param tsFile TSFile file (original file, not hardlink) + * @param pipeName pipe name, if null means it's assigner's public resource + * @return Object file directory + * @throws IOException when path cannot be determined + */ + private static File getPipeObjectFileDir(final File tsFile, final @Nullable String pipeName) + throws IOException { + // Traverse up from TSFile path until finding sequence/unsequence directory + // Mimic implementation of PipeTsFileResourceManager.getPipeTsFileDirPath + File file = tsFile; + while (!file.getName().equals(IoTDBConstant.SEQUENCE_FOLDER_NAME) + && !file.getName().equals(IoTDBConstant.UNSEQUENCE_FOLDER_NAME) + && !file.getName().equals(PipeConfig.getInstance().getPipeHardlinkBaseDirName())) { + file = file.getParentFile(); + if (file == null) { + throw new IOException("Cannot find sequence/unsequence folder for: " + tsFile.getPath()); + } + } + + // Build object directory path + // Structure: data/pipe/object/[pipeName]/[tsfilename]/ + // tsfilename is obtained from tsFile.getName() + File objectDir = + new File( + file.getParentFile().getCanonicalPath() + + File.separator + + PipeConfig.getInstance().getPipeHardlinkBaseDirName() + + File.separator + + "object"); + + if (Objects.nonNull(pipeName)) { + objectDir = new File(objectDir, pipeName); + } + + // Add TSFile name as subdirectory + objectDir = new File(objectDir, tsFile.getName()); + + return objectDir.getCanonicalFile(); + } + + /** + * Get Object directory for specified Pipe + * + *

Use cached object/ root directory to build Pipe's Object directory + * + * @param pipeName pipe name + * @return Pipe's Object directory path: data/pipe/object/[pipeName]/, or null if not cached + */ + private File getPipeObjectDirForPipe(final @Nullable String pipeName) { + if (objectBaseDirCache == null) { + LOGGER.warn( + "Object base directory not cached yet, cannot get pipe object directory for pipe: {}", + pipeName); + return null; + } + + // Build Pipe's directory from cached root directory + // objectBaseDirCache: data/pipe/object/ + // Return: data/pipe/object/[pipeName]/ + File pipeObjectDir = objectBaseDirCache; + if (pipeName != null) { + pipeObjectDir = new File(pipeObjectDir, pipeName); + } + + return pipeObjectDir; + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java index 275bc694397d..649db8ebe44b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.pipe.sink.util.builder.PipeTableModelTsFileBuilderV2; import org.apache.iotdb.db.pipe.sink.util.builder.PipeTreeModelTsFileBuilderV2; import org.apache.iotdb.db.pipe.sink.util.builder.PipeTsFileBuilder; +import org.apache.iotdb.db.pipe.sink.util.builder.PipeTsFileIdGenerator; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter; import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; @@ -50,7 +51,18 @@ public class PipeTabletEventTsFileBatch extends PipeTabletEventBatch { private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventTsFileBatch.class); private static final AtomicLong BATCH_ID_GENERATOR = new AtomicLong(0); - private final AtomicLong currentBatchId = new AtomicLong(BATCH_ID_GENERATOR.incrementAndGet()); + private final AtomicLong currentBatchId = new AtomicLong(PipeTsFileIdGenerator.getNextBatchId()); + + /** + * Get the next BatchID + * + * @return the next BatchID + * @deprecated Use {@link PipeTsFileIdGenerator#getNextBatchId()} instead + */ + @Deprecated + public static long getNextBatchId() { + return PipeTsFileIdGenerator.getNextBatchId(); + } private final PipeTsFileBuilder treeModeTsFileBuilder; private final PipeTsFileBuilder tableModeTsFileBuilder; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java new file mode 100644 index 000000000000..849cec1be7cf --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/FileTransfer.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.tsfile; + +import java.io.File; + +/** + * FileTransfer - File transfer interface + * + *

Used to abstract different file transfer methods, such as local copy, SCP transfer, etc. + */ +public interface FileTransfer extends AutoCloseable { + + /** + * Test connection (if needed) + * + * @throws Exception throws exception when connection fails + */ + void testConnection() throws Exception; + + /** + * Transfer file + * + * @param localFile local file + * @throws Exception throws exception when transfer fails + */ + void transferFile(final File localFile) throws Exception; + + /** + * Transfer directory + * + * @param localDir local directory + * @throws Exception throws exception when transfer fails + */ + void transferDirectory(final File localDir) throws Exception; +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java new file mode 100644 index 000000000000..07b58ab866c6 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/LocalFileTransfer.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.tsfile; + +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; + +/** + * LocalFileTransfer - Local file transfer implementation + * + *

Use local file system for file copying + */ +public class LocalFileTransfer implements FileTransfer { + + private static final Logger LOGGER = LoggerFactory.getLogger(LocalFileTransfer.class); + + private final String targetPath; + + public LocalFileTransfer(final String targetPath) { + this.targetPath = targetPath; + } + + @Override + public void testConnection() throws Exception { + // Local mode does not need to test connection, just check if target directory exists + final File targetDir = new File(targetPath); + if (!targetDir.exists()) { + if (!targetDir.mkdirs()) { + throw new PipeException("Failed to create target directory: " + targetPath); + } + LOGGER.info("Created target directory: {}", targetPath); + } else if (!targetDir.isDirectory()) { + throw new PipeException("Target path is not a directory: " + targetPath); + } + LOGGER.debug("Local file transfer target directory verified: {}", targetPath); + } + + @Override + public void transferFile(final File localFile) throws Exception { + if (!localFile.exists()) { + throw new PipeException("Local file does not exist: " + localFile); + } + + if (localFile.isDirectory()) { + transferDirectory(localFile); + return; + } + + // Build target file path + final File targetFile = new File(targetPath, localFile.getName()); + + // Ensure target directory exists + final File targetDir = targetFile.getParentFile(); + if (targetDir != null && !targetDir.exists()) { + if (!targetDir.mkdirs()) { + throw new PipeException("Failed to create target directory: " + targetDir); + } + } + + // Copy file + try { + Files.copy(localFile.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + LOGGER.info("Copied file {} to {}", localFile, targetFile); + } catch (final IOException e) { + throw new PipeException( + String.format("Failed to copy file %s to %s: %s", localFile, targetFile, e.getMessage()), + e); + } + } + + @Override + public void transferDirectory(final File localDir) throws Exception { + if (!localDir.exists() || !localDir.isDirectory()) { + throw new PipeException("Local directory does not exist or is not a directory: " + localDir); + } + + // Build target directory path + final File targetDir = new File(targetPath, localDir.getName()); + + // Ensure target parent directory exists + final File targetParentDir = targetDir.getParentFile(); + if (targetParentDir != null && !targetParentDir.exists()) { + if (!targetParentDir.mkdirs()) { + throw new PipeException("Failed to create target parent directory: " + targetParentDir); + } + } + + // Recursively copy directory + try { + copyDirectoryRecursive(localDir, targetDir); + LOGGER.info("Copied directory {} to {}", localDir, targetDir); + } catch (final IOException e) { + throw new PipeException( + String.format( + "Failed to copy directory %s to %s: %s", localDir, targetDir, e.getMessage()), + e); + } + } + + /** + * Recursively copy directory + * + * @param sourceDir source directory + * @param targetDir target directory + * @throws IOException if copy fails + */ + private void copyDirectoryRecursive(final File sourceDir, final File targetDir) + throws IOException { + if (!targetDir.exists() && !targetDir.mkdirs()) { + throw new IOException("Failed to create target directory: " + targetDir); + } + + final File[] files = sourceDir.listFiles(); + if (files != null) { + for (final File file : files) { + final File targetFile = new File(targetDir, file.getName()); + if (file.isDirectory()) { + copyDirectoryRecursive(file, targetFile); + } else { + Files.copy(file.toPath(), targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + } + } + } + } + + @Override + public void close() throws Exception { + // Local mode does not need to close connection + LOGGER.debug("LocalFileTransfer closed"); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/ScpFileTransfer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/ScpFileTransfer.java new file mode 100644 index 000000000000..9b4961b441eb --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/ScpFileTransfer.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.tsfile; + +import org.apache.iotdb.pipe.api.exception.PipeException; + +import com.jcraft.jsch.Channel; +import com.jcraft.jsch.ChannelExec; +import com.jcraft.jsch.JSch; +import com.jcraft.jsch.JSchException; +import com.jcraft.jsch.Session; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Properties; + +/** ScpFileTransfer - Transfer files to remote server via SCP using JSch library */ +public class ScpFileTransfer implements FileTransfer { + + private static final Logger LOGGER = LoggerFactory.getLogger(ScpFileTransfer.class); + + private final String host; + private final int port; + private final String user; + private final String password; + private final String remotePath; + + private JSch jsch; + private Session session; + + public ScpFileTransfer( + final String host, + final int port, + final String user, + final String password, + final String remotePath) { + this.host = host; + this.port = port; + this.user = user; + this.password = password; + this.remotePath = remotePath; + this.jsch = new JSch(); + } + + /** Get or create SSH session */ + private Session getSession() throws JSchException { + if (session == null || !session.isConnected()) { + session = jsch.getSession(user, host, port); + session.setPassword(password); + + // Set SSH configuration + final Properties config = new Properties(); + config.put("StrictHostKeyChecking", "no"); + config.put("PreferredAuthentications", "password"); + session.setConfig(config); + + // Set connection timeout + session.setTimeout(10000); + + session.connect(); + LOGGER.debug("SSH session connected to {}@{}:{}", user, host, port); + } + return session; + } + + /** Test SCP connection */ + public void testConnection() throws Exception { + try { + final Session testSession = getSession(); + if (testSession.isConnected()) { + LOGGER.info("SCP connection test successful: {}@{}:{}", user, host, port); + } else { + throw new PipeException(String.format("Failed to connect to %s@%s:%d", user, host, port)); + } + } catch (final JSchException e) { + throw new PipeException( + String.format( + "Failed to test SCP connection to %s@%s:%d: %s", user, host, port, e.getMessage()), + e); + } + } + + /** + * Transfer file to remote server + * + * @param localFile local file + * @throws Exception throws exception when transfer fails + */ + public void transferFile(final File localFile) throws Exception { + if (!localFile.exists()) { + throw new PipeException("Local file does not exist: " + localFile); + } + + if (localFile.isDirectory()) { + transferDirectory(localFile); + return; + } + + final Session currentSession = getSession(); + Channel channel = null; + FileInputStream fileInputStream = null; + + try { + // Create SCP channel + final String command = String.format("scp -t %s", remotePath); + channel = currentSession.openChannel("exec"); + ((ChannelExec) channel).setCommand(command); + + // Get input/output streams + final InputStream inputStream = channel.getInputStream(); + final java.io.OutputStream outputStream = channel.getOutputStream(); + + channel.connect(); + + // Check if remote command succeeded + checkAck(inputStream); + + // Send file information + final long fileSize = localFile.length(); + final String fileCommand = String.format("C0644 %d %s\n", fileSize, localFile.getName()); + outputStream.write(fileCommand.getBytes()); + outputStream.flush(); + checkAck(inputStream); + + // Transfer file content + fileInputStream = new FileInputStream(localFile); + final byte[] buffer = new byte[1024]; + long totalBytes = 0; + while (totalBytes < fileSize) { + final int bytesRead = + fileInputStream.read(buffer, 0, (int) Math.min(buffer.length, fileSize - totalBytes)); + if (bytesRead < 0) { + break; + } + outputStream.write(buffer, 0, bytesRead); + totalBytes += bytesRead; + } + + // Send end marker + outputStream.write(0); + outputStream.flush(); + checkAck(inputStream); + + LOGGER.info( + "Successfully transferred file {} to {}@{}:{}", localFile, user, host, remotePath); + } catch (final Exception e) { + throw new PipeException( + String.format( + "Failed to transfer file %s to %s@%s:%s: %s", + localFile, user, host, remotePath, e.getMessage()), + e); + } finally { + if (fileInputStream != null) { + try { + fileInputStream.close(); + } catch (final IOException e) { + LOGGER.warn("Failed to close file input stream", e); + } + } + if (channel != null) { + channel.disconnect(); + } + } + } + + /** + * Transfer directory to remote server + * + * @param localDir local directory + * @throws Exception throws exception when transfer fails + */ + public void transferDirectory(final File localDir) throws Exception { + if (!localDir.exists() || !localDir.isDirectory()) { + throw new PipeException("Local directory does not exist or is not a directory: " + localDir); + } + + final Session currentSession = getSession(); + Channel channel = null; + + try { + // Create SCP channel for directory transfer + final String command = String.format("scp -r -t %s", remotePath); + channel = currentSession.openChannel("exec"); + ((ChannelExec) channel).setCommand(command); + + final InputStream inputStream = channel.getInputStream(); + final java.io.OutputStream outputStream = channel.getOutputStream(); + + channel.connect(); + checkAck(inputStream); + + // Recursively transfer directory + transferDirectoryRecursive(localDir, localDir.getName(), outputStream, inputStream); + + LOGGER.info( + "Successfully transferred directory {} to {}@{}:{}", localDir, user, host, remotePath); + } catch (final Exception e) { + throw new PipeException( + String.format( + "Failed to transfer directory %s to %s@%s:%s: %s", + localDir, user, host, remotePath, e.getMessage()), + e); + } finally { + if (channel != null) { + channel.disconnect(); + } + } + } + + /** Recursively transfer directory */ + private void transferDirectoryRecursive( + final File localDir, + final String remoteDirName, + final java.io.OutputStream outputStream, + final InputStream inputStream) + throws Exception { + // Create remote directory + final String dirCommand = String.format("D0755 0 %s\n", remoteDirName); + outputStream.write(dirCommand.getBytes()); + outputStream.flush(); + checkAck(inputStream); + + // Transfer files in directory + final File[] files = localDir.listFiles(); + if (files != null) { + for (final File file : files) { + if (file.isFile()) { + transferFileInDirectory(file, file.getName(), outputStream, inputStream); + } else if (file.isDirectory()) { + transferDirectoryRecursive(file, file.getName(), outputStream, inputStream); + } + } + } + + // End directory + outputStream.write("E\n".getBytes()); + outputStream.flush(); + checkAck(inputStream); + } + + /** Transfer single file in directory */ + private void transferFileInDirectory( + final File localFile, + final String remoteFileName, + final java.io.OutputStream outputStream, + final InputStream inputStream) + throws Exception { + FileInputStream fileInputStream = null; + try { + final long fileSize = localFile.length(); + final String fileCommand = String.format("C0644 %d %s\n", fileSize, remoteFileName); + outputStream.write(fileCommand.getBytes()); + outputStream.flush(); + checkAck(inputStream); + + fileInputStream = new FileInputStream(localFile); + final byte[] buffer = new byte[1024]; + long totalBytes = 0; + while (totalBytes < fileSize) { + final int bytesRead = + fileInputStream.read(buffer, 0, (int) Math.min(buffer.length, fileSize - totalBytes)); + if (bytesRead < 0) { + break; + } + outputStream.write(buffer, 0, bytesRead); + totalBytes += bytesRead; + } + + outputStream.write(0); + outputStream.flush(); + checkAck(inputStream); + } finally { + if (fileInputStream != null) { + try { + fileInputStream.close(); + } catch (final IOException e) { + LOGGER.warn("Failed to close file input stream", e); + } + } + } + } + + /** Check SCP command response */ + private void checkAck(final InputStream inputStream) throws IOException { + final int b = inputStream.read(); + if (b == 0) { + return; // Success + } + if (b == -1) { + throw new IOException("Unexpected end of stream"); + } + if (b == 1 || b == 2) { + // Error or warning + final StringBuilder errorMsg = new StringBuilder(); + int c; + while ((c = inputStream.read()) != '\n') { + errorMsg.append((char) c); + } + if (b == 1) { + throw new IOException("SCP error: " + errorMsg.toString()); + } else { + LOGGER.warn("SCP warning: {}", errorMsg.toString()); + } + } + } + + @Override + public void close() throws Exception { + if (session != null && session.isConnected()) { + session.disconnect(); + LOGGER.debug("SSH session disconnected"); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSink.java new file mode 100644 index 000000000000..6267dfa4010a --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSink.java @@ -0,0 +1,296 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.tsfile; + +import org.apache.iotdb.pipe.api.PipeConnector; +import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; +import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; +import org.apache.iotdb.pipe.api.event.Event; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +/** + * TsFileSink - Export Pipe data as TSFile format + * + *

Supports two modes: + * + *

    + *
  • Local mode (local): Write directly to local directory + *
  • SCP mode (scp): Write to local temporary directory first, then transfer to remote server + * via SCP + *
+ * + *

Supports two event types: + * + *

    + *
  • TabletInsertionEvent: Build new TSFile, write InsertNode data + *
  • TsFileInsertionEvent: Copy existing TSFile to target directory + *
+ * + *

Object file handling: + * + *

    + *
  • Scan Object data paths in events + *
  • Hard link Object files to ${tsfilename}/ subdirectory in target directory + *
  • Maintain original relative path structure + *
+ */ +public class TsFileSink implements PipeConnector { + + private static final Logger LOGGER = LoggerFactory.getLogger(TsFileSink.class); + + // Configuration parameter keys + private static final String SINK_FILE_MODE_KEY = "sink.file-mode"; + private static final String SINK_TARGET_PATH_KEY = "sink.target-path"; + private static final String SINK_SCP_REMOTE_PATH_KEY = "sink.scp.remote-path"; + private static final String SINK_SCP_HOST_KEY = "sink.scp.host"; + private static final String SINK_SCP_PORT_KEY = "sink.scp.port"; + private static final String SINK_SCP_USER_KEY = "sink.scp.user"; + private static final String SINK_SCP_PASSWORD_KEY = "sink.scp.password"; + + // Default values + private static final String FILE_MODE_LOCAL = "local"; + private static final String FILE_MODE_SCP = "scp"; + private static final int DEFAULT_SCP_PORT = 22; + + // Runtime configuration + private String fileMode; + private String + targetPath; // Target path for local mode, or temporary path for SCP mode (Writer's target + // directory) + + // SCP configuration + private String scpRemotePath; + private String scpHost; + private int scpPort; + private String scpUser; + private String scpPassword; + + // Runtime environment + private String pipeName; + private long creationTime; + + // TSFile builder + private TsFileSinkWriter tsFileWriter; + + // File transfer + private FileTransfer fileTransfer; + + // SCP file transfer (for type conversion) + private ScpFileTransfer scpTransfer; + + @Override + public void validate(final PipeParameterValidator validator) throws Exception { + final PipeParameters parameters = validator.getParameters(); + + // Validate file-mode + final String mode = + parameters + .getStringOrDefault(Arrays.asList(SINK_FILE_MODE_KEY), FILE_MODE_LOCAL) + .toLowerCase(); + + if (!FILE_MODE_LOCAL.equals(mode) && !FILE_MODE_SCP.equals(mode)) { + throw new PipeException( + String.format("Invalid file-mode '%s'. Must be 'local' or 'scp'.", mode)); + } + + // Validate target path (required for both local and SCP modes) + validator.validateRequiredAttribute(SINK_TARGET_PATH_KEY); + + // Validate SCP mode parameters + if (FILE_MODE_SCP.equals(mode)) { + validator.validateRequiredAttribute(SINK_SCP_REMOTE_PATH_KEY); + validator.validateRequiredAttribute(SINK_SCP_HOST_KEY); + validator.validateRequiredAttribute(SINK_SCP_USER_KEY); + validator.validateRequiredAttribute(SINK_SCP_PASSWORD_KEY); + } + } + + @Override + public void customize( + final PipeParameters parameters, final PipeConnectorRuntimeConfiguration configuration) + throws Exception { + this.pipeName = configuration.getRuntimeEnvironment().getPipeName(); + this.creationTime = configuration.getRuntimeEnvironment().getCreationTime(); + + // Read configuration + this.fileMode = + parameters + .getStringOrDefault(Arrays.asList(SINK_FILE_MODE_KEY), FILE_MODE_LOCAL) + .toLowerCase(); + + // Read target path (target path for local mode, or temporary path for SCP mode) + this.targetPath = parameters.getString(SINK_TARGET_PATH_KEY); + ensureDirectoryExists(targetPath); + + if (FILE_MODE_LOCAL.equals(fileMode)) { + // Local mode + LOGGER.info("TsFileSink configured in LOCAL mode, target path: {}", targetPath); + } else { + // SCP mode + this.scpRemotePath = parameters.getString(SINK_SCP_REMOTE_PATH_KEY); + this.scpHost = parameters.getString(SINK_SCP_HOST_KEY); + this.scpPort = parameters.getIntOrDefault(Arrays.asList(SINK_SCP_PORT_KEY), DEFAULT_SCP_PORT); + this.scpUser = parameters.getString(SINK_SCP_USER_KEY); + this.scpPassword = parameters.getString(SINK_SCP_PASSWORD_KEY); + + LOGGER.info( + "TsFileSink configured in SCP mode, local tmp: {}, remote: {}@{}:{}", + targetPath, + scpUser, + scpHost, + scpRemotePath); + } + + // Initialize TSFile writer (use targetPath as target directory) + this.tsFileWriter = new TsFileSinkWriter(targetPath, pipeName, creationTime); + + // Initialize file transfer + if (FILE_MODE_LOCAL.equals(fileMode)) { + // Local mode: use local file transfer + this.fileTransfer = new LocalFileTransfer(targetPath); + } else { + // SCP mode: use SCP transfer + this.scpTransfer = new ScpFileTransfer(scpHost, scpPort, scpUser, scpPassword, scpRemotePath); + this.fileTransfer = scpTransfer; + } + } + + @Override + public void handshake() throws Exception { + // Test file transfer connection + if (fileTransfer != null) { + fileTransfer.testConnection(); + LOGGER.info("File transfer connection test successful"); + } + } + + @Override + public void heartbeat() throws Exception { + // Check file transfer connection + if (fileTransfer != null) { + fileTransfer.testConnection(); + } + } + + @Override + public void transfer(final TabletInsertionEvent tabletInsertionEvent) throws Exception { + // Handle TabletInsertionEvent: build new TSFile + tsFileWriter.writeTablet(tabletInsertionEvent); + + // Check if flush is needed and transfer files + final List generatedFiles = tsFileWriter.flushTabletsIfNeeded(); + if (!generatedFiles.isEmpty() && fileTransfer != null) { + transferFiles(generatedFiles); + } + } + + @Override + public void transfer(final TsFileInsertionEvent tsFileInsertionEvent) throws Exception { + // Handle TsFileInsertionEvent: copy TSFile + final File copiedFile = tsFileWriter.copyTsFile(tsFileInsertionEvent); + + // Transfer files + if (copiedFile != null && fileTransfer != null) { + transferFiles(Collections.singletonList(copiedFile)); + } + } + + /** Transfer files (including TSFile and related Object files) */ + private void transferFiles(final List tsFiles) throws Exception { + for (final File tsFile : tsFiles) { + // Transfer TSFile + fileTransfer.transferFile(tsFile); + + // Transfer Tablet Object directory (if exists) + final File tabletObjectDir = new File(tsFileWriter.getTargetDirectory(), "tablet_objects"); + if (tabletObjectDir.exists() && tabletObjectDir.isDirectory()) { + fileTransfer.transferDirectory(tabletObjectDir); + LOGGER.info("Transferred tablet object directory"); + } + + // Transfer TsFile Object directory (if exists) + final String tsFileName = tsFile.getName(); + final String tsFileNameWithoutSuffix = tsFileName.replace(".tsfile", ""); + final File tsFileObjectDir = + new File(tsFileWriter.getTargetDirectory(), tsFileNameWithoutSuffix); + + if (tsFileObjectDir.exists() && tsFileObjectDir.isDirectory()) { + fileTransfer.transferDirectory(tsFileObjectDir); + LOGGER.info("Transferred tsfile object directory {}", tsFileObjectDir.getName()); + } + } + } + + @Override + public void transfer(final Event event) throws Exception { + if (event instanceof TabletInsertionEvent) { + transfer((TabletInsertionEvent) event); + } else if (event instanceof TsFileInsertionEvent) { + transfer((TsFileInsertionEvent) event); + } else { + LOGGER.warn("Unsupported event type: {}", event.getClass().getName()); + } + } + + @Override + public void close() throws Exception { + try { + // Flush remaining Tablets and transfer + if (tsFileWriter != null) { + final List remainingFiles = tsFileWriter.flushTablets(); + if (!remainingFiles.isEmpty() && fileTransfer != null) { + transferFiles(remainingFiles); + } + tsFileWriter.close(); + } + } finally { + // Close file transfer + if (fileTransfer != null) { + fileTransfer.close(); + } + } + + LOGGER.info("TsFileSink closed for pipe: {}", pipeName); + } + + /** Ensure directory exists */ + private void ensureDirectoryExists(final String path) throws PipeException { + final File dir = new File(path); + if (!dir.exists()) { + if (!dir.mkdirs()) { + throw new PipeException("Failed to create directory: " + path); + } + } + if (!dir.isDirectory()) { + throw new PipeException("Path is not a directory: " + path); + } + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSinkWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSinkWriter.java new file mode 100644 index 000000000000..1e216db4d774 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/tsfile/TsFileSinkWriter.java @@ -0,0 +1,493 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.protocol.tsfile; + +import org.apache.iotdb.commons.pipe.event.EnrichedEvent; +import org.apache.iotdb.commons.utils.FileUtils; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; +import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager; +import org.apache.iotdb.db.pipe.sink.util.builder.PipeTableModelTsFileBuilderV2; +import org.apache.iotdb.db.pipe.sink.util.builder.PipeTreeModelTsFileBuilderV2; +import org.apache.iotdb.db.pipe.sink.util.builder.PipeTsFileBuilder; +import org.apache.iotdb.db.pipe.sink.util.builder.PipeTsFileIdGenerator; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; +import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; +import org.apache.iotdb.pipe.api.exception.PipeException; + +import org.apache.tsfile.common.constant.TsFileConstant; +import org.apache.tsfile.utils.Pair; +import org.apache.tsfile.write.record.Tablet; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; + +/** + * TsFileSinkWriter - Responsible for writing event data to TSFile and handling Object files + * + *

Features: + * + *

    + *
  • Use PipeTsFileBuilder to cache TabletInsertionEvent and batch write to TSFile + *
  • Copy TSFile corresponding to TsFileInsertionEvent + *
  • Handle hard links for Object files + *
+ */ +public class TsFileSinkWriter implements AutoCloseable { + + private static final Logger LOGGER = LoggerFactory.getLogger(TsFileSinkWriter.class); + + private static final int TABLET_BUFFER_SIZE = 1000; // Number of Tablets to buffer before flushing + + private final String targetDirectory; + private final String pipeName; + private final long creationTime; + + // TSFile Builders + private final AtomicLong currentBatchId = new AtomicLong(PipeTsFileIdGenerator.getNextBatchId()); + private final AtomicLong tsFileIdGenerator = new AtomicLong(0); + private PipeTsFileBuilder treeModelBuilder; + private PipeTsFileBuilder tableModelBuilder; + + private int bufferedTabletCount = 0; + + public TsFileSinkWriter( + final String targetDirectory, final String pipeName, final long creationTime) { + this.targetDirectory = targetDirectory; + this.pipeName = pipeName; + this.creationTime = creationTime; + + // Initialize Builders + this.treeModelBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator); + this.tableModelBuilder = new PipeTableModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator); + } + + /** Write TabletInsertionEvent */ + public synchronized void writeTablet(final TabletInsertionEvent event) throws Exception { + if (event instanceof PipeInsertNodeTabletInsertionEvent) { + writeInsertNodeTablet((PipeInsertNodeTabletInsertionEvent) event); + } else if (event instanceof PipeRawTabletInsertionEvent) { + writeRawTablet((PipeRawTabletInsertionEvent) event); + } else { + LOGGER.warn("Unsupported TabletInsertionEvent type: {}", event.getClass().getName()); + } + } + + /** Write PipeInsertNodeTabletInsertionEvent */ + private void writeInsertNodeTablet(final PipeInsertNodeTabletInsertionEvent event) + throws Exception { + // Immediately handle Object file hard links + handleObjectFilesForTabletEvent(event); + + // Convert to Tablet list + final List tablets = event.convertToTablets(); + if (tablets == null || tablets.isEmpty()) { + return; + } + + // Process each Tablet + for (int i = 0; i < tablets.size(); i++) { + final Tablet tablet = tablets.get(i); + if (tablet == null || tablet.getRowSize() == 0) { + continue; + } + + // Select Builder based on event type + if (event.isTableModelEvent()) { + // Table model + tableModelBuilder.bufferTableModelTablet( + event.getSourceDatabaseNameFromDataRegion(), tablet); + } else { + // Tree model + treeModelBuilder.bufferTreeModelTablet(tablet, event.isAligned(i)); + } + + bufferedTabletCount++; + } + } + + /** Write PipeRawTabletInsertionEvent */ + private void writeRawTablet(final PipeRawTabletInsertionEvent event) throws Exception { + // Immediately handle Object file hard links + handleObjectFilesForTabletEvent(event); + + // Get Tablet from event + final Tablet tablet = event.convertToTablet(); + if (tablet == null || tablet.getRowSize() == 0) { + return; + } + + // Select Builder based on event type + if (event.isTableModelEvent()) { + // Table model + tableModelBuilder.bufferTableModelTablet(event.getSourceDatabaseNameFromDataRegion(), tablet); + } else { + // Tree model + treeModelBuilder.bufferTreeModelTablet(tablet, event.isAligned()); + } + + bufferedTabletCount++; + } + + /** + * Flush buffered Tablets to TSFile + * + * @return list of generated TSFiles + */ + public List flushTablets() throws Exception { + final List generatedFiles = new ArrayList<>(); + + if (bufferedTabletCount == 0) { + return generatedFiles; + } + + try { + // Flush Tree model Tablets + if (!treeModelBuilder.isEmpty()) { + final List> sealedFiles = + treeModelBuilder.convertTabletToTsFileWithDBInfo(); + + for (final Pair pair : sealedFiles) { + final File tsFile = pair.getRight(); + // Generate final file name (using global generator to ensure uniqueness) + final String finalFileName = generateFinalTsFileName(tsFile.getName()); + final File targetFile = new File(targetDirectory, finalFileName); + + // If target file exists, generate new file name with sequence number + File actualTargetFile = targetFile; + long sequence = 0; + while (actualTargetFile.exists()) { + sequence++; + final String nameWithoutSuffix = + finalFileName.replace(TsFileConstant.TSFILE_SUFFIX, ""); + final String newFileName = + nameWithoutSuffix + "_" + sequence + TsFileConstant.TSFILE_SUFFIX; + actualTargetFile = new File(targetDirectory, newFileName); + } + + // Move file to target directory + Files.move( + tsFile.toPath(), actualTargetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + + LOGGER.info("Created TSFile: {}", actualTargetFile.getAbsolutePath()); + generatedFiles.add(actualTargetFile); + } + + treeModelBuilder.onSuccess(); + } + + // Flush Table model Tablets + if (!tableModelBuilder.isEmpty()) { + final List> sealedFiles = + tableModelBuilder.convertTabletToTsFileWithDBInfo(); + + for (final Pair pair : sealedFiles) { + final File tsFile = pair.getRight(); + // Generate final file name (using global generator to ensure uniqueness) + final String finalFileName = generateFinalTsFileName(tsFile.getName()); + final File targetFile = new File(targetDirectory, finalFileName); + + // If target file exists, generate new file name with sequence number + File actualTargetFile = targetFile; + long sequence = 0; + while (actualTargetFile.exists()) { + sequence++; + final String nameWithoutSuffix = + finalFileName.replace(TsFileConstant.TSFILE_SUFFIX, ""); + final String newFileName = + nameWithoutSuffix + "_" + sequence + TsFileConstant.TSFILE_SUFFIX; + actualTargetFile = new File(targetDirectory, newFileName); + } + + // Move file to target directory + Files.move( + tsFile.toPath(), actualTargetFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + + LOGGER.info("Created TSFile: {}", actualTargetFile.getAbsolutePath()); + generatedFiles.add(actualTargetFile); + } + + tableModelBuilder.onSuccess(); + } + + } finally { + // Reset counter + bufferedTabletCount = 0; + + // Recreate Builders (using new BatchID) + currentBatchId.set(PipeTsFileIdGenerator.getNextBatchId()); + treeModelBuilder = new PipeTreeModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator); + tableModelBuilder = new PipeTableModelTsFileBuilderV2(currentBatchId, tsFileIdGenerator); + } + + return generatedFiles; + } + + /** + * Check if flush is needed, flush if needed and return generated file list + * + * @return list of generated files, empty list if flush is not needed + * @throws Exception if flush fails + */ + public List flushTabletsIfNeeded() throws Exception { + if (bufferedTabletCount >= TABLET_BUFFER_SIZE) { + return flushTablets(); + } + return Collections.emptyList(); + } + + /** + * Handle Object file hard links for TabletInsertionEvent Process immediately when data is + * captured + */ + private void handleObjectFilesForTabletEvent(final EnrichedEvent event) throws Exception { + // First check if there is Object data + if (!event.hasObjectData()) { + return; + } + + // Scan Object data + event.scanForObjectData(); + + final String[] objectPaths = event.getObjectPaths(); + if (objectPaths == null || objectPaths.length == 0) { + return; + } + + // Get TsFileResource + final TsFileResource tsFileResource = (TsFileResource) event.getTsFileResource(); + if (tsFileResource == null || tsFileResource.getTsFile() == null) { + LOGGER.warn("TsFileResource is null, cannot process object files"); + return; + } + + // Create Object directory: ${targetDirectory}/tablet_objects/ + // Use unified directory to store Tablet Object files + final File objectDir = new File(targetDirectory, "tablet_objects"); + if (!objectDir.exists() && !objectDir.mkdirs()) { + throw new PipeException("Failed to create object directory: " + objectDir); + } + + // Get pipeName (from event) + final String pipeName = event.getPipeName(); + + // Create hard links for each Object file + int linkedCount = 0; + for (final String relativePath : objectPaths) { + if (relativePath == null || relativePath.isEmpty()) { + continue; + } + + try { + // Get Object file hard link File object through PipeObjectResourceManager + final File hardlinkSourceFile = + PipeDataNodeResourceManager.object() + .getObjectFileHardlink(tsFileResource, relativePath, pipeName); + + if (hardlinkSourceFile == null || !hardlinkSourceFile.exists()) { + LOGGER.warn("Hardlink source file does not exist for relative path: {}", relativePath); + continue; + } + + // Target Object file path (maintain relative path) + final File targetObjectFile = new File(objectDir, relativePath); + + // Ensure parent directory exists + final File targetObjectFileParent = targetObjectFile.getParentFile(); + if (targetObjectFileParent != null && !targetObjectFileParent.exists()) { + targetObjectFileParent.mkdirs(); + } + + // Create hard link (skip if already exists) + if (!targetObjectFile.exists()) { + FileUtils.createHardLink(hardlinkSourceFile, targetObjectFile); + linkedCount++; + LOGGER.debug("Linked object file: {} -> {}", hardlinkSourceFile, targetObjectFile); + } + } catch (final Exception e) { + LOGGER.warn("Failed to link object file {}: {}", relativePath, e.getMessage(), e); + } + } + + if (linkedCount > 0) { + LOGGER.info( + "Linked {} object files for TabletInsertionEvent to directory: {}", + linkedCount, + objectDir.getAbsolutePath()); + } + } + + /** + * Copy TSFile corresponding to TsFileInsertionEvent + * + * @return copied TSFile, or null if failed + */ + public synchronized File copyTsFile(final TsFileInsertionEvent event) throws Exception { + if (!(event instanceof PipeTsFileInsertionEvent)) { + LOGGER.warn("Unsupported TsFileInsertionEvent type: {}", event.getClass().getName()); + return null; + } + + final PipeTsFileInsertionEvent tsFileEvent = (PipeTsFileInsertionEvent) event; + final File sourceTsFile = tsFileEvent.getTsFile(); + + if (sourceTsFile == null || !sourceTsFile.exists()) { + LOGGER.warn("Source TSFile does not exist: {}", sourceTsFile); + return null; + } + + // First flush currently buffered Tablets + if (bufferedTabletCount > 0) { + flushTablets(); + } + + // Generate target file name + final String targetFileName = sourceTsFile.getName(); + final File targetTsFile = new File(targetDirectory, targetFileName); + + // Copy TSFile + Files.copy(sourceTsFile.toPath(), targetTsFile.toPath(), StandardCopyOption.REPLACE_EXISTING); + + LOGGER.info("Copied TSFile from {} to {}", sourceTsFile, targetTsFile); + + // Handle Object files + handleObjectFilesForTsFile(tsFileEvent, targetTsFile); + + return targetTsFile; + } + + /** Get target directory */ + public String getTargetDirectory() { + return targetDirectory; + } + + /** + * Generate final TSFile file name + * + *

If original file name is a temporary file name (tb_*), keep original file name Otherwise use + * original file name + * + * @param originalFileName original file name + * @return final file name + */ + private String generateFinalTsFileName(final String originalFileName) { + // If already in temporary file name format, keep as is + if (PipeTsFileIdGenerator.isValidTempFileName(originalFileName)) { + return originalFileName; + } + // Otherwise keep original file name + return originalFileName; + } + + /** Handle Object file hard links for TSFile */ + private void handleObjectFilesForTsFile( + final PipeTsFileInsertionEvent event, final File targetTsFile) throws Exception { + // Scan Object data + event.scanForObjectData(); + + final String[] objectPaths = event.getObjectPaths(); + if (objectPaths == null || objectPaths.length == 0) { + LOGGER.debug("No object files to link for TSFile: {}", targetTsFile.getName()); + return; + } + + // Create Object directory: ${targetDirectory}/${tsfilename}/ + final String tsFileName = targetTsFile.getName(); + final String tsFileNameWithoutSuffix = tsFileName.replace(TsFileConstant.TSFILE_SUFFIX, ""); + final File objectDir = new File(targetDirectory, tsFileNameWithoutSuffix); + + if (!objectDir.exists() && !objectDir.mkdirs()) { + throw new PipeException("Failed to create object directory: " + objectDir); + } + + // Get source TSFile directory + final File sourceTsFile = event.getTsFile(); + final File sourceTsFileDir = sourceTsFile.getParentFile(); + + // Create hard links for each Object file + int linkedCount = 0; + for (final String relativePath : objectPaths) { + if (relativePath == null || relativePath.isEmpty()) { + continue; + } + + try { + // Source Object file path (relative to source TSFile directory) + final File sourceObjectFile = new File(sourceTsFileDir, relativePath); + if (!sourceObjectFile.exists()) { + LOGGER.warn("Source object file does not exist: {}", sourceObjectFile); + continue; + } + + // Target Object file path (maintain relative path) + final File targetObjectFile = new File(objectDir, relativePath); + + // Ensure parent directory exists + final File targetObjectFileParent = targetObjectFile.getParentFile(); + if (targetObjectFileParent != null && !targetObjectFileParent.exists()) { + targetObjectFileParent.mkdirs(); + } + + // Create hard link + FileUtils.createHardLink(sourceObjectFile, targetObjectFile); + linkedCount++; + + LOGGER.debug("Linked object file: {} -> {}", sourceObjectFile, targetObjectFile); + } catch (final Exception e) { + LOGGER.warn("Failed to link object file {}: {}", relativePath, e.getMessage(), e); + } + } + + LOGGER.info( + "Linked {} object files for TSFile: {} to directory: {}", + linkedCount, + tsFileName, + objectDir.getAbsolutePath()); + } + + @Override + public synchronized void close() throws Exception { + try { + // Flush remaining Tablets + if (bufferedTabletCount > 0) { + flushTablets(); + } + } finally { + // Close Builders + if (treeModelBuilder != null) { + treeModelBuilder.close(); + } + if (tableModelBuilder != null) { + tableModelBuilder.close(); + } + } + + LOGGER.info("TsFileSinkWriter closed"); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileIdGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileIdGenerator.java new file mode 100644 index 000000000000..079fb05beb38 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTsFileIdGenerator.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.sink.util.builder; + +import org.apache.tsfile.common.constant.TsFileConstant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.concurrent.atomic.AtomicLong; + +/** + * PipeTsFileIdGenerator - Global TSFile ID Generator + * + *

Provides unified BatchID and TSFile ID allocation, as well as file name generation rules + */ +public class PipeTsFileIdGenerator { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileIdGenerator.class); + + // Global BatchID generator + private static final AtomicLong GLOBAL_BATCH_ID_GENERATOR = new AtomicLong(0); + + // Global TSFile ID generator + private static final AtomicLong GLOBAL_TSFILE_ID_GENERATOR = new AtomicLong(0); + + // TSFile file name prefix + private static final String TS_FILE_PREFIX = "tb"; // tb means tablet batch + + /** + * Get the next BatchID + * + * @return the next BatchID + */ + public static long getNextBatchId() { + return GLOBAL_BATCH_ID_GENERATOR.incrementAndGet(); + } + + /** + * Get the next TSFile ID + * + * @return the next TSFile ID + */ + public static long getNextTsFileId() { + return GLOBAL_TSFILE_ID_GENERATOR.incrementAndGet(); + } + + /** + * Generate temporary TSFile file name + * + *

File name format: tb_{dataNodeId}_{batchId}_{tsFileId}.tsfile + * + * @param dataNodeId DataNode ID + * @param batchId Batch ID + * @param tsFileId TSFile ID + * @return temporary TSFile file name + */ + public static String generateTempTsFileName( + final int dataNodeId, final long batchId, final long tsFileId) { + return TS_FILE_PREFIX + + "_" + + dataNodeId + + "_" + + batchId + + "_" + + tsFileId + + TsFileConstant.TSFILE_SUFFIX; + } + + /** + * Generate temporary TSFile file path + * + * @param baseDir base directory + * @param dataNodeId DataNode ID + * @param batchId Batch ID + * @param tsFileId TSFile ID + * @return temporary TSFile file path + */ + public static File generateTempTsFilePath( + final File baseDir, final int dataNodeId, final long batchId, final long tsFileId) { + return new File(baseDir, generateTempTsFileName(dataNodeId, batchId, tsFileId)); + } + + /** + * Parse BatchID from temporary file name + * + * @param fileName temporary file name (format: tb_{dataNodeId}_{batchId}_{tsFileId}.tsfile) + * @return BatchID, or -1 if parsing fails + */ + public static long parseBatchIdFromTempFileName(final String fileName) { + try { + if (fileName == null || !fileName.startsWith(TS_FILE_PREFIX + "_")) { + return -1; + } + final String[] parts = fileName.substring(TS_FILE_PREFIX.length() + 1).split("_"); + if (parts.length >= 2) { + return Long.parseLong(parts[1]); + } + } catch (final Exception e) { + LOGGER.warn("Failed to parse batch ID from file name: {}", fileName, e); + } + return -1; + } + + /** + * Parse TSFile ID from temporary file name + * + * @param fileName temporary file name (format: tb_{dataNodeId}_{batchId}_{tsFileId}.tsfile) + * @return TSFile ID, or -1 if parsing fails + */ + public static long parseTsFileIdFromTempFileName(final String fileName) { + try { + if (fileName == null || !fileName.startsWith(TS_FILE_PREFIX + "_")) { + return -1; + } + final String[] parts = fileName.substring(TS_FILE_PREFIX.length() + 1).split("_"); + if (parts.length >= 3) { + final String tsFileIdPart = parts[2]; + // Remove .tsfile suffix + final int suffixIndex = tsFileIdPart.indexOf(TsFileConstant.TSFILE_SUFFIX); + if (suffixIndex > 0) { + return Long.parseLong(tsFileIdPart.substring(0, suffixIndex)); + } + return Long.parseLong(tsFileIdPart); + } + } catch (final Exception e) { + LOGGER.warn("Failed to parse TSFile ID from file name: {}", fileName, e); + } + return -1; + } + + /** + * Generate final TSFile file name (for renaming) + * + *

Can customize naming rules as needed, for example: - Keep original file name - Use + * timestamp: {timestamp}.tsfile - Use sequence number: tsfile_{sequence}.tsfile + * + * @param originalFileName original temporary file name + * @param targetDirectory target directory + * @param sequenceNumber sequence number (optional, for generating unique file names) + * @return final TSFile file name + */ + public static String generateFinalTsFileName( + final String originalFileName, final File targetDirectory, final long sequenceNumber) { + // Default to keep original file name, add sequence number if file already exists in target + // directory + final String baseName = originalFileName != null ? originalFileName : "tsfile"; + final File targetFile = new File(targetDirectory, baseName); + + if (targetFile.exists() && sequenceNumber > 0) { + // If file exists, add sequence number + final String nameWithoutSuffix = baseName.replace(TsFileConstant.TSFILE_SUFFIX, ""); + return nameWithoutSuffix + "_" + sequenceNumber + TsFileConstant.TSFILE_SUFFIX; + } + + return baseName; + } + + /** + * Validate if file name conforms to temporary file naming rules + * + * @param fileName file name + * @return true if conforms to rules, false otherwise + */ + public static boolean isValidTempFileName(final String fileName) { + if (fileName == null || !fileName.endsWith(TsFileConstant.TSFILE_SUFFIX)) { + return false; + } + return fileName.startsWith(TS_FILE_PREFIX + "_"); + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 8ea973fbf0a3..ac3bc7851cec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -52,6 +52,7 @@ import java.io.Closeable; import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; public class PipeDataRegionAssigner implements Closeable { @@ -72,6 +73,9 @@ public class PipeDataRegionAssigner implements Closeable { private final PipeEventCounter eventCounter = new PipeDataRegionEventCounter(); + /** Track whether this data region has Object type write operations */ + public final AtomicBoolean hasObjectData = new AtomicBoolean(false); + public String getDataRegionId() { return dataRegionId; } @@ -162,6 +166,9 @@ private void assignToExtractor( return; } + event.setHasObject(hasObjectData.get()); + event.scanForObjectData(); + final PipeRealtimeEvent copiedEvent = event.shallowCopySelfAndBindPipeTaskMetaForProgressReport( extractor.getPipeName(), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 8c49dd0a3dd8..a4f2b0cd71a2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -54,13 +55,25 @@ public class PipeInsertionDataNodeListener { private final AtomicInteger listenToTsFileExtractorCount = new AtomicInteger(0); private final AtomicInteger listenToInsertNodeExtractorCount = new AtomicInteger(0); + // Independent tracking for Object type write operations + // This map has independent lifecycle from assigner + private final ConcurrentMap dataRegionId2HasObjectWrite = + new ConcurrentHashMap<>(); + //////////////////////////// start & stop //////////////////////////// public synchronized void startListenAndAssign( final String dataRegionId, final PipeRealtimeDataRegionSource extractor) { - dataRegionId2Assigner - .computeIfAbsent(dataRegionId, o -> new PipeDataRegionAssigner(dataRegionId)) - .startAssignTo(extractor); + final PipeDataRegionAssigner assigner = + dataRegionId2Assigner.computeIfAbsent( + dataRegionId, k -> new PipeDataRegionAssigner(dataRegionId)); + assigner.startAssignTo(extractor); + + // Sync Object write flag from independent tracking to assigner + final AtomicBoolean hasObjectWrite = dataRegionId2HasObjectWrite.get(dataRegionId); + if (hasObjectWrite != null && hasObjectWrite.get()) { + assigner.hasObjectData.set(true); + } if (extractor.isNeedListenToTsFile()) { listenToTsFileExtractorCount.incrementAndGet(); @@ -117,6 +130,25 @@ public void listenToTsFile( assigner.isTableModel(), databaseName, tsFileResource, isLoaded)); } + /** + * Listen to Object type write operations. Mark that this data region has Object type write + * operations. Use independent map to track Object write even before assigner is created. + * + * @param dataRegionId the data region id + */ + public void listenToObjectNode(final String dataRegionId) { + // Track Object write independently + dataRegionId2HasObjectWrite + .computeIfAbsent(dataRegionId, k -> new AtomicBoolean(false)) + .set(true); + + // Also mark assigner if it exists + final PipeDataRegionAssigner assigner = dataRegionId2Assigner.get(dataRegionId); + if (assigner != null) { + assigner.hasObjectData.set(true); + } + } + public void listenToInsertNode( final String dataRegionId, final String databaseName, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 1124e33a7df9..e2983f320248 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -3612,6 +3612,7 @@ public void writeObject(ObjectNode objectNode) throws Exception { } FileMetrics.getInstance().increaseObjectFileNum(1); FileMetrics.getInstance().increaseObjectFileSize(objectFile.length()); + PipeInsertionDataNodeListener.getInstance().listenToObjectNode(dataRegionIdString); } getWALNode() .ifPresent(walNode -> walNode.log(TsFileProcessor.MEMTABLE_NOT_EXIST, objectNode)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java index a011490ca497..75b5f83ffe7d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTableStatementDataTypeConvertExecutionVisitor.java @@ -85,6 +85,7 @@ public Optional visitLoadTsFile( null, "root", null, + true, true)) { for (final TabletInsertionEvent tabletInsertionEvent : parser.toTabletInsertionEvents()) { if (!(tabletInsertionEvent instanceof PipeRawTabletInsertionEvent)) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java index 226966454aaa..f26a28f8903c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java @@ -97,6 +97,7 @@ public Optional visitLoadFile( Long.MAX_VALUE, null, null, + true, true)) { for (final Pair tabletWithIsAligned : parser.toTabletWithIsAligneds()) { final PipeTransferTabletRawReq tabletRawReq = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 7bfde3b158d5..bc466cfc31f5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -584,7 +584,7 @@ private void testTsFilePointNum( ? new TsFileInsertionEventQueryParser( tsFile, pattern, startTime, endTime, tsFileInsertionEvent) : new TsFileInsertionEventScanParser( - tsFile, pattern, startTime, endTime, null, tsFileInsertionEvent, false)) { + tsFile, pattern, startTime, endTime, null, tsFileInsertionEvent, false, true)) { final AtomicInteger count1 = new AtomicInteger(0); final AtomicInteger count2 = new AtomicInteger(0); final AtomicInteger count3 = new AtomicInteger(0); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index 74bc0d9815aa..242499e68235 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -104,6 +104,9 @@ public enum BuiltinPipePlugin { WRITE_BACK_SINK("write-back-sink", WriteBackSink.class), SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class), PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", PipeConsensusAsyncSink.class), + + // TsFile export sink (datanode only, placeholder class here) + TSFILE_SINK("tsfile-sink", DoNothingSink.class), ; private final String pipePluginName; diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java index afe0ebe22df5..f90a119ce227 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java @@ -488,6 +488,70 @@ public boolean isReleased() { return isReleased.get(); } + /** + * Scan and detect if the event contains any Object type (BLOB or STRING) data. This method should + * be called before {@link #hasObjectData()} and {@link #getObjectPaths()}. Default implementation + * does nothing. Only events that may contain Object types need to override this. + */ + public void scanForObjectData() { + // Default implementation does nothing + } + + /** + * Set whether the event has Object type data manually. This can be used to manually mark Object + * data without scanning. Default implementation does nothing. Only events that may contain Object + * types need to override this. + * + * @param hasObject whether the event has Object type data + */ + public void setHasObject(boolean hasObject) { + // Default implementation does nothing + } + + /** + * Check if the event contains any Object type (BLOB or STRING) data. Default implementation + * returns false. Only events that may contain Object types need to override this. + * + * @return true if the event contains Object type data, false otherwise + */ + public boolean hasObjectData() { + return false; + } + + /** + * Get the paths of measurements that contain Object type (BLOB or STRING) data. Default + * implementation returns an empty array. Only events that may contain Object types need to + * override this. + * + * @return array of measurement names that contain Object type data, empty array if none + */ + public String[] getObjectPaths() { + return new String[0]; + } + + /////////////////////////// Object Resource Management /////////////////////////// + + /** + * Get the TsFileResource associated with this event. Default implementation returns null. Only + * events that are associated with a TsFile need to override this. + * + * @return TsFileResource if this event is associated with a TsFile, null otherwise + */ + public Object getTsFileResource() { + // Default implementation returns null + return null; + } + + /** + * Set the TsFileResource for this event. Default implementation does nothing. Only events that + * need to store TsFileResource need to override this. + * + * @param tsFileResource the TsFileResource to set + */ + public void setTsFileResource(Object tsFileResource) { + // Default implementation does nothing + } + /** * Used for pipeConsensus. In PipeConsensus, we only need committerKey, commitId and rebootTimes * to uniquely identify an event