Skip to content

Commit 13b0582

Browse files
authored
Pipe: Modify the TableRawReq deserialization method to support directconversion to TableStatement. (#16844)
* Pipe: Modify the TableRawReq deserialization method to support direct conversion to TableStatement. * fix * fix * fix * fix * fix * update * update * update * refactor: optimize TabletStatementConverter according to code review - Optimize times array copy: skip copy when lengths are equal, use System.arraycopy - Add warning logs when times array is null or too small - Ensure all arrays (values, times, bitMaps) are copied to rowSize length for immutability - Filter out null columns when converting Statement to Tablet - Rename idColumnIndices to tagColumnIndices - Add skipString method to avoid constructing temporary objects - Add comments explaining skipped fields in readMeasurement - Use direct buffer position increment instead of reading bytes for skipping - Ensure all column values are copied to ensure immutability * update * update
1 parent bc4f8e9 commit 13b0582

15 files changed

+2257
-84
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333

3434
import org.apache.tsfile.utils.PublicBAOS;
3535
import org.apache.tsfile.utils.ReadWriteIOUtils;
36-
import org.apache.tsfile.write.record.Tablet;
3736

3837
import java.io.DataOutputStream;
3938
import java.io.IOException;
@@ -247,11 +246,7 @@ public static PipeTransferTabletBatchReqV2 fromTPipeTransferReq(
247246

248247
size = ReadWriteIOUtils.readInt(transferReq.body);
249248
for (int i = 0; i < size; ++i) {
250-
batchReq.tabletReqs.add(
251-
PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
252-
Tablet.deserialize(transferReq.body),
253-
ReadWriteIOUtils.readBool(transferReq.body),
254-
ReadWriteIOUtils.readString(transferReq.body)));
249+
batchReq.tabletReqs.add(PipeTransferTabletRawReqV2.toTPipeTransferRawReq(transferReq.body));
255250
}
256251

257252
batchReq.version = transferReq.version;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java

Lines changed: 99 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.commons.exception.MetadataException;
2323
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
2424
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
25+
import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter;
2526
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
2627
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
2728
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -43,27 +44,55 @@ public class PipeTransferTabletRawReq extends TPipeTransferReq {
4344

4445
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTransferTabletRawReq.class);
4546

46-
protected transient Tablet tablet;
47+
protected transient InsertTabletStatement statement;
48+
4749
protected transient boolean isAligned;
50+
protected transient Tablet tablet;
4851

52+
/**
53+
* Get Tablet. If tablet is null, convert from statement.
54+
*
55+
* @return Tablet object
56+
*/
4957
public Tablet getTablet() {
58+
if (tablet == null && statement != null) {
59+
try {
60+
tablet = statement.convertToTablet();
61+
} catch (final MetadataException e) {
62+
LOGGER.warn("Failed to convert statement to tablet.", e);
63+
return null;
64+
}
65+
}
5066
return tablet;
5167
}
5268

5369
public boolean getIsAligned() {
5470
return isAligned;
5571
}
5672

73+
/**
74+
* Construct Statement. If statement already exists, return it. Otherwise, convert from tablet.
75+
*
76+
* @return InsertTabletStatement
77+
*/
5778
public InsertTabletStatement constructStatement() {
79+
if (statement != null) {
80+
new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
81+
return statement;
82+
}
83+
84+
// Sort and deduplicate tablet before converting
5885
new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
5986

6087
try {
6188
if (isTabletEmpty(tablet)) {
6289
// Empty statement, will be filtered after construction
63-
return new InsertTabletStatement();
90+
statement = new InsertTabletStatement();
91+
return statement;
6492
}
6593

66-
return new InsertTabletStatement(tablet, isAligned, null);
94+
statement = new InsertTabletStatement(tablet, isAligned, null);
95+
return statement;
6796
} catch (final MetadataException e) {
6897
LOGGER.warn("Generate Statement from tablet {} error.", tablet, e);
6998
return null;
@@ -107,8 +136,20 @@ public static PipeTransferTabletRawReq toTPipeTransferReq(
107136
public static PipeTransferTabletRawReq fromTPipeTransferReq(final TPipeTransferReq transferReq) {
108137
final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
109138

110-
tabletReq.tablet = Tablet.deserialize(transferReq.body);
111-
tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
139+
final ByteBuffer buffer = transferReq.body;
140+
final int startPosition = buffer.position();
141+
try {
142+
// V1: no databaseName, readDatabaseName = false
143+
final InsertTabletStatement insertTabletStatement =
144+
TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, false);
145+
tabletReq.isAligned = insertTabletStatement.isAligned();
146+
// devicePath is already set in deserializeStatementFromTabletFormat for V1 format
147+
tabletReq.statement = insertTabletStatement;
148+
} catch (final Exception e) {
149+
buffer.position(startPosition);
150+
tabletReq.tablet = Tablet.deserialize(buffer);
151+
tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer);
152+
}
112153

113154
tabletReq.version = transferReq.version;
114155
tabletReq.type = transferReq.type;
@@ -118,18 +159,56 @@ public static PipeTransferTabletRawReq fromTPipeTransferReq(final TPipeTransferR
118159

119160
/////////////////////////////// Air Gap ///////////////////////////////
120161

121-
public static byte[] toTPipeTransferBytes(final Tablet tablet, final boolean isAligned)
122-
throws IOException {
162+
/**
163+
* Serialize to bytes. If tablet is null, convert from statement first.
164+
*
165+
* @return serialized bytes
166+
* @throws IOException if serialization fails
167+
*/
168+
public byte[] toTPipeTransferBytes() throws IOException {
169+
Tablet tabletToSerialize = tablet;
170+
boolean isAlignedToSerialize = isAligned;
171+
172+
// If tablet is null, convert from statement
173+
if (tabletToSerialize == null && statement != null) {
174+
try {
175+
tabletToSerialize = statement.convertToTablet();
176+
isAlignedToSerialize = statement.isAligned();
177+
} catch (final MetadataException e) {
178+
throw new IOException("Failed to convert statement to tablet for serialization", e);
179+
}
180+
}
181+
182+
if (tabletToSerialize == null) {
183+
throw new IOException("Cannot serialize: both tablet and statement are null");
184+
}
185+
123186
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
124187
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
125188
ReadWriteIOUtils.write(IoTDBSinkRequestVersion.VERSION_1.getVersion(), outputStream);
126189
ReadWriteIOUtils.write(PipeRequestType.TRANSFER_TABLET_RAW.getType(), outputStream);
127-
tablet.serialize(outputStream);
128-
ReadWriteIOUtils.write(isAligned, outputStream);
190+
tabletToSerialize.serialize(outputStream);
191+
ReadWriteIOUtils.write(isAlignedToSerialize, outputStream);
129192
return byteArrayOutputStream.toByteArray();
130193
}
131194
}
132195

196+
/**
197+
* Static method for backward compatibility. Creates a temporary instance and serializes.
198+
*
199+
* @param tablet Tablet to serialize
200+
* @param isAligned whether aligned
201+
* @return serialized bytes
202+
* @throws IOException if serialization fails
203+
*/
204+
public static byte[] toTPipeTransferBytes(final Tablet tablet, final boolean isAligned)
205+
throws IOException {
206+
final PipeTransferTabletRawReq req = new PipeTransferTabletRawReq();
207+
req.tablet = tablet;
208+
req.isAligned = isAligned;
209+
return req.toTPipeTransferBytes();
210+
}
211+
133212
/////////////////////////////// Object ///////////////////////////////
134213

135214
@Override
@@ -141,7 +220,16 @@ public boolean equals(final Object obj) {
141220
return false;
142221
}
143222
final PipeTransferTabletRawReq that = (PipeTransferTabletRawReq) obj;
144-
return Objects.equals(tablet, that.tablet)
223+
// Compare statement if both have it, otherwise compare tablet
224+
if (statement != null && that.statement != null) {
225+
return Objects.equals(statement, that.statement)
226+
&& isAligned == that.isAligned
227+
&& version == that.version
228+
&& type == that.type
229+
&& Objects.equals(body, that.body);
230+
}
231+
// Fallback to tablet comparison
232+
return Objects.equals(getTablet(), that.getTablet())
145233
&& isAligned == that.isAligned
146234
&& version == that.version
147235
&& type == that.type
@@ -150,6 +238,6 @@ public boolean equals(final Object obj) {
150238

151239
@Override
152240
public int hashCode() {
153-
return Objects.hash(tablet, isAligned, version, type, body);
241+
return Objects.hash(getTablet(), isAligned, version, type, body);
154242
}
155243
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java

Lines changed: 46 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.commons.exception.MetadataException;
2323
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
2424
import org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
25+
import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter;
2526
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
2627
import org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
2728
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
@@ -52,6 +53,16 @@ public String getDataBaseName() {
5253

5354
@Override
5455
public InsertTabletStatement constructStatement() {
56+
if (statement != null) {
57+
if (Objects.isNull(dataBaseName)) {
58+
new PipeTreeModelTabletEventSorter(statement).deduplicateAndSortTimestampsIfNecessary();
59+
} else {
60+
new PipeTableModelTabletEventSorter(statement).sortByTimestampIfNecessary();
61+
}
62+
63+
return statement;
64+
}
65+
5566
if (Objects.isNull(dataBaseName)) {
5667
new PipeTreeModelTabletEventSorter(tablet).deduplicateAndSortTimestampsIfNecessary();
5768
} else {
@@ -86,6 +97,16 @@ public static PipeTransferTabletRawReqV2 toTPipeTransferRawReq(
8697
return tabletReq;
8798
}
8899

100+
public static PipeTransferTabletRawReqV2 toTPipeTransferRawReq(final ByteBuffer buffer) {
101+
final PipeTransferTabletRawReqV2 tabletReq = new PipeTransferTabletRawReqV2();
102+
103+
tabletReq.deserializeTPipeTransferRawReq(buffer);
104+
tabletReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
105+
tabletReq.type = PipeRequestType.TRANSFER_TABLET_RAW_V2.getType();
106+
107+
return tabletReq;
108+
}
109+
89110
/////////////////////////////// Thrift ///////////////////////////////
90111

91112
public static PipeTransferTabletRawReqV2 toTPipeTransferReq(
@@ -114,13 +135,11 @@ public static PipeTransferTabletRawReqV2 fromTPipeTransferReq(
114135
final TPipeTransferReq transferReq) {
115136
final PipeTransferTabletRawReqV2 tabletReq = new PipeTransferTabletRawReqV2();
116137

117-
tabletReq.tablet = Tablet.deserialize(transferReq.body);
118-
tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
119-
tabletReq.dataBaseName = ReadWriteIOUtils.readString(transferReq.body);
138+
tabletReq.deserializeTPipeTransferRawReq(transferReq.body);
139+
tabletReq.body = transferReq.body;
120140

121141
tabletReq.version = transferReq.version;
122142
tabletReq.type = transferReq.type;
123-
tabletReq.body = transferReq.body;
124143

125144
return tabletReq;
126145
}
@@ -161,4 +180,27 @@ public boolean equals(Object o) {
161180
public int hashCode() {
162181
return Objects.hash(super.hashCode(), dataBaseName);
163182
}
183+
184+
/////////////////////////////// Util ///////////////////////////////
185+
186+
public void deserializeTPipeTransferRawReq(final ByteBuffer buffer) {
187+
final int startPosition = buffer.position();
188+
try {
189+
// V2: read databaseName, readDatabaseName = true
190+
final InsertTabletStatement insertTabletStatement =
191+
TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, true);
192+
this.isAligned = insertTabletStatement.isAligned();
193+
// databaseName is already set in deserializeStatementFromTabletFormat when
194+
// readDatabaseName=true
195+
this.dataBaseName = insertTabletStatement.getDatabaseName().orElse(null);
196+
this.statement = insertTabletStatement;
197+
} catch (final Exception e) {
198+
// If Statement deserialization fails, fallback to Tablet format
199+
// Reset buffer position for Tablet deserialization
200+
buffer.position(startPosition);
201+
this.tablet = Tablet.deserialize(buffer);
202+
this.isAligned = ReadWriteIOUtils.readBool(buffer);
203+
this.dataBaseName = ReadWriteIOUtils.readString(buffer);
204+
}
205+
}
164206
}

0 commit comments

Comments
 (0)