Skip to content

Commit 208f2e9

Browse files
committed
fix: sorted original indices
1 parent 75d00aa commit 208f2e9

File tree

3 files changed

+101
-37
lines changed

3 files changed

+101
-37
lines changed

integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBCteIT.java

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,10 @@ public class IoTDBCteIT {
7171
new String[] {
7272
"CREATE DATABASE IF NOT EXISTS testdb",
7373
"USE testdb",
74-
"CREATE TABLE IF NOT EXISTS testtb(deviceid STRING TAG, voltage FLOAT FIELD)",
75-
"INSERT INTO testtb VALUES(1000, 'd1', 100.0)",
76-
"INSERT INTO testtb VALUES(2000, 'd1', 200.0)",
77-
"INSERT INTO testtb VALUES(1000, 'd2', 300.0)",
74+
"CREATE TABLE IF NOT EXISTS testtb(voltage FLOAT FIELD, manufacturer STRING FIELD, deviceid STRING TAG)",
75+
"INSERT INTO testtb VALUES(1000, 100.0, 'a', 'd1')",
76+
"INSERT INTO testtb VALUES(2000, 200.0, 'b', 'd1')",
77+
"INSERT INTO testtb VALUES(1000, 300.0, 'c', 'd2')",
7878
};
7979

8080
private static final String dropDbSqls = "DROP DATABASE IF EXISTS testdb";
@@ -118,10 +118,10 @@ public void testMultipleWith() {
118118
String mainQuery =
119119
"select * from cte1 where voltage > "
120120
+ "(with cte2 as materialized (select avg(voltage) as avg_voltage from testtb) select avg_voltage from cte2)";
121-
String[] expectedHeader = new String[] {"time", "deviceid", "voltage"};
121+
String[] expectedHeader = new String[] {"time", "voltage", "manufacturer", "deviceid"};
122122
String[] retArray =
123123
new String[] {
124-
"1970-01-01T00:00:01.000Z,d2,300.0,",
124+
"1970-01-01T00:00:01.000Z,300.0,c,d2,",
125125
};
126126
String[] cteTemplateQueries = new String[] {"cte1 as %s (select * from testtb)"};
127127
testCteSuccessWithVariants(cteTemplateQueries, mainQuery, expectedHeader, retArray);
@@ -131,31 +131,31 @@ public void testMultipleWith() {
131131
public void testFilterQuery() {
132132
// case 1
133133
String mainQuery = "select * from cte where time > 1000 order by deviceid";
134-
String[] expectedHeader = new String[] {"time", "deviceid", "voltage"};
134+
String[] expectedHeader = new String[] {"time", "voltage", "manufacturer", "deviceid"};
135135
String[] retArray =
136136
new String[] {
137-
"1970-01-01T00:00:02.000Z,d1,200.0,",
137+
"1970-01-01T00:00:02.000Z,200.0,b,d1,",
138138
};
139139
String[] cteTemplateQueries = new String[] {"cte as %s (select * from testtb)"};
140140
testCteSuccessWithVariants(cteTemplateQueries, mainQuery, expectedHeader, retArray);
141141

142142
// case 2
143143
mainQuery = "select * from cte where voltage > 200 order by deviceid";
144-
expectedHeader = new String[] {"time", "deviceid", "voltage"};
145-
retArray = new String[] {"1970-01-01T00:00:01.000Z,d2,300.0,"};
144+
expectedHeader = new String[] {"time", "voltage", "manufacturer", "deviceid"};
145+
retArray = new String[] {"1970-01-01T00:00:01.000Z,300.0,c,d2,"};
146146
testCteSuccessWithVariants(cteTemplateQueries, mainQuery, expectedHeader, retArray);
147147
}
148148

149149
@Test
150150
public void testSortQuery() {
151151
final String mainQuery = "select * from cte order by deviceid, voltage desc";
152152

153-
String[] expectedHeader = new String[] {"time", "deviceid", "voltage"};
153+
String[] expectedHeader = new String[] {"time", "voltage", "manufacturer", "deviceid"};
154154
String[] retArray =
155155
new String[] {
156-
"1970-01-01T00:00:02.000Z,d1,200.0,",
157-
"1970-01-01T00:00:01.000Z,d1,100.0,",
158-
"1970-01-01T00:00:01.000Z,d2,300.0,"
156+
"1970-01-01T00:00:02.000Z,200.0,b,d1,",
157+
"1970-01-01T00:00:01.000Z,100.0,a,d1,",
158+
"1970-01-01T00:00:01.000Z,300.0,c,d2,"
159159
};
160160
String[] cteTemplateQueries = new String[] {"cte as %s (select * from testtb)"};
161161
testCteSuccessWithVariants(cteTemplateQueries, mainQuery, expectedHeader, retArray);
@@ -165,10 +165,10 @@ public void testSortQuery() {
165165
public void testLimitOffsetQuery() {
166166
final String mainQuery = "select * from cte limit 1 offset 1";
167167

168-
String[] expectedHeader = new String[] {"time", "deviceid", "voltage"};
168+
String[] expectedHeader = new String[] {"time", "voltage", "manufacturer", "deviceid"};
169169
String[] retArray =
170170
new String[] {
171-
"1970-01-01T00:00:02.000Z,d1,200.0,",
171+
"1970-01-01T00:00:02.000Z,200.0,b,d1,",
172172
};
173173
String[] cteTemplateQueries =
174174
new String[] {"cte as %s (select * from testtb where deviceid = 'd1') "};
@@ -248,8 +248,8 @@ public void testExplain() throws SQLException {
248248

249249
@Test
250250
public void testMultiReference() {
251-
String[] expectedHeader = new String[] {"time", "deviceid", "voltage"};
252-
String[] retArray = new String[] {"1970-01-01T00:00:01.000Z,d2,300.0,"};
251+
String[] expectedHeader = new String[] {"time", "voltage", "manufacturer", "deviceid"};
252+
String[] retArray = new String[] {"1970-01-01T00:00:01.000Z,300.0,c,d2,"};
253253
String[] cteTemplateQueries = new String[] {"cte as %s (select * from testtb)"};
254254
String mainQuery = "select * from cte where voltage > (select avg(voltage) from cte)";
255255
testCteSuccessWithVariants(cteTemplateQueries, mainQuery, expectedHeader, retArray);
@@ -280,10 +280,11 @@ public void testSession() throws IoTDBConnectionException, StatementExecutionExc
280280
session.executeQueryStatement(
281281
String.format("with cte as %s (select * from testtb) select * from cte", keyword));
282282

283-
assertEquals(dataSet.getColumnNames().size(), 3);
283+
assertEquals(dataSet.getColumnNames().size(), 4);
284284
assertEquals(dataSet.getColumnNames().get(0), "time");
285-
assertEquals(dataSet.getColumnNames().get(1), "deviceid");
286-
assertEquals(dataSet.getColumnNames().get(2), "voltage");
285+
assertEquals(dataSet.getColumnNames().get(1), "voltage");
286+
assertEquals(dataSet.getColumnNames().get(2), "manufacturer");
287+
assertEquals(dataSet.getColumnNames().get(3), "deviceid");
287288
int cnt = 0;
288289
while (dataSet.hasNext()) {
289290
dataSet.next();
@@ -310,10 +311,11 @@ public void testJdbc() throws ClassNotFoundException, SQLException {
310311
String.format("with cte as %s (select * from testtb) select * from cte", keyword));
311312

312313
final ResultSetMetaData metaData = resultSet.getMetaData();
313-
assertEquals(metaData.getColumnCount(), 3);
314+
assertEquals(metaData.getColumnCount(), 4);
314315
assertEquals(metaData.getColumnLabel(1), "time");
315-
assertEquals(metaData.getColumnLabel(2), "deviceid");
316-
assertEquals(metaData.getColumnLabel(3), "voltage");
316+
assertEquals(metaData.getColumnLabel(2), "voltage");
317+
assertEquals(metaData.getColumnLabel(3), "manufacturer");
318+
assertEquals(metaData.getColumnLabel(4), "deviceid");
317319

318320
int cnt = 0;
319321
while (resultSet.next()) {
@@ -326,20 +328,20 @@ public void testJdbc() throws ClassNotFoundException, SQLException {
326328

327329
@Test
328330
public void testNest() {
329-
final String mainQuery = "SELECT * FROM cte2";
331+
final String mainQuery = "select * from cte2";
330332

331333
String[] cteTemplateQueries =
332334
new String[] {
333335
"cte1 as %s (select deviceid, voltage from testtb where voltage > 200)",
334-
"cte2 as %s (SELECT voltage FROM cte1)"
336+
"cte2 as %s (select voltage from cte1)"
335337
};
336338
String[] expectedHeader = new String[] {"voltage"};
337339
String[] retArray = new String[] {"300.0,"};
338340
testCteSuccessWithVariants(cteTemplateQueries, mainQuery, expectedHeader, retArray);
339341

340342
cteTemplateQueries =
341343
new String[] {
342-
"cte2 as %s (SELECT voltage FROM cte1)",
344+
"cte2 as %s (select voltage from cte1)",
343345
"cte1 as %s (select deviceid, voltage from testtb where voltage > 200)"
344346
};
345347
String errMsg = "550: Table 'testdb.cte1' does not exist.";
@@ -406,8 +408,8 @@ public void testRecursive() {
406408
"WITH RECURSIVE t(n) AS %s ("
407409
+ " VALUES (1)"
408410
+ " UNION ALL"
409-
+ " SELECT n+1 FROM t WHERE n < 100)"
410-
+ " SELECT sum(n) FROM t";
411+
+ " select n+1 from t WHERE n < 100)"
412+
+ " select sum(n) from t";
411413

412414
for (String keyword : cteKeywords) {
413415
tableAssertTestFail(
@@ -426,7 +428,7 @@ public void testPrivileges() throws SQLException {
426428
adminStmt.execute("USE testdb");
427429
adminStmt.execute(
428430
"CREATE TABLE IF NOT EXISTS testtb1(deviceid STRING TAG, voltage FLOAT FIELD)");
429-
adminStmt.execute("GRANT SELECT ON testdb.testtb TO USER tmpuser");
431+
adminStmt.execute("GRANT select ON testdb.testtb TO USER tmpuser");
430432

431433
try (Connection connection =
432434
EnvFactory.getEnv()
@@ -496,13 +498,13 @@ public void testConcurrentCteQueries() throws Exception {
496498
// Test different types of CTE queries
497499
String[] queries = {
498500
String.format(
499-
"WITH cte as %s (SELECT * FROM testtb WHERE voltage > 150) SELECT * FROM cte ORDER BY deviceid",
501+
"WITH cte as %s (select * from testtb WHERE voltage > 150) select * from cte ORDER BY deviceid",
500502
cteKeywords[j % cteKeywords.length]),
501503
String.format(
502-
"WITH cte as %s (SELECT deviceid, avg(voltage) as avg_v FROM testtb GROUP BY deviceid) SELECT * FROM cte",
504+
"WITH cte as %s (select deviceid, avg(voltage) as avg_v from testtb GROUP BY deviceid) select * from cte",
503505
cteKeywords[j % cteKeywords.length]),
504506
String.format(
505-
"WITH cte as %s (SELECT * FROM testtb WHERE time > 1000) SELECT count(*) as cnt FROM cte",
507+
"WITH cte as %s (select * from testtb WHERE time > 1000) select count(*) as cnt from cte",
506508
cteKeywords[j % cteKeywords.length])
507509
};
508510

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/StatementAnalyzer.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3099,8 +3099,29 @@ protected Scope visitTable(Table table, Optional<Scope> scope) {
30993099

31003100
// check if table schema is found in CTE data stores
31013101
CteDataStore dataStore = queryContext.getCteDataStore(table);
3102-
Optional<TableSchema> tableSchema =
3103-
dataStore != null ? Optional.of(dataStore.getTableSchema()) : Optional.empty();
3102+
Optional<TableSchema> tableSchema = Optional.empty();
3103+
if (dataStore != null) {
3104+
tableSchema = Optional.of(dataStore.getTableSchema());
3105+
List<Integer> columnIndex2TsBlockColumnIndexList =
3106+
dataStore.getColumnIndex2TsBlockColumnIndexList();
3107+
if (columnIndex2TsBlockColumnIndexList != null
3108+
&& !columnIndex2TsBlockColumnIndexList.isEmpty()) {
3109+
// Check if the list is completely sequential (0, 1, 2, ...)
3110+
boolean isSequential = true;
3111+
for (int i = 0; i < columnIndex2TsBlockColumnIndexList.size(); i++) {
3112+
if (columnIndex2TsBlockColumnIndexList.get(i) != i) {
3113+
isSequential = false;
3114+
break;
3115+
}
3116+
}
3117+
3118+
// Generate new TableSchema with reordered columns only if not sequential
3119+
if (!isSequential) {
3120+
tableSchema =
3121+
reorderTableSchemaColumns(tableSchema.get(), columnIndex2TsBlockColumnIndexList);
3122+
}
3123+
}
3124+
}
31043125
// If table schema is not found, check if it is in metadata
31053126
if (!tableSchema.isPresent()) {
31063127
tableSchema = metadata.getTableSchema(sessionContext, name);
@@ -3124,6 +3145,17 @@ protected Scope visitTable(Table table, Optional<Scope> scope) {
31243145
return createAndAssignScope(table, scope, relationType);
31253146
}
31263147

3148+
private Optional<TableSchema> reorderTableSchemaColumns(
3149+
TableSchema tableSchema, List<Integer> columnIndex2TsBlockColumnIndexList) {
3150+
List<ColumnSchema> columnSchemas = tableSchema.getColumns();
3151+
final List<ColumnSchema> columnSchemaList =
3152+
columnIndex2TsBlockColumnIndexList.stream()
3153+
.map(columnSchemas::get)
3154+
.collect(Collectors.toList());
3155+
3156+
return Optional.of(new TableSchema(tableSchema.getTableName(), columnSchemaList));
3157+
}
3158+
31273159
private Scope createScopeForCommonTableExpression(
31283160
Table table, Optional<Scope> scope, WithQuery withQuery) {
31293161
Query query = withQuery.getQuery();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/CteMaterializer.java

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import org.slf4j.Logger;
6464
import org.slf4j.LoggerFactory;
6565

66+
import java.util.LinkedHashMap;
6667
import java.util.List;
6768
import java.util.Map;
6869
import java.util.Optional;
@@ -225,9 +226,14 @@ private TableSchema getTableSchema(DatasetHeader datasetHeader, String cteName)
225226
columnIndex2TsBlockColumnIndexList =
226227
IntStream.range(0, columnNames.size()).boxed().collect(Collectors.toList());
227228
}
228-
// build column schema list of cte table based on column2BlockColumnIndex
229+
230+
// Get original column indices in the TsBlock
231+
List<Integer> tsBlockColumnIndices =
232+
adjustColumnIndexMapping(columnIndex2TsBlockColumnIndexList);
233+
234+
// build column schema list of cte table based on sorted original indices
229235
final List<ColumnSchema> columnSchemaList =
230-
columnIndex2TsBlockColumnIndexList.stream()
236+
tsBlockColumnIndices.stream()
231237
.map(
232238
index ->
233239
new ColumnSchema(
@@ -239,6 +245,30 @@ private TableSchema getTableSchema(DatasetHeader datasetHeader, String cteName)
239245
return new TableSchema(cteName, columnSchemaList);
240246
}
241247

248+
/**
249+
* Adjust column index mapping by sorting and preserving original indices. For example, if input
250+
* is {0, 3, 1, 2}, the output will be {0, 2, 3, 1}. This method doesn't modify the original list.
251+
*
252+
* @param originalIndexList original column index list
253+
* @return adjusted column index list with sorted values preserving original positions
254+
*/
255+
private List<Integer> adjustColumnIndexMapping(List<Integer> originalIndexList) {
256+
if (originalIndexList == null || originalIndexList.isEmpty()) {
257+
return originalIndexList;
258+
}
259+
260+
// Create LinkedHashMap to maintain value-position mapping
261+
Map<Integer, Integer> valueToPositionMap = new LinkedHashMap<>();
262+
IntStream.range(0, originalIndexList.size())
263+
.forEach(i -> valueToPositionMap.put(originalIndexList.get(i), i));
264+
265+
// Sort by key (value) and collect positions in sorted order
266+
return valueToPositionMap.entrySet().stream()
267+
.sorted(Map.Entry.comparingByKey())
268+
.map(Map.Entry::getValue)
269+
.collect(Collectors.toList());
270+
}
271+
242272
private List<String> getCteExplainAnalyzeLines(
243273
FragmentInstanceStatisticsDrawer fragmentInstanceStatisticsDrawer,
244274
List<FragmentInstance> instances,

0 commit comments

Comments
 (0)