Commit 87bafe8

[spark] Add partition filter for incremental clustering procedure (#6640)
1 parent c95dcec commit 87bafe8

2 files changed: +42, −3 lines
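
With this change, the where argument of the sys.compact procedure also applies to incremental clustering of unaware-bucket tables, so clustering work can be restricted to selected partitions instead of always spanning the whole table. A minimal usage sketch in Scala, mirroring the new test case added below (it assumes a running SparkSession named spark with the Paimon catalog configured and a partitioned table T created with 'clustering.incremental' = 'true'):

    // Sketch only: assumes a SparkSession named spark with the Paimon catalog configured,
    // and a partitioned table T declared with 'clustering.incremental' = 'true'
    // (see the CREATE TABLE in the test case below).

    // Cluster only the files of partition pt = 0; other partitions are left untouched.
    spark.sql("CALL sys.compact(table => 'T', where => 'pt = 0')")

    // Inspect the result via the files system table: clustered files of partition {0}
    // are promoted to the top level, while partition {1} still holds its level-0 files.
    spark.sql("SELECT DISTINCT partition, level FROM `T$files` ORDER BY partition").show()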

paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java

Lines changed: 8 additions & 3 deletions
@@ -268,7 +268,8 @@ private boolean execute(
                 break;
             case BUCKET_UNAWARE:
                 if (clusterIncrementalEnabled) {
-                    clusterIncrementalUnAwareBucketTable(table, fullCompact, relation);
+                    clusterIncrementalUnAwareBucketTable(
+                            table, partitionPredicate, fullCompact, relation);
                 } else {
                     compactUnAwareBucketTable(
                             table, partitionPredicate, partitionIdleTime, javaSparkContext);
@@ -530,8 +531,12 @@ private void sortCompactUnAwareBucketTable(
     }
 
     private void clusterIncrementalUnAwareBucketTable(
-            FileStoreTable table, boolean fullCompaction, DataSourceV2Relation relation) {
-        IncrementalClusterManager incrementalClusterManager = new IncrementalClusterManager(table);
+            FileStoreTable table,
+            @Nullable PartitionPredicate partitionPredicate,
+            boolean fullCompaction,
+            DataSourceV2Relation relation) {
+        IncrementalClusterManager incrementalClusterManager =
+                new IncrementalClusterManager(table, partitionPredicate);
         Map<BinaryRow, CompactUnit> compactUnits =
                 incrementalClusterManager.createCompactUnits(fullCompaction);

paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CompactProcedureTestBase.scala

Lines changed: 34 additions & 0 deletions
@@ -1139,6 +1139,40 @@ abstract class CompactProcedureTestBase extends PaimonSparkTestBase with StreamT
     }
   }
 
+  test("Paimon Procedure: cluster for partitioned table with partition filter") {
+    sql(
+      """
+        |CREATE TABLE T (a INT, b INT, pt INT)
+        |PARTITIONED BY (pt)
+        |TBLPROPERTIES (
+        | 'bucket'='-1', 'num-levels'='6', 'num-sorted-run.compaction-trigger'='2',
+        | 'clustering.columns'='a,b', 'clustering.strategy'='zorder', 'clustering.incremental' = 'true'
+        |)
+        |""".stripMargin)
+
+    sql("INSERT INTO T VALUES (0, 0, 0), (0, 0, 1)")
+    sql("INSERT INTO T VALUES (0, 1, 0), (0, 1, 1)")
+    sql("INSERT INTO T VALUES (0, 2, 0), (0, 2, 1)")
+    sql("INSERT INTO T VALUES (1, 0, 0), (1, 0, 1)")
+    sql("INSERT INTO T VALUES (1, 1, 0), (1, 1, 1)")
+    sql("INSERT INTO T VALUES (1, 2, 0), (1, 2, 1)")
+    sql("INSERT INTO T VALUES (2, 0, 0), (2, 0, 1)")
+    sql("INSERT INTO T VALUES (2, 1, 0), (2, 1, 1)")
+    sql("INSERT INTO T VALUES (2, 2, 0), (2, 2, 1)")
+
+    sql("CALL sys.compact(table => 'T', where => 'pt = 0')")
+    checkAnswer(
+      sql("select distinct partition, level from `T$files` order by partition"),
+      Seq(Row("{0}", 5), Row("{1}", 0))
+    )
+
+    sql("CALL sys.compact(table => 'T', where => 'pt = 1')")
+    checkAnswer(
+      sql("select distinct partition, level from `T$files` order by partition"),
+      Seq(Row("{0}", 5), Row("{1}", 5))
+    )
+  }
+
   test("Paimon Procedure: cluster with deletion vectors") {
     failAfter(Span(5, org.scalatest.time.Minutes)) {
       withTempDir {
