Skip to content

Commit cb705d1

Browse files
authored
Fixed the schema cache calculation 2 & The potential NPE caused by concurrent invalidate and update (#16834)
* fix * test * sl * fix * fix * fix * sonar * fix * fix * test
1 parent e3127e6 commit cb705d1

File tree

9 files changed

+101
-66
lines changed

9 files changed

+101
-66
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/CacheEntryGroupImpl.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ public T computeCacheEntry(
7272
return cacheEntryMap.compute(secondKey, computation.apply(memory));
7373
}
7474

75+
@Override
76+
public T computeCacheEntryIfPresent(
77+
final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>> computation) {
78+
return cacheEntryMap.computeIfPresent(secondKey, computation.apply(memory));
79+
}
80+
7581
@Override
7682
public long removeCacheEntry(final SK secondKey) {
7783
final T result = cacheEntryMap.remove(secondKey);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/DualKeyCacheImpl.java

Lines changed: 24 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,28 @@ public void update(
144144
@Override
145145
public void update(
146146
final FK firstKey, final Predicate<SK> secondKeyChecker, final ToIntFunction<V> updater) {
147-
final ICacheEntryGroup<FK, SK, V, T> entryGroup = firstKeyMap.get(firstKey);
147+
clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater);
148+
mayEvict();
149+
}
150+
151+
@Override
152+
public void update(
153+
final Predicate<FK> firstKeyChecker,
154+
final Predicate<SK> secondKeyChecker,
155+
final ToIntFunction<V> updater) {
156+
for (final FK firstKey : firstKeyMap.getAllKeys()) {
157+
if (!firstKeyChecker.test(firstKey)) {
158+
continue;
159+
}
160+
clearSecondEntry(firstKeyMap.get(firstKey), secondKeyChecker, updater);
161+
}
162+
mayEvict();
163+
}
164+
165+
public void clearSecondEntry(
166+
final ICacheEntryGroup<FK, SK, V, T> entryGroup,
167+
final Predicate<SK> secondKeyChecker,
168+
final ToIntFunction<V> updater) {
148169
if (Objects.nonNull(entryGroup)) {
149170
entryGroup
150171
.getAllCacheEntries()
@@ -153,49 +174,15 @@ public void update(
153174
if (!secondKeyChecker.test(entry.getKey())) {
154175
return;
155176
}
156-
entryGroup.computeCacheEntry(
177+
entryGroup.computeCacheEntryIfPresent(
157178
entry.getKey(),
158179
memory ->
159180
(secondKey, cacheEntry) -> {
160-
if (Objects.nonNull(cacheEntry)) {
161-
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
162-
}
181+
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
163182
return cacheEntry;
164183
});
165184
});
166185
}
167-
mayEvict();
168-
}
169-
170-
@Override
171-
public void update(
172-
final Predicate<FK> firstKeyChecker,
173-
final Predicate<SK> secondKeyChecker,
174-
final ToIntFunction<V> updater) {
175-
for (final FK firstKey : firstKeyMap.getAllKeys()) {
176-
if (!firstKeyChecker.test(firstKey)) {
177-
continue;
178-
}
179-
final ICacheEntryGroup<FK, SK, V, T> entryGroup = firstKeyMap.get(firstKey);
180-
if (Objects.nonNull(entryGroup)) {
181-
entryGroup
182-
.getAllCacheEntries()
183-
.forEachRemaining(
184-
entry -> {
185-
if (!secondKeyChecker.test(entry.getKey())) {
186-
return;
187-
}
188-
entryGroup.computeCacheEntry(
189-
entry.getKey(),
190-
memory ->
191-
(secondKey, cacheEntry) -> {
192-
memory.getAndAdd(updater.applyAsInt(cacheEntry.getValue()));
193-
return cacheEntry;
194-
});
195-
});
196-
}
197-
mayEvict();
198-
}
199186
}
200187

201188
private void mayEvict() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/schema/dualkeycache/impl/ICacheEntryGroup.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ interface ICacheEntryGroup<FK, SK, V, T extends ICacheEntry<SK, V>> {
4545
T computeCacheEntry(
4646
final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>> computation);
4747

48+
T computeCacheEntryIfPresent(
49+
final SK secondKey, final Function<AtomicLong, BiFunction<SK, T, T>> computation);
50+
4851
long removeCacheEntry(final SK secondKey);
4952

5053
boolean isEmpty();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceCacheEntry.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -184,13 +184,13 @@ int initOrInvalidateLastCache(
184184
final boolean isInvalidate,
185185
final boolean isTableModel) {
186186
int result =
187-
lastCache.compareAndSet(null, new TableDeviceLastCache())
187+
lastCache.compareAndSet(null, new TableDeviceLastCache(isTableModel))
188188
? TableDeviceLastCache.INSTANCE_SIZE
189189
: 0;
190190
final TableDeviceLastCache cache = lastCache.get();
191191
result +=
192192
Objects.nonNull(cache)
193-
? cache.initOrInvalidate(database, tableName, measurements, isInvalidate, isTableModel)
193+
? cache.initOrInvalidate(database, tableName, measurements, isInvalidate)
194194
: 0;
195195
return Objects.nonNull(lastCache.get()) ? result : 0;
196196
}
@@ -207,9 +207,9 @@ int tryUpdateLastCache(final String[] measurements, final TimeValuePair[] timeVa
207207
return tryUpdateLastCache(measurements, timeValuePairs, false);
208208
}
209209

210-
int invalidateLastCache(final String measurement, final boolean isTableModel) {
210+
int invalidateLastCache(final String measurement) {
211211
final TableDeviceLastCache cache = lastCache.get();
212-
final int result = Objects.nonNull(cache) ? cache.invalidate(measurement, isTableModel) : 0;
212+
final int result = Objects.nonNull(cache) ? cache.invalidate(measurement) : 0;
213213
return Objects.nonNull(lastCache.get()) ? result : 0;
214214
}
215215

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceLastCache.java

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -93,13 +93,17 @@ public TSDataType getDataType() {
9393

9494
// Time is seen as "" as a measurement
9595
private final Map<String, TimeValuePair> measurement2CachedLastMap = new ConcurrentHashMap<>();
96+
private final boolean isTableModel;
97+
98+
TableDeviceLastCache(final boolean isTableModel) {
99+
this.isTableModel = isTableModel;
100+
}
96101

97102
int initOrInvalidate(
98103
final String database,
99104
final String tableName,
100105
final String[] measurements,
101-
final boolean isInvalidate,
102-
final boolean isTableModel) {
106+
final boolean isInvalidate) {
103107
final AtomicInteger diff = new AtomicInteger(0);
104108

105109
for (final String measurement : measurements) {
@@ -121,13 +125,13 @@ int initOrInvalidate(
121125
if (Objects.isNull(newPair)) {
122126
diff.addAndGet(
123127
-((isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(finalMeasurement))
124-
+ getTVPairEntrySize(tvPair)));
128+
+ getTvPairEntrySize(tvPair)));
125129
return null;
126130
}
127131
if (Objects.isNull(tvPair)) {
128132
diff.addAndGet(
129133
(isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(finalMeasurement))
130-
+ getTVPairEntrySize(newPair));
134+
+ getTvPairEntrySize(newPair));
131135
return newPair;
132136
}
133137
return tvPair;
@@ -151,7 +155,9 @@ int tryUpdate(
151155
for (int i = 0; i < measurements.length; ++i) {
152156
if (Objects.isNull(timeValuePairs[i])) {
153157
if (invalidateNull) {
154-
measurement2CachedLastMap.remove(measurements[i]);
158+
diff.addAndGet(
159+
-((int) RamUsageEstimator.sizeOf(measurements[i])
160+
+ getTvPairEntrySize(measurement2CachedLastMap.remove(measurements[i]))));
155161
}
156162
continue;
157163
}
@@ -181,15 +187,15 @@ int tryUpdate(
181187
}
182188

183189
@GuardedBy("DataRegionInsertLock#writeLock")
184-
int invalidate(final String measurement, final boolean isTableModel) {
190+
int invalidate(final String measurement) {
185191
final AtomicInteger diff = new AtomicInteger();
186192
final AtomicLong time = new AtomicLong();
187193
measurement2CachedLastMap.computeIfPresent(
188194
measurement,
189195
(s, timeValuePair) -> {
190196
diff.set(
191197
(isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(s))
192-
+ getTVPairEntrySize(timeValuePair));
198+
+ getTvPairEntrySize(timeValuePair));
193199
time.set(timeValuePair.getTimestamp());
194200
return null;
195201
});
@@ -200,7 +206,7 @@ int invalidate(final String measurement, final boolean isTableModel) {
200206
"",
201207
(s, timeValuePair) -> {
202208
if (timeValuePair.getTimestamp() <= time.get()) {
203-
diff.addAndGet(getTVPairEntrySize(timeValuePair));
209+
diff.addAndGet((int) RamUsageEstimator.sizeOf(s) + getTvPairEntrySize(timeValuePair));
204210
return null;
205211
}
206212
return timeValuePair;
@@ -209,13 +215,18 @@ int invalidate(final String measurement, final boolean isTableModel) {
209215
return diff.get();
210216
}
211217

212-
private int getTVPairEntrySize(final TimeValuePair tvPair) {
213-
return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
214-
+ ((Objects.isNull(tvPair)
215-
|| tvPair == PLACEHOLDER_TIME_VALUE_PAIR
216-
|| tvPair == EMPTY_TIME_VALUE_PAIR)
217-
? 0
218-
: tvPair.getSize());
218+
private static int getTvPairEntrySize(final TimeValuePair tvPair) {
219+
return (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY + getTvPairSize(tvPair);
220+
}
221+
222+
private static int getTvPairSize(final TimeValuePair tvPair) {
223+
return isEmptyTvPair(tvPair) ? 0 : tvPair.getSize();
224+
}
225+
226+
private static boolean isEmptyTvPair(final TimeValuePair tvPair) {
227+
return Objects.isNull(tvPair)
228+
|| tvPair == PLACEHOLDER_TIME_VALUE_PAIR
229+
|| tvPair == EMPTY_TIME_VALUE_PAIR;
219230
}
220231

221232
@Nullable
@@ -262,16 +273,21 @@ Optional<Pair<OptionalLong, TsPrimitiveType[]>> getLastRow(
262273
int estimateSize() {
263274
return INSTANCE_SIZE
264275
+ (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY * measurement2CachedLastMap.size()
265-
+ measurement2CachedLastMap.values().stream()
266-
.mapToInt(TimeValuePair::getSize)
276+
+ measurement2CachedLastMap.entrySet().stream()
277+
.mapToInt(
278+
entry ->
279+
(isTableModel ? 0 : (int) RamUsageEstimator.sizeOf(entry.getKey()))
280+
+ TableDeviceLastCache.getTvPairSize(entry.getValue()))
267281
.reduce(0, Integer::sum);
268282
}
269283

270284
private static int getDiffSize(
271285
final TimeValuePair oldTimeValuePair, final TimeValuePair newTimeValuePair) {
272-
if (oldTimeValuePair == EMPTY_TIME_VALUE_PAIR
273-
|| oldTimeValuePair == PLACEHOLDER_TIME_VALUE_PAIR) {
274-
return newTimeValuePair.getSize();
286+
if (isEmptyTvPair(oldTimeValuePair)) {
287+
return getTvPairSize(newTimeValuePair);
288+
}
289+
if (isEmptyTvPair(newTimeValuePair)) {
290+
return -getTvPairSize(oldTimeValuePair);
275291
}
276292
final TsPrimitiveType oldValue = oldTimeValuePair.getValue();
277293
final TsPrimitiveType newValue = newTimeValuePair.getValue();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -456,7 +456,7 @@ void invalidateLastCache(final PartialPath devicePath, final String measurement)
456456
final ToIntFunction<TableDeviceCacheEntry> updateFunction =
457457
PathPatternUtil.hasWildcard(measurement)
458458
? entry -> -entry.invalidateLastCache()
459-
: entry -> -entry.invalidateLastCache(measurement, false);
459+
: entry -> -entry.invalidateLastCache(measurement);
460460

461461
if (!devicePath.hasWildcard()) {
462462
final IDeviceID deviceID = devicePath.getIDeviceID();
@@ -543,7 +543,7 @@ long getRequestCount() {
543543
return dualKeyCache.stats().requestCount();
544544
}
545545

546-
long getMemoryUsage() {
546+
public long getMemoryUsage() {
547547
return dualKeyCache.stats().memoryUsage();
548548
}
549549

@@ -679,7 +679,7 @@ public void invalidate(
679679
final ToIntFunction<TableDeviceCacheEntry> updateFunction =
680680
isAttributeColumn
681681
? entry -> -entry.invalidateAttributeColumn(columnName)
682-
: entry -> -entry.invalidateLastCache(columnName, true);
682+
: entry -> -entry.invalidateLastCache(columnName);
683683
dualKeyCache.update(new TableId(null, tableName), deviceID -> true, updateFunction);
684684
} finally {
685685
readWriteLock.writeLock().unlock();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TreeDeviceNormalSchema.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,13 @@ private int putEntry(
9999
public int estimateSize() {
100100
// Do not need to calculate database because it is interned
101101
return INSTANCE_SIZE
102+
+ measurementMap.size() * (int) RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY
102103
+ measurementMap.entrySet().stream()
103104
.mapToInt(
104105
entry ->
105106
Math.toIntExact(
106107
RamUsageEstimator.sizeOf(entry.getKey())
107-
+ SchemaCacheEntry.estimateSize(entry.getValue())
108-
+ RamUsageEstimator.HASHTABLE_RAM_BYTES_PER_ENTRY))
108+
+ SchemaCacheEntry.estimateSize(entry.getValue())))
109109
.reduce(0, Integer::sum);
110110
}
111111
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/metadata/cache/TreeDeviceSchemaCacheManagerTest.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.iotdb.db.queryengine.common.schematree.ClusterSchemaTree;
2626
import org.apache.iotdb.db.queryengine.common.schematree.ISchemaTree;
2727
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.SchemaCacheEntry;
28+
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
2829
import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
2930
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
3031
import org.apache.iotdb.db.schemaengine.template.Template;
@@ -264,6 +265,11 @@ public void testUpdateLastCache() throws IllegalPathException {
264265
Assert.assertEquals(
265266
new TimeValuePair(2, new TsPrimitiveType.TsInt(2)),
266267
treeDeviceSchemaCacheManager.getLastCache(new MeasurementPath("root.db.d.s3")));
268+
269+
Assert.assertTrue(TableDeviceSchemaCache.getInstance().getMemoryUsage() > 0);
270+
271+
treeDeviceSchemaCacheManager.cleanUp();
272+
Assert.assertEquals(0, TableDeviceSchemaCache.getInstance().getMemoryUsage());
267273
}
268274

269275
@Test
@@ -317,5 +323,13 @@ public void testPut() throws Exception {
317323
Assert.assertEquals(1, measurementPaths.size());
318324
Assert.assertEquals(TSDataType.FLOAT, measurementPaths.get(0).getMeasurementSchema().getType());
319325
Assert.assertEquals("root.sg1.d3.s1", measurementPaths.get(0).getFullPath());
326+
327+
treeDeviceSchemaCacheManager.invalidateLastCache(new MeasurementPath("root.sg1.**"));
328+
treeDeviceSchemaCacheManager.invalidateDatabaseLastCache("root.sg1");
329+
TableDeviceSchemaCache.getInstance().invalidateTreeSchema();
330+
Assert.assertTrue(TableDeviceSchemaCache.getInstance().getMemoryUsage() > 0);
331+
332+
TableDeviceSchemaCache.getInstance().invalidateAll();
333+
Assert.assertEquals(0, TableDeviceSchemaCache.getInstance().getMemoryUsage());
320334
}
321335
}

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCacheTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -248,6 +248,11 @@ public void testDeviceCache() {
248248
Assert.assertNull(
249249
cache.getDeviceAttribute(
250250
database1, convertTagValuesToDeviceID(table2, new String[] {"hebei", "p_1", "d_1"})));
251+
252+
Assert.assertTrue(cache.getMemoryUsage() > 0);
253+
254+
cache.invalidateAll();
255+
Assert.assertEquals(0, cache.getMemoryUsage());
251256
}
252257

253258
@Test
@@ -502,6 +507,10 @@ public void testLastCache() {
502507
TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE, new TsPrimitiveType.TsInt(3),
503508
},
504509
result.get().getRight());
510+
511+
cache.invalidateLastCache();
512+
cache.invalidateLastCache();
513+
Assert.assertTrue(cache.getMemoryUsage() > 0);
505514
}
506515

507516
@Test

0 commit comments

Comments
 (0)