Skip to content

Commit 5020cf5

Browse files
committed
add CteScanOperator test
1 parent 74340b0 commit 5020cf5

File tree

2 files changed

+259
-0
lines changed

2 files changed

+259
-0
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
2323

24+
import org.apache.iotdb.commons.utils.TestOnly;
2425
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
2526
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
2627
import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSourceOperator;
@@ -122,4 +123,9 @@ private void prepareReader() {
122123
dataReader = new MemoryReader(dataStore.getCachedData());
123124
}
124125
}
126+
127+
@TestOnly
128+
public int getDataStoreRefCount() {
129+
return dataStoreRefCount;
130+
}
125131
}
Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,253 @@
1+
/*
2+
*
3+
* * Licensed to the Apache Software Foundation (ASF) under one
4+
* * or more contributor license agreements. See the NOTICE file
5+
* * distributed with this work for additional information
6+
* * regarding copyright ownership. The ASF licenses this file
7+
* * to you under the Apache License, Version 2.0 (the
8+
* * "License"); you may not use this file except in compliance
9+
* * with the License. You may obtain a copy of the License at
10+
* *
11+
* * http://www.apache.org/licenses/LICENSE-2.0
12+
* *
13+
* * Unless required by applicable law or agreed to in writing,
14+
* * software distributed under the License is distributed on an
15+
* * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* * KIND, either express or implied. See the License for the
17+
* * specific language governing permissions and limitations
18+
* * under the License.
19+
*
20+
*/
21+
22+
package org.apache.iotdb.db.queryengine.execution.operator;
23+
24+
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
25+
import org.apache.iotdb.db.queryengine.execution.operator.source.relational.CteScanOperator;
26+
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
27+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
28+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
29+
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
30+
import org.apache.iotdb.db.utils.cte.CteDataStore;
31+
32+
import com.google.common.collect.ImmutableList;
33+
import org.apache.tsfile.common.conf.TSFileDescriptor;
34+
import org.apache.tsfile.enums.TSDataType;
35+
import org.apache.tsfile.read.common.block.TsBlock;
36+
import org.apache.tsfile.read.common.block.TsBlockBuilder;
37+
import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
38+
import org.apache.tsfile.read.common.block.column.DoubleColumnBuilder;
39+
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
40+
import org.apache.tsfile.read.common.type.DoubleType;
41+
import org.apache.tsfile.read.common.type.StringType;
42+
import org.apache.tsfile.read.common.type.TimestampType;
43+
import org.apache.tsfile.utils.Binary;
44+
import org.junit.After;
45+
import org.junit.Before;
46+
import org.junit.Test;
47+
48+
import java.nio.charset.StandardCharsets;
49+
import java.util.ArrayList;
50+
import java.util.Arrays;
51+
import java.util.List;
52+
53+
import static org.junit.Assert.assertEquals;
54+
import static org.junit.Assert.assertFalse;
55+
import static org.junit.Assert.assertNotNull;
56+
import static org.junit.Assert.assertNull;
57+
import static org.junit.Assert.assertTrue;
58+
import static org.mockito.Mockito.mock;
59+
60+
public class CteScanOperatorTest {
61+
private OperatorContext operatorContext;
62+
private PlanNodeId planNodeId;
63+
private CteDataStore cteDataStore;
64+
private CteScanOperator cteScanOperator;
65+
66+
@Before
67+
public void setUp() {
68+
// Set up mock objects
69+
operatorContext = mock(OperatorContext.class);
70+
planNodeId = new PlanNodeId("test-plan-node");
71+
72+
// Create a simple table schema for testing
73+
TableSchema tableSchema = createTestTableSchema();
74+
75+
// Create mock query
76+
Query mockQuery = mock(Query.class);
77+
78+
// Create column index mapping
79+
List<Integer> columnIndex2TsBlockColumnIndexList = Arrays.asList(0, 1, 2);
80+
81+
// Initialize CteDataStore
82+
cteDataStore = new CteDataStore(mockQuery, tableSchema, columnIndex2TsBlockColumnIndexList);
83+
84+
// Add test data to the data store
85+
List<TsBlock> testData = createTestTsBlocks();
86+
for (TsBlock tsBlock : testData) {
87+
cteDataStore.addTsBlock(tsBlock);
88+
}
89+
}
90+
91+
@After
92+
public void tearDown() throws Exception {
93+
if (cteScanOperator != null) {
94+
cteScanOperator.close();
95+
}
96+
}
97+
98+
@Test
99+
public void testConstructor() throws Exception {
100+
cteScanOperator = new CteScanOperator(operatorContext, planNodeId, cteDataStore);
101+
assertEquals(1, cteScanOperator.getDataStoreRefCount());
102+
cteScanOperator.close();
103+
}
104+
105+
@Test
106+
public void testEmptyDataStore() throws Exception {
107+
// Create empty data store
108+
Query mockQuery = mock(Query.class);
109+
TableSchema tableSchema = createTestTableSchema();
110+
CteDataStore emptyDataStore = new CteDataStore(mockQuery, tableSchema, Arrays.asList(0, 1, 2));
111+
112+
cteScanOperator = new CteScanOperator(operatorContext, planNodeId, emptyDataStore);
113+
// Should not have data
114+
assertFalse(cteScanOperator.hasNext());
115+
116+
cteScanOperator.close();
117+
}
118+
119+
@Test
120+
public void testNextWithData() throws Exception {
121+
cteScanOperator = new CteScanOperator(operatorContext, planNodeId, cteDataStore);
122+
// Should have data
123+
assertTrue(cteScanOperator.hasNext());
124+
TsBlock firstBlock = cteScanOperator.next();
125+
assertNotNull(firstBlock);
126+
assertEquals(2, firstBlock.getValueColumnCount());
127+
assertEquals(3, firstBlock.getPositionCount());
128+
129+
// Should have data
130+
assertTrue(cteScanOperator.hasNext());
131+
TsBlock secondBlock = cteScanOperator.next();
132+
assertNotNull(secondBlock);
133+
assertEquals(2, secondBlock.getValueColumnCount());
134+
assertEquals(2, secondBlock.getPositionCount());
135+
136+
// should return null
137+
TsBlock thirdBlock = cteScanOperator.next();
138+
assertNull(thirdBlock);
139+
140+
cteScanOperator.close();
141+
}
142+
143+
@Test
144+
public void testIsFinished() throws Exception {
145+
cteScanOperator = new CteScanOperator(operatorContext, planNodeId, cteDataStore);
146+
147+
// Initially not finished
148+
assertFalse(cteScanOperator.isFinished());
149+
// Consume all data
150+
while (cteScanOperator.hasNext()) {
151+
cteScanOperator.next();
152+
}
153+
// Now should be finished
154+
assertTrue(cteScanOperator.isFinished());
155+
156+
cteScanOperator.close();
157+
}
158+
159+
@Test
160+
public void testMemory() throws Exception {
161+
cteScanOperator = new CteScanOperator(operatorContext, planNodeId, cteDataStore);
162+
163+
// maxPeekMemory + maxReturnSize + retainedSize
164+
long maxPeekMemory = cteScanOperator.calculateMaxPeekMemory();
165+
assertEquals(
166+
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(), maxPeekMemory);
167+
long maxReturnSize = cteScanOperator.calculateMaxReturnSize();
168+
assertEquals(
169+
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(), maxReturnSize);
170+
long retainedSize = cteScanOperator.calculateRetainedSizeAfterCallingNext();
171+
assertEquals(0L, retainedSize);
172+
173+
cteScanOperator.close();
174+
}
175+
176+
@Test
177+
public void testMultipleCteScanOperators() throws Exception {
178+
// Test reference counting with multiple operators
179+
CteScanOperator operator1 = new CteScanOperator(operatorContext, planNodeId, cteDataStore);
180+
CteScanOperator operator2 = new CteScanOperator(operatorContext, planNodeId, cteDataStore);
181+
182+
// CteDataStore Reference count
183+
assertEquals(1, operator1.getDataStoreRefCount());
184+
assertEquals(2, operator2.getDataStoreRefCount());
185+
186+
// Operator Memory
187+
assertTrue(operator1.ramBytesUsed() > operator2.ramBytesUsed());
188+
189+
// Both operators should be able to read data
190+
assertTrue(operator1.hasNext());
191+
assertTrue(operator2.hasNext());
192+
193+
// Clean up
194+
operator1.close();
195+
operator2.close();
196+
}
197+
198+
private TableSchema createTestTableSchema() {
199+
List<ColumnSchema> columnSchemas = new ArrayList<>();
200+
columnSchemas.add(
201+
new ColumnSchema("time", TimestampType.TIMESTAMP, false, TsTableColumnCategory.TIME));
202+
columnSchemas.add(
203+
new ColumnSchema("name", StringType.STRING, false, TsTableColumnCategory.FIELD));
204+
columnSchemas.add(
205+
new ColumnSchema("value", DoubleType.DOUBLE, false, TsTableColumnCategory.FIELD));
206+
207+
return new TableSchema("test_table", columnSchemas);
208+
}
209+
210+
private List<TsBlock> createTestTsBlocks() {
211+
List<TsBlock> blocks = new ArrayList<>();
212+
213+
// Create first TsBlock
214+
blocks.add(
215+
createTsBlock(
216+
new long[] {1000L, 2000L, 3000L},
217+
new String[] {"Alice", "Bob", "Charlie"},
218+
new double[] {10.5, 20.3, 30.7}));
219+
220+
// Create second TsBlock
221+
blocks.add(
222+
createTsBlock(
223+
new long[] {4000L, 5000L}, new String[] {"David", "Eve"}, new double[] {40.2, 50.8}));
224+
225+
return blocks;
226+
}
227+
228+
private TsBlock createTsBlock(long[] times, String[] names, double[] values) {
229+
TsBlockBuilder builder =
230+
new TsBlockBuilder(ImmutableList.of(TSDataType.STRING, TSDataType.DOUBLE));
231+
232+
// Time column
233+
TimeColumnBuilder timeColumn = builder.getTimeColumnBuilder();
234+
for (long time : times) {
235+
timeColumn.writeLong(time);
236+
}
237+
238+
// Name column
239+
BinaryColumnBuilder nameColumn = (BinaryColumnBuilder) builder.getColumnBuilder(0);
240+
for (String name : names) {
241+
nameColumn.writeBinary(new Binary(name, StandardCharsets.UTF_8));
242+
}
243+
244+
// Value column
245+
DoubleColumnBuilder valueColumn = (DoubleColumnBuilder) builder.getColumnBuilder(1);
246+
for (double value : values) {
247+
valueColumn.writeDouble(value);
248+
}
249+
250+
builder.declarePositions(times.length);
251+
return builder.build();
252+
}
253+
}

0 commit comments

Comments
 (0)