Skip to content

Commit 1f3396a

Browse files
committed
[fix](hive) fix invalid edit after inserting hive partition table
1 parent 29428ff commit 1f3396a

File tree

4 files changed

+51
-33
lines changed

4 files changed

+51
-33
lines changed

fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,9 @@
2626
import org.apache.doris.datasource.ExternalDatabase;
2727
import org.apache.doris.datasource.ExternalObjectLog;
2828
import org.apache.doris.datasource.ExternalTable;
29+
import org.apache.doris.datasource.hive.HMSExternalCatalog;
2930
import org.apache.doris.datasource.hive.HMSExternalTable;
31+
import org.apache.doris.datasource.hive.HiveMetaStoreCache;
3032
import org.apache.doris.persist.OperationType;
3133

3234
import com.google.common.base.Strings;
@@ -183,13 +185,20 @@ public void replayRefreshTable(ExternalObjectLog log) {
183185
db.get().unregisterTable(log.getTableName());
184186
db.get().resetMetaCacheNames();
185187
} else {
186-
List<String> partitionNames = log.getPartitionNames();
187-
if (partitionNames != null && !partitionNames.isEmpty()) {
188-
// Partition-level cache invalidation
189-
Env.getCurrentEnv().getExtMetaCacheMgr()
190-
.invalidatePartitionsCache(table.get(), partitionNames);
191-
LOG.info("replay refresh partitions for table {}, partitions count: {}",
192-
table.get().getName(), partitionNames.size());
188+
List<String> modifiedPartNames = log.getPartitionNames();
189+
List<String> newPartNames = log.getNewPartitionNames();
190+
if (catalog instanceof HMSExternalCatalog
191+
&& ((modifiedPartNames != null && !modifiedPartNames.isEmpty())
192+
|| (newPartNames != null && !newPartNames.isEmpty()))) {
193+
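// Partition-level cache invalidation, only for hive catalog. NOTE(review): either list may still be null here (only one is required to be non-empty) - confirm refreshAffectedPartitionsCache tolerates null lists.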
194+
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
195+
.getMetaStoreCache((HMSExternalCatalog) catalog);
196+
cache.refreshAffectedPartitionsCache((HMSExternalTable) table.get(), modifiedPartNames, newPartNames);
197+
LOG.info("replay refresh partitions for table {}, "
198+
+ "modified partitions count: {}, "
199+
+ "new partitions count: {}",
200+
table.get().getName(), modifiedPartNames == null ? 0 : modifiedPartNames.size(),
201+
newPartNames == null ? 0 : newPartNames.size());
193202
} else {
194203
// Full table cache invalidation
195204
refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());

fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ public class ExternalObjectLog implements Writable {
5858
@SerializedName(value = "partitionNames")
5959
private List<String> partitionNames;
6060

61+
@SerializedName(value = "newPartitionNames")
62+
private List<String> newPartitionNames;
63+
6164
@SerializedName(value = "lastUpdateTime")
6265
private long lastUpdateTime;
6366

@@ -81,12 +84,13 @@ public static ExternalObjectLog createForRefreshTable(long catalogId, String dbN
8184
}
8285

8386
public static ExternalObjectLog createForRefreshPartitions(long catalogId, String dbName, String tblName,
84-
List<String> partitionNames) {
87+
List<String> modifiedPartNames, List<String> newPartNames) {
8588
ExternalObjectLog externalObjectLog = new ExternalObjectLog();
8689
externalObjectLog.setCatalogId(catalogId);
8790
externalObjectLog.setDbName(dbName);
8891
externalObjectLog.setTableName(tblName);
89-
externalObjectLog.setPartitionNames(partitionNames);
92+
externalObjectLog.setPartitionNames(modifiedPartNames);
93+
externalObjectLog.setNewPartitionNames(newPartNames);
9094
return externalObjectLog;
9195
}
9296

@@ -134,6 +138,12 @@ public String debugForRefreshTable() {
134138
} else {
135139
sb.append("tableId: " + tableId + "]");
136140
}
141+
if (partitionNames != null && !partitionNames.isEmpty()) {
142+
sb.append(", partitionNames: " + partitionNames);
143+
}
144+
if (newPartitionNames != null && !newPartitionNames.isEmpty()) {
145+
sb.append(", newPartitionNames: " + newPartitionNames);
146+
}
137147
return sb.toString();
138148
}
139149
}

fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetaStoreCache.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -573,16 +573,16 @@ public void invalidatePartitionCache(ExternalTable dorisTable, String partitionN
573573
*
574574
* @param table The Hive table whose partitions were modified
575575
* @param partitionUpdates List of partition updates from BE
576+
* @param modifiedPartNames Output list (must be non-null and mutable) that receives the names of partitions updated in APPEND or OVERWRITE mode
577+
* @param newPartNames Output list (must be non-null and mutable) that receives the names of partitions updated in NEW mode
576578
*/
577579
public void refreshAffectedPartitions(HMSExternalTable table,
578-
List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates) {
580+
List<org.apache.doris.thrift.THivePartitionUpdate> partitionUpdates,
581+
List<String> modifiedPartNames, List<String> newPartNames) {
579582
if (partitionUpdates == null || partitionUpdates.isEmpty()) {
580583
return;
581584
}
582585

583-
List<String> modifiedPartitionNames = new ArrayList<>();
584-
List<String> newPartitionNames = new ArrayList<>();
585-
586586
for (org.apache.doris.thrift.THivePartitionUpdate update : partitionUpdates) {
587587
String partitionName = update.getName();
588588
// Skip if partition name is null/empty (non-partitioned table case)
@@ -593,10 +593,10 @@ public void refreshAffectedPartitions(HMSExternalTable table,
593593
switch (update.getUpdateMode()) {
594594
case APPEND:
595595
case OVERWRITE:
596-
modifiedPartitionNames.add(partitionName);
596+
modifiedPartNames.add(partitionName);
597597
break;
598598
case NEW:
599-
newPartitionNames.add(partitionName);
599+
newPartNames.add(partitionName);
600600
break;
601601
default:
602602
LOG.warn("Unknown update mode {} for partition {}",
@@ -605,20 +605,26 @@ public void refreshAffectedPartitions(HMSExternalTable table,
605605
}
606606
}
607607

608+
refreshAffectedPartitionsCache(table, modifiedPartNames, newPartNames);
609+
}
610+
611+
public void refreshAffectedPartitionsCache(HMSExternalTable table,
612+
List<String> modifiedPartNames, List<String> newPartNames) {
613+
608614
// Invalidate cache for modified partitions (both partition cache and file cache)
609-
for (String partitionName : modifiedPartitionNames) {
615+
for (String partitionName : modifiedPartNames) {
610616
invalidatePartitionCache(table, partitionName);
611617
}
612618

613619
// Add new partitions to partition values cache
614-
if (!newPartitionNames.isEmpty()) {
615-
addPartitionsCache(table.getOrBuildNameMapping(), newPartitionNames,
620+
if (!newPartNames.isEmpty()) {
621+
addPartitionsCache(table.getOrBuildNameMapping(), newPartNames,
616622
table.getPartitionColumnTypes(Optional.empty()));
617623
}
618624

619625
// Log summary
620626
LOG.info("Refreshed cache for table {}: {} modified partitions, {} new partitions",
621-
table.getName(), modifiedPartitionNames.size(), newPartitionNames.size());
627+
table.getName(), modifiedPartNames.size(), newPartNames.size());
622628
}
623629

624630
public void invalidateDbCache(String dbName) {

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/HiveInsertExecutor.java

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,10 @@
3434
import org.apache.doris.transaction.TransactionType;
3535

3636
import com.google.common.base.Preconditions;
37+
import com.google.common.collect.Lists;
3738
import org.apache.logging.log4j.LogManager;
3839
import org.apache.logging.log4j.Logger;
3940

40-
import java.util.ArrayList;
4141
import java.util.List;
4242
import java.util.Optional;
4343

@@ -86,34 +86,27 @@ protected void doAfterCommit() throws DdlException {
8686

8787
// For partitioned tables, do selective partition refresh
8888
// For non-partitioned tables, do full table cache invalidation
89-
List<String> affectedPartitionNames = null;
89+
List<String> modifiedPartNames = Lists.newArrayList();
90+
List<String> newPartNames = Lists.newArrayList();
9091
if (hmsTable.isPartitionedTable() && partitionUpdates != null && !partitionUpdates.isEmpty()) {
9192
HiveMetaStoreCache cache = Env.getCurrentEnv().getExtMetaCacheMgr()
9293
.getMetaStoreCache((HMSExternalCatalog) hmsTable.getCatalog());
93-
cache.refreshAffectedPartitions(hmsTable, partitionUpdates);
94-
95-
// Collect partition names for edit log
96-
affectedPartitionNames = new ArrayList<>();
97-
for (THivePartitionUpdate update : partitionUpdates) {
98-
String partitionName = update.getName();
99-
if (partitionName != null && !partitionName.isEmpty()) {
100-
affectedPartitionNames.add(partitionName);
101-
}
102-
}
94+
cache.refreshAffectedPartitions(hmsTable, partitionUpdates, modifiedPartNames, newPartNames);
10395
} else {
10496
// Non-partitioned table or no partition updates, do full table refresh
10597
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(hmsTable);
10698
}
10799

108100
// Write edit log to notify other FEs
109101
ExternalObjectLog log;
110-
if (affectedPartitionNames != null && !affectedPartitionNames.isEmpty()) {
102+
if (!modifiedPartNames.isEmpty() || !newPartNames.isEmpty()) {
111103
// Partition-level refresh for other FEs
112104
log = ExternalObjectLog.createForRefreshPartitions(
113105
hmsTable.getCatalog().getId(),
114106
table.getDatabase().getFullName(),
115107
table.getName(),
116-
affectedPartitionNames);
108+
modifiedPartNames,
109+
newPartNames);
117110
} else {
118111
// Full table refresh for other FEs
119112
log = ExternalObjectLog.createForRefreshTable(

0 commit comments

Comments
 (0)