diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 91c36262cbd9..9293f9867b67 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -2435,7 +2435,8 @@ private List getFileHandleListForQuery( } else { tsFileResource .getProcessor() - .queryForSeriesRegionScanWithoutLock(partialPaths, context, fileScanHandles); + .queryForSeriesRegionScanWithoutLock( + partialPaths, context, fileScanHandles, globalTimeFilter); } } return fileScanHandles; @@ -2512,7 +2513,8 @@ private List getFileHandleListForQuery( } else { tsFileResource .getProcessor() - .queryForDeviceRegionScanWithoutLock(devicePathsToContext, context, fileScanHandles); + .queryForDeviceRegionScanWithoutLock( + devicePathsToContext, context, fileScanHandles, globalTimeFilter); } } return fileScanHandles; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 92803984cee1..22606f232a8d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -70,6 +70,7 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @@ -488,7 +489,8 @@ public void queryForSeriesRegionScan( long ttlLowerBound, Map> chunkMetaDataMap, Map> memChunkHandleMap, - List> modsToMemTabled) { + List> modsToMemTabled, + Filter globalTimeFilter) { IDeviceID deviceID = fullPath.getDeviceId(); if (fullPath instanceof NonAlignedFullPath) { @@ -506,7 +508,12 @@ public void queryForSeriesRegionScan( fullPath.getDeviceId(), measurementId, this, modsToMemTabled, ttlLowerBound); } getMemChunkHandleFromMemTable( - deviceID, measurementId, chunkMetaDataMap, memChunkHandleMap, deletionList); + deviceID, + measurementId, + chunkMetaDataMap, + memChunkHandleMap, + deletionList, + globalTimeFilter); } else { // check If MemTable Contains this path if (!memTableMap.containsKey(deviceID)) { @@ -528,7 +535,8 @@ public void queryForSeriesRegionScan( ((AlignedFullPath) fullPath).getSchemaList(), chunkMetaDataMap, memChunkHandleMap, - deletionList); + deletionList, + globalTimeFilter); } } @@ -539,7 +547,8 @@ public void queryForDeviceRegionScan( long ttlLowerBound, Map> chunkMetadataMap, Map> memChunkHandleMap, - List> modsToMemTabled) { + List> modsToMemTabled, + Filter globalTimeFilter) { Map memTableMap = getMemTableMap(); @@ -556,7 +565,8 @@ public void queryForDeviceRegionScan( chunkMetadataMap, memChunkHandleMap, ttlLowerBound, - modsToMemTabled); + modsToMemTabled, + globalTimeFilter); } else { getMemChunkHandleFromMemTable( deviceID, @@ -564,7 +574,8 @@ public void queryForDeviceRegionScan( chunkMetadataMap, memChunkHandleMap, ttlLowerBound, - modsToMemTabled); + modsToMemTabled, + globalTimeFilter); } } @@ -573,24 +584,30 @@ private void getMemChunkHandleFromMemTable( String measurementId, Map> chunkMetadataMap, Map> memChunkHandleMap, - List deletionList) { + List deletionList, + Filter globalTimeFilter) { WritableMemChunk memChunk = (WritableMemChunk) memTableMap.get(deviceID).getMemChunkMap().get(measurementId); - long[] timestamps = memChunk.getFilteredTimestamp(deletionList); + if (memChunk == null) { + return; + } + Optional anySatisfiedTimestamp = + memChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter); + if (!anySatisfiedTimestamp.isPresent()) { + return; + } + long satisfiedTimestamp = anySatisfiedTimestamp.get(); chunkMetadataMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) .add( - buildChunkMetaDataForMemoryChunk( - measurementId, - timestamps[0], - timestamps[timestamps.length - 1], - Collections.emptyList())); + buildFakeChunkMetaDataForFakeMemoryChunk( + measurementId, satisfiedTimestamp, satisfiedTimestamp, Collections.emptyList())); memChunkHandleMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) - .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps)); + .add(new MemChunkHandleImpl(deviceID, measurementId, new long[] {satisfiedTimestamp})); } private void getMemAlignedChunkHandleFromMemTable( @@ -598,7 +615,8 @@ private void getMemAlignedChunkHandleFromMemTable( List schemaList, Map> chunkMetadataList, Map> memChunkHandleMap, - List> deletionList) { + List> deletionList, + Filter globalTimeFilter) { AlignedWritableMemChunk alignedMemChunk = ((AlignedWritableMemChunkGroup) memTableMap.get(deviceID)).getAlignedMemChunk(); @@ -615,7 +633,11 @@ private void getMemAlignedChunkHandleFromMemTable( } List bitMaps = new ArrayList<>(); - long[] timestamps = alignedMemChunk.getFilteredTimestamp(deletionList, bitMaps, true); + long[] timestamps = + alignedMemChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, true, globalTimeFilter); + if (timestamps.length == 0) { + return; + } buildAlignedMemChunkHandle( deviceID, @@ -633,7 +655,8 @@ private void getMemAlignedChunkHandleFromMemTable( Map> chunkMetadataList, Map> memChunkHandleMap, long ttlLowerBound, - List> modsToMemTabled) { + List> modsToMemTabled, + Filter globalTimeFilter) { AlignedWritableMemChunk memChunk = writableMemChunkGroup.getAlignedMemChunk(); List schemaList = memChunk.getSchemaList(); @@ -648,7 +671,11 @@ private void getMemAlignedChunkHandleFromMemTable( } List bitMaps = new ArrayList<>(); - long[] timestamps = memChunk.getFilteredTimestamp(deletionList, bitMaps, true); + long[] timestamps = + memChunk.getAnySatisfiedTimestamp(deletionList, bitMaps, true, globalTimeFilter); + if (timestamps.length == 0) { + return; + } buildAlignedMemChunkHandle( deviceID, timestamps, @@ -665,7 +692,8 @@ private void getMemChunkHandleFromMemTable( Map> chunkMetadataMap, Map> memChunkHandleMap, long ttlLowerBound, - List> modsToMemTabled) { + List> modsToMemTabled, + Filter globalTimeFilter) { for (Entry entry : writableMemChunkGroup.getMemChunkMap().entrySet()) { @@ -679,18 +707,20 @@ private void getMemChunkHandleFromMemTable( ModificationUtils.constructDeletionList( deviceID, measurementId, this, modsToMemTabled, ttlLowerBound); } - long[] timestamps = writableMemChunk.getFilteredTimestamp(deletionList); + Optional anySatisfiedTimestamp = + writableMemChunk.getAnySatisfiedTimestamp(deletionList, globalTimeFilter); + if (!anySatisfiedTimestamp.isPresent()) { + return; + } + long satisfiedTimestamp = anySatisfiedTimestamp.get(); chunkMetadataMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) .add( - buildChunkMetaDataForMemoryChunk( - measurementId, - timestamps[0], - timestamps[timestamps.length - 1], - Collections.emptyList())); + buildFakeChunkMetaDataForFakeMemoryChunk( + measurementId, satisfiedTimestamp, satisfiedTimestamp, Collections.emptyList())); memChunkHandleMap .computeIfAbsent(measurementId, k -> new ArrayList<>()) - .add(new MemChunkHandleImpl(deviceID, measurementId, timestamps)); + .add(new MemChunkHandleImpl(deviceID, measurementId, new long[] {satisfiedTimestamp})); } } @@ -714,7 +744,7 @@ private void buildAlignedMemChunkHandle( chunkMetadataList .computeIfAbsent(measurement, k -> new ArrayList<>()) .add( - buildChunkMetaDataForMemoryChunk( + buildFakeChunkMetaDataForFakeMemoryChunk( measurement, startEndTime[0], startEndTime[1], deletion)); chunkHandleMap .computeIfAbsent(measurement, k -> new ArrayList<>()) @@ -745,7 +775,7 @@ private long[] calculateStartEndTime(long[] timestamps, List bitMaps, in return new long[] {startTime, endTime}; } - private IChunkMetadata buildChunkMetaDataForMemoryChunk( + private IChunkMetadata buildFakeChunkMetaDataForFakeMemoryChunk( String measurement, long startTime, long endTime, List deletionList) { TimeStatistics timeStatistics = new TimeStatistics(); timeStatistics.setStartTime(startTime); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index f81238bef71e..49f8ab0dadab 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -34,6 +34,7 @@ import org.apache.tsfile.encrypt.EncryptUtils; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.utils.Pair; @@ -55,6 +56,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import static org.apache.iotdb.db.utils.ModificationUtils.isPointDeleted; @@ -287,30 +289,99 @@ private Pair checkAndReorderColumnValuesInInsertPlan( return new Pair<>(reorderedColumnValues, reorderedBitMaps); } - private void filterDeletedTimeStamp( + public long[] getAnySatisfiedTimestamp( + List> deletionList, + List bitMaps, + boolean ignoreAllNullRows, + Filter globalTimeFilter) { + BitMap columnHasNonNullValue = new BitMap(schemaList.size()); + AtomicInteger hasNonNullValueColumnCount = new AtomicInteger(0); + Map timestampWithBitmap = new TreeMap<>(); + + getAnySatisfiedTimestamp( + list, + deletionList, + ignoreAllNullRows, + timestampWithBitmap, + globalTimeFilter, + columnHasNonNullValue, + hasNonNullValueColumnCount); + for (int i = 0; + i < sortedList.size() && hasNonNullValueColumnCount.get() < schemaList.size(); + i++) { + if (!ignoreAllNullRows && !timestampWithBitmap.isEmpty()) { + // count devices in table model + break; + } + getAnySatisfiedTimestamp( + sortedList.get(i), + deletionList, + ignoreAllNullRows, + timestampWithBitmap, + globalTimeFilter, + columnHasNonNullValue, + hasNonNullValueColumnCount); + } + + long[] timestamps = new long[timestampWithBitmap.size()]; + int idx = 0; + for (Map.Entry entry : timestampWithBitmap.entrySet()) { + timestamps[idx++] = entry.getKey(); + bitMaps.add(entry.getValue()); + } + return timestamps; + } + + private void getAnySatisfiedTimestamp( AlignedTVList alignedTVList, List> valueColumnsDeletionList, boolean ignoreAllNullRows, - Map timestampWithBitmap) { + Map timestampWithBitmap, + Filter globalTimeFilter, + BitMap columnHasNonNullValue, + AtomicInteger hasNonNullValueColumnCount) { + if (globalTimeFilter != null + && !globalTimeFilter.satisfyStartEndTime( + alignedTVList.getMinTime(), alignedTVList.getMaxTime())) { + return; + } BitMap allValueColDeletedMap = alignedTVList.getAllValueColDeletedMap(); - int rowCount = alignedTVList.rowCount(); List valueColumnDeleteCursor = new ArrayList<>(); if (valueColumnsDeletionList != null) { valueColumnsDeletionList.forEach(x -> valueColumnDeleteCursor.add(new int[] {0})); } + // example: + // globalTimeFilter:null, ignoreAllNullRows: true + // tvList: + // time s1 s2 s3 + // 1 1 null null + // 2 null 1 null + // 2 1 1 null + // 3 1 null null + // 4 1 null 1 + // timestampWithBitmap: + // timestamp: 1 bitmap: 011 + // timestamp: 2 bitmap: 101 + // timestamp: 4 bitmap: 110 for (int row = 0; row < rowCount; row++) { // the row is deleted if (allValueColDeletedMap != null && allValueColDeletedMap.isMarked(row)) { continue; } long timestamp = alignedTVList.getTime(row); + if (globalTimeFilter != null && !globalTimeFilter.satisfy(timestamp, null)) { + continue; + } + + // Note that this method will only perform bitmap unmarking on the first occurrence of a + // non-null value in multiple timestamps for the same column. + BitMap currentRowNullValueBitmap = null; - BitMap bitMap = new BitMap(schemaList.size()); for (int column = 0; column < schemaList.size(); column++) { if (alignedTVList.isNullValue(alignedTVList.getValueIndex(row), column)) { - bitMap.mark(column); + continue; } // skip deleted row @@ -320,33 +391,44 @@ && isPointDeleted( timestamp, valueColumnsDeletionList.get(column), valueColumnDeleteCursor.get(column))) { - bitMap.mark(column); - } - - // skip all-null row - if (ignoreAllNullRows && bitMap.isAllMarked()) { continue; } - timestampWithBitmap.put(timestamp, bitMap); + if (!columnHasNonNullValue.isMarked(column)) { + hasNonNullValueColumnCount.incrementAndGet(); + columnHasNonNullValue.mark(column); + currentRowNullValueBitmap = + currentRowNullValueBitmap != null + ? currentRowNullValueBitmap + : timestampWithBitmap.computeIfAbsent( + timestamp, k -> getAllMarkedBitmap(schemaList.size())); + currentRowNullValueBitmap.unmark(column); + } } - } - } - public long[] getFilteredTimestamp( - List> deletionList, List bitMaps, boolean ignoreAllNullRows) { - Map timestampWithBitmap = new TreeMap<>(); + if (!ignoreAllNullRows) { + timestampWithBitmap.put( + timestamp, + currentRowNullValueBitmap != null + ? currentRowNullValueBitmap + : getAllMarkedBitmap(schemaList.size())); + return; + } + if (currentRowNullValueBitmap == null) { + continue; + } + // found new column with non-null value + timestampWithBitmap.put(timestamp, currentRowNullValueBitmap); - filterDeletedTimeStamp(list, deletionList, ignoreAllNullRows, timestampWithBitmap); - for (AlignedTVList alignedTVList : sortedList) { - filterDeletedTimeStamp(alignedTVList, deletionList, ignoreAllNullRows, timestampWithBitmap); + if (hasNonNullValueColumnCount.get() == schemaList.size()) { + return; + } } + } - List filteredTimestamps = new ArrayList<>(); - for (Map.Entry entry : timestampWithBitmap.entrySet()) { - filteredTimestamps.add(entry.getKey()); - bitMaps.add(entry.getValue()); - } - return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray(); + private BitMap getAllMarkedBitmap(int size) { + BitMap bitMap = new BitMap(size); + bitMap.markAll(); + return bitMap; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java index 6c6e09c57825..fd9ffe90b0ae 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java @@ -125,7 +125,8 @@ void queryForSeriesRegionScan( long ttlLowerBound, Map> chunkMetadataMap, Map> memChunkHandleMap, - List> modsToMemtabled) + List> modsToMemtabled, + Filter globalTimeFilter) throws IOException, QueryProcessException, MetadataException; void queryForDeviceRegionScan( @@ -134,7 +135,8 @@ void queryForDeviceRegionScan( long ttlLowerBound, Map> chunkMetadataMap, Map> memChunkHandleMap, - List> modsToMemtabled) + List> modsToMemtabled, + Filter globalTimeFilter) throws IOException, QueryProcessException, MetadataException; /** putBack all the memory resources. */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 68776c92c0b5..36cbb4e0f880 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -1974,7 +1974,8 @@ private List getAlignedVisibleMetadataListFromWriterByDeviceID( public void queryForSeriesRegionScanWithoutLock( List pathList, QueryContext queryContext, - List fileScanHandlesForQuery) { + List fileScanHandlesForQuery, + Filter globalTimeFilter) { long startTime = System.nanoTime(); try { Map>> deviceToMemChunkHandleMap = new HashMap<>(); @@ -1995,7 +1996,8 @@ public void queryForSeriesRegionScanWithoutLock( timeLowerBound, measurementToChunkMetaList, measurementToChunkHandleList, - modsToMemtable); + modsToMemtable, + globalTimeFilter); } if (workMemTable != null) { workMemTable.queryForSeriesRegionScan( @@ -2003,7 +2005,8 @@ public void queryForSeriesRegionScanWithoutLock( timeLowerBound, measurementToChunkMetaList, measurementToChunkHandleList, - null); + null, + globalTimeFilter); } IDeviceID deviceID = seriesPath.getDeviceId(); // Some memTable have been flushed already, so we need to get the chunk metadata from @@ -2054,7 +2057,8 @@ public void queryForSeriesRegionScanWithoutLock( public void queryForDeviceRegionScanWithoutLock( Map devicePathsToContext, QueryContext queryContext, - List fileScanHandlesForQuery) { + List fileScanHandlesForQuery, + Filter globalTimeFilter) { long startTime = System.nanoTime(); try { Map>> deviceToMemChunkHandleMap = new HashMap<>(); @@ -2077,7 +2081,8 @@ public void queryForDeviceRegionScanWithoutLock( timeLowerBound, measurementToChunkMetadataList, measurementToMemChunkHandleList, - modsToMemtable); + modsToMemtable, + globalTimeFilter); } if (workMemTable != null) { workMemTable.queryForDeviceRegionScan( @@ -2086,7 +2091,8 @@ public void queryForDeviceRegionScanWithoutLock( timeLowerBound, measurementToChunkMetadataList, measurementToMemChunkHandleList, - null); + null, + globalTimeFilter); } buildChunkHandleForFlushedMemTable( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index ebf23154d150..dbc3183df80c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -35,6 +35,7 @@ import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.read.TimeValuePair; import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.BitMap; import org.apache.tsfile.write.UnSupportedDataTypeException; @@ -48,10 +49,9 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.BlockingQueue; -import java.util.stream.Collectors; import static org.apache.iotdb.db.utils.MemUtils.getBinarySize; @@ -574,11 +574,30 @@ public List getSortedList() { return sortedList; } - private void filterDeletedTimestamp( - TVList tvlist, List deletionList, List timestampList) { - long lastTime = Long.MIN_VALUE; + public Optional getAnySatisfiedTimestamp( + List deletionList, Filter globalTimeFilter) { + Optional anySatisfiedTimestamp = + getAnySatisfiedTimestamp(list, deletionList, globalTimeFilter); + if (anySatisfiedTimestamp.isPresent()) { + return anySatisfiedTimestamp; + } + for (TVList tvList : sortedList) { + anySatisfiedTimestamp = getAnySatisfiedTimestamp(tvList, deletionList, globalTimeFilter); + if (anySatisfiedTimestamp.isPresent()) { + break; + } + } + return anySatisfiedTimestamp; + } + + private Optional getAnySatisfiedTimestamp( + TVList tvlist, List deletionList, Filter globalTimeFilter) { int[] deletionCursor = {0}; int rowCount = tvlist.rowCount(); + if (globalTimeFilter != null + && !globalTimeFilter.satisfyStartEndTime(tvlist.getMinTime(), tvlist.getMaxTime())) { + return Optional.empty(); + } for (int i = 0; i < rowCount; i++) { if (tvlist.getBitMap() != null && tvlist.isNullValue(tvlist.getValueIndex(i))) { continue; @@ -588,27 +607,12 @@ private void filterDeletedTimestamp( && ModificationUtils.isPointDeleted(curTime, deletionList, deletionCursor)) { continue; } - - if (i == rowCount - 1 || curTime != lastTime) { - timestampList.add(curTime); + if (globalTimeFilter != null && !globalTimeFilter.satisfy(curTime, null)) { + continue; } - lastTime = curTime; + return Optional.of(curTime); } - } - - public long[] getFilteredTimestamp(List deletionList) { - List timestampList = new ArrayList<>(); - filterDeletedTimestamp(list, deletionList, timestampList); - for (TVList tvList : sortedList) { - filterDeletedTimestamp(tvList, deletionList, timestampList); - } - - // remove duplicated time - List distinctTimestamps = timestampList.stream().distinct().collect(Collectors.toList()); - // sort timestamps - long[] filteredTimestamps = distinctTimestamps.stream().mapToLong(Long::longValue).toArray(); - Arrays.sort(filteredTimestamps); - return filteredTimestamps; + return Optional.empty(); } @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java new file mode 100644 index 000000000000..3967409db52d --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkRegionScanTest.java @@ -0,0 +1,359 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.memtable; + +import org.apache.iotdb.commons.path.AlignedFullPath; +import org.apache.iotdb.commons.path.NonAlignedFullPath; +import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.storageengine.dataregion.read.filescan.IChunkHandle; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; +import org.apache.tsfile.read.common.TimeRange; +import org.apache.tsfile.read.filter.operator.TimeFilterOperators; +import org.apache.tsfile.utils.BitMap; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +@RunWith(Parameterized.class) +public class WritableMemChunkRegionScanTest { + + @Parameterized.Parameters(name = "{0}") + public static Collection data() { + return Arrays.asList(new Object[][] {{0}, {1000}, {10000}, {20000}}); + } + + private int defaultTvListThreshold; + private int tvListSortThreshold; + + public WritableMemChunkRegionScanTest(int tvListSortThreshold) { + this.tvListSortThreshold = tvListSortThreshold; + } + + @Before + public void setup() { + defaultTvListThreshold = IoTDBDescriptor.getInstance().getConfig().getTvListSortThreshold(); + IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(tvListSortThreshold); + } + + @After + public void tearDown() { + IoTDBDescriptor.getInstance().getConfig().setTVListSortThreshold(defaultTvListThreshold); + } + + @Test + public void testAlignedWritableMemChunkRegionScan() { + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0"); + try { + List measurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + AlignedWritableMemChunk writableMemChunk = null; + int size = 100000; + for (int i = 0; i < size; i++) { + if (i <= 10000) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), + measurementSchemas, + i, + new Object[] {1, null, 1}); + } else if (i <= 20000) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), + measurementSchemas, + i, + new Object[] {null, null, 2}); + } else if (i <= 30000) { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), + measurementSchemas, + i, + new Object[] {3, null, null}); + } else { + memTable.writeAlignedRow( + new StringArrayDeviceID("root.test.d1"), + measurementSchemas, + i, + new Object[] {4, 4, 4}); + } + } + writableMemChunk = + (AlignedWritableMemChunk) + memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), ""); + List bitMaps = new ArrayList<>(); + long[] timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), + bitMaps, + true, + null); + Assert.assertEquals(2, timestamps.length); + Assert.assertEquals(0, timestamps[0]); + Assert.assertFalse(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + Assert.assertFalse(bitMaps.get(0).isMarked(2)); + Assert.assertTrue(bitMaps.get(1).isMarked(0)); + Assert.assertFalse(bitMaps.get(1).isMarked(1)); + Assert.assertTrue(bitMaps.get(1).isMarked(2)); + Assert.assertEquals(30001, timestamps[1]); + + bitMaps = new ArrayList<>(); + timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(new TimeRange(0, 12000))), + bitMaps, + true, + new TimeFilterOperators.TimeGt(10000000)); + Assert.assertEquals(0, timestamps.length); + + bitMaps = new ArrayList<>(); + timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(new TimeRange(0, 12000))), + bitMaps, + true, + new TimeFilterOperators.TimeGt(11000)); + + Assert.assertEquals(3, timestamps.length); + Assert.assertEquals(12001, timestamps[0]); + Assert.assertTrue(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + Assert.assertFalse(bitMaps.get(0).isMarked(2)); + Assert.assertEquals(20001, timestamps[1]); + Assert.assertFalse(bitMaps.get(1).isMarked(0)); + Assert.assertTrue(bitMaps.get(1).isMarked(1)); + Assert.assertTrue(bitMaps.get(1).isMarked(2)); + Assert.assertEquals(30001, timestamps[2]); + Assert.assertTrue(bitMaps.get(2).isMarked(0)); + Assert.assertFalse(bitMaps.get(2).isMarked(1)); + Assert.assertTrue(bitMaps.get(2).isMarked(2)); + + writableMemChunk.writeAlignedPoints( + 1000001, new Object[] {1, null, null}, measurementSchemas); + writableMemChunk.writeAlignedPoints( + 1000002, new Object[] {null, 1, null}, measurementSchemas); + writableMemChunk.writeAlignedPoints(1000002, new Object[] {1, 1, null}, measurementSchemas); + writableMemChunk.writeAlignedPoints( + 1000003, new Object[] {1, null, null}, measurementSchemas); + writableMemChunk.writeAlignedPoints(1000004, new Object[] {1, null, 1}, measurementSchemas); + bitMaps = new ArrayList<>(); + timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), + bitMaps, + true, + new TimeFilterOperators.TimeGt(1000000)); + Assert.assertEquals(3, timestamps.length); + Assert.assertEquals(1000001, timestamps[0]); + Assert.assertFalse(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + Assert.assertTrue(bitMaps.get(0).isMarked(2)); + Assert.assertEquals(1000002, timestamps[1]); + Assert.assertTrue(bitMaps.get(1).isMarked(0)); + Assert.assertFalse(bitMaps.get(1).isMarked(1)); + Assert.assertTrue(bitMaps.get(1).isMarked(2)); + Assert.assertEquals(1000004, timestamps[2]); + Assert.assertTrue(bitMaps.get(2).isMarked(0)); + Assert.assertTrue(bitMaps.get(2).isMarked(1)); + Assert.assertFalse(bitMaps.get(2).isMarked(2)); + + Map> chunkHandleMap = new HashMap<>(); + memTable.queryForDeviceRegionScan( + new StringArrayDeviceID("root.test.d1"), + true, + Long.MIN_VALUE, + new HashMap<>(), + chunkHandleMap, + Collections.emptyList(), + new TimeFilterOperators.TimeGt(1000000)); + Assert.assertEquals(3, chunkHandleMap.size()); + Assert.assertArrayEquals( + new long[] {1000001, 1000001}, chunkHandleMap.get("s1").get(0).getPageStatisticsTime()); + Assert.assertArrayEquals( + new long[] {1000002, 1000002}, chunkHandleMap.get("s2").get(0).getPageStatisticsTime()); + Assert.assertArrayEquals( + new long[] {1000004, 1000004}, chunkHandleMap.get("s3").get(0).getPageStatisticsTime()); + + memTable.queryForSeriesRegionScan( + new AlignedFullPath( + new StringArrayDeviceID("root.test.d1"), + IMeasurementSchema.getMeasurementNameList(measurementSchemas), + measurementSchemas), + Long.MIN_VALUE, + new HashMap<>(), + chunkHandleMap, + Collections.emptyList(), + new TimeFilterOperators.TimeGt(1000000)); + Assert.assertEquals(3, chunkHandleMap.size()); + Assert.assertArrayEquals( + new long[] {1000001, 1000001}, chunkHandleMap.get("s1").get(0).getPageStatisticsTime()); + Assert.assertArrayEquals( + new long[] {1000002, 1000002}, chunkHandleMap.get("s2").get(0).getPageStatisticsTime()); + Assert.assertArrayEquals( + new long[] {1000004, 1000004}, chunkHandleMap.get("s3").get(0).getPageStatisticsTime()); + } finally { + memTable.release(); + } + } + + @Test + public void testTableWritableMemChunkRegionScan() { + List measurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + AlignedWritableMemChunk writableMemChunk = + new AlignedWritableMemChunk(measurementSchemas, true); + int size = 100000; + for (int i = 0; i < size; i++) { + if (i <= 10000) { + writableMemChunk.writeAlignedPoints(i, new Object[] {1, null, 1}, measurementSchemas); + } else if (i <= 20000) { + writableMemChunk.writeAlignedPoints(i, new Object[] {null, null, 2}, measurementSchemas); + } else if (i <= 30000) { + writableMemChunk.writeAlignedPoints(i, new Object[] {3, null, null}, measurementSchemas); + } else { + writableMemChunk.writeAlignedPoints(i, new Object[] {4, 4, 4}, measurementSchemas); + } + } + List bitMaps = new ArrayList<>(); + long[] timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), Collections.emptyList(), Collections.emptyList()), + bitMaps, + false, + null); + Assert.assertEquals(1, timestamps.length); + Assert.assertEquals(0, timestamps[0]); + Assert.assertFalse(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + Assert.assertFalse(bitMaps.get(0).isMarked(2)); + + bitMaps = new ArrayList<>(); + timestamps = + writableMemChunk.getAnySatisfiedTimestamp( + Arrays.asList( + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(new TimeRange(0, 12000))), + bitMaps, + false, + new TimeFilterOperators.TimeGt(11000)); + + Assert.assertEquals(1, timestamps.length); + Assert.assertEquals(11001, timestamps[0]); + Assert.assertTrue(bitMaps.get(0).isMarked(0)); + Assert.assertTrue(bitMaps.get(0).isMarked(1)); + Assert.assertTrue(bitMaps.get(0).isMarked(2)); + } + + @Test + public void testNonAlignedWritableMemChunkRegionScan() { + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0"); + try { + MeasurementSchema measurementSchema = new MeasurementSchema("s1", TSDataType.INT32); + int size = 100000; + for (int i = 0; i < size; i++) { + memTable.write( + new StringArrayDeviceID("root.test.d1"), + Collections.singletonList(measurementSchema), + i, + new Object[] {i}); + } + WritableMemChunk writableMemChunk = + (WritableMemChunk) + memTable.getWritableMemChunk(new StringArrayDeviceID("root.test.d1"), "s1"); + Optional timestamp = writableMemChunk.getAnySatisfiedTimestamp(null, null); + Assert.assertTrue(timestamp.isPresent()); + Assert.assertEquals(0, timestamp.get().longValue()); + + timestamp = + writableMemChunk.getAnySatisfiedTimestamp( + null, new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L)); + Assert.assertTrue(timestamp.isPresent()); + Assert.assertEquals(1000, timestamp.get().longValue()); + + timestamp = + writableMemChunk.getAnySatisfiedTimestamp( + Collections.singletonList(new TimeRange(1, 1500)), + new TimeFilterOperators.TimeBetweenAnd(1000L, 2000L)); + Assert.assertTrue(timestamp.isPresent()); + Assert.assertEquals(1501, timestamp.get().longValue()); + + timestamp = + writableMemChunk.getAnySatisfiedTimestamp( + Collections.singletonList(new TimeRange(1, 1500)), + new TimeFilterOperators.TimeBetweenAnd(100000L, 200000L)); + Assert.assertFalse(timestamp.isPresent()); + + Map> chunkHandleMap = new HashMap<>(); + memTable.queryForDeviceRegionScan( + new StringArrayDeviceID("root.test.d1"), + false, + Long.MIN_VALUE, + new HashMap<>(), + chunkHandleMap, + Collections.emptyList(), + new TimeFilterOperators.TimeGt(1)); + Assert.assertEquals(1, chunkHandleMap.size()); + Assert.assertArrayEquals( + new long[] {2, 2}, chunkHandleMap.get("s1").get(0).getPageStatisticsTime()); + memTable.queryForSeriesRegionScan( + new NonAlignedFullPath(new StringArrayDeviceID("root.test.d1"), measurementSchema), + Long.MIN_VALUE, + new HashMap<>(), + chunkHandleMap, + Collections.emptyList(), + new TimeFilterOperators.TimeGt(1)); + Assert.assertEquals(1, chunkHandleMap.size()); + Assert.assertArrayEquals( + new long[] {2, 2}, chunkHandleMap.get("s1").get(0).getPageStatisticsTime()); + } finally { + memTable.release(); + } + } +}