Commit 38aa584

Author: zhangyongxiang.alpha
[spark] Support drop partition from top level

1 parent 5ff77ac, commit 38aa584

File tree: 11 files changed, +454 -10 lines
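For orientation, here is a minimal sketch of the user-facing behavior this commit adds; the table name and partition columns below are hypothetical, not taken from the commit. With a table partitioned by (dt, hr), a spec that names only the top-level column can now drop every matching partition in one statement:

    // Illustrative Spark/Scala usage against a hypothetical Paimon table `t` partitioned by (dt, hr).
    spark.sql("CREATE TABLE t (id INT, dt STRING, hr STRING) PARTITIONED BY (dt, hr)")
    spark.sql("ALTER TABLE t DROP PARTITION (dt = '2024-01-01', hr = '00')") // full spec, as before
    spark.sql("ALTER TABLE t DROP IF EXISTS PARTITION (dt = '2024-01-01')")  // new: top-level (partial) spec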

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/analysis/PaimonAnalysis.scala

Lines changed: 8 additions & 2 deletions
@@ -21,6 +21,7 @@ package org.apache.paimon.spark.catalyst.analysis
 import org.apache.paimon.spark.{SparkConnectorOptions, SparkTable}
 import org.apache.paimon.spark.catalyst.Compatibility
 import org.apache.paimon.spark.catalyst.analysis.PaimonRelation.isPaimonTable
+import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions
 import org.apache.paimon.spark.commands.{PaimonAnalyzeTableColumnCommand, PaimonDynamicPartitionOverwriteCommand, PaimonShowColumnsCommand, PaimonTruncateTableCommand}
 import org.apache.paimon.spark.util.OptionUtils
 import org.apache.paimon.table.FileStoreTable
@@ -33,13 +34,13 @@ import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.connector.catalog.TableCapability
-import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation}
 import org.apache.spark.sql.types._

 import scala.collection.mutable

 class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
-
+  import DataSourceV2Implicits._
   override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDown {

     case a @ PaimonV2WriteCommand(table) if !paimonWriteResolved(a.query, table) =>
@@ -59,6 +60,11 @@ class PaimonAnalysis(session: SparkSession) extends Rule[LogicalPlan] {

     case s @ ShowColumns(PaimonRelation(table), _, _) if s.resolved =>
       PaimonShowColumnsCommand(table)
+
+    case d @ PaimonDropPartitions(ResolvedTable(_, _, table: SparkTable, _), parts, _, _)
+        if d.resolved =>
+      PaimonDropPartitions.validate(table, parts.asResolvedPartitionSpecs)
+      d
   }

   private def writeOptions(v2WriteCommand: V2WriteCommand): Map[String, String] = {
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PaimonDropPartitions.scala

Lines changed: 59 additions & 0 deletions

@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.catalyst.plans.logical
+
+import org.apache.paimon.spark.SparkTable
+
+import org.apache.spark.sql.{types, PaimonUtils}
+import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2PartitionCommand}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits.TableHelper
+import org.apache.spark.sql.types.StructType
+
+case class PaimonDropPartitions(
+    table: LogicalPlan,
+    parts: Seq[PartitionSpec],
+    ifExists: Boolean,
+    purge: Boolean)
+  extends V2PartitionCommand {
+
+  override def allowPartialPartitionSpec: Boolean = true
+  override protected def withNewChildInternal(newChild: LogicalPlan): PaimonDropPartitions =
+    copy(table = newChild)
+}
+
+object PaimonDropPartitions {
+  def validate(table: SparkTable, partialSpecs: Seq[ResolvedPartitionSpec]): Unit = {
+    val partitionSchema = table.asPartitionable.partitionSchema()
+    partialSpecs.foreach {
+      partialSpec =>
+        if (!partitionSchema.names.toSeq.startsWith(partialSpec.names)) {
+          val values = partialSpec.names.zipWithIndex.map {
+            case (name, ordinal) =>
+              partialSpec.ident.get(ordinal, partitionSchema.apply(name).dataType).toString
+          }
+          val spec = partialSpec.names
+            .zip(values)
+            .map { case (name, value) => s"$name = '$value'" }
+            .mkString(",")
+          throw PaimonUtils.invalidPartitionSpecError(spec, partitionSchema.fieldNames, table.name)
+        }
+    }
+  }
+}
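To make the prefix rule in validate concrete, a small self-contained sketch using plain Scala collections (nothing here is Paimon API; the column names are illustrative):

    // The check reduces to Seq.startsWith over the partition column names.
    val partitionCols = Seq("year", "month", "day")
    assert(partitionCols.startsWith(Seq("year")))          // (year='2024') is a valid top-level spec
    assert(partitionCols.startsWith(Seq("year", "month"))) // (year='2024', month='01') is a valid prefix
    assert(!partitionCols.startsWith(Seq("month")))        // (month='01') is rejected: not a prefix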
paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonDropPartitionsExec.scala

Lines changed: 84 additions & 0 deletions

@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.execution
+
+import org.apache.paimon.spark.SparkTable
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionsException, ResolvedPartitionSpec}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits.TableHelper
+import org.apache.spark.sql.execution.datasources.v2.LeafV2CommandExec
+
+case class PaimonDropPartitionsExec(
+    table: SparkTable,
+    partSpecs: Seq[ResolvedPartitionSpec],
+    ignoreIfNotExists: Boolean,
+    purge: Boolean,
+    refreshCache: () => Unit)
+  extends LeafV2CommandExec
+  with Logging {
+  override protected def run(): Seq[InternalRow] = {
+    val partitionSchema = table.asPartitionable.partitionSchema()
+    val (partialPartSpecs, fullPartSpecs) =
+      partSpecs.partition(_.ident.numFields != partitionSchema.length)
+
+    val (existsPartIdents, notExistsPartIdents) =
+      fullPartSpecs.map(_.ident).partition(table.partitionExists)
+    if (notExistsPartIdents.nonEmpty && !ignoreIfNotExists) {
+      throw new NoSuchPartitionsException(
+        table.name(),
+        notExistsPartIdents,
+        table.asPartitionable.partitionSchema())
+    }
+    val allExistsPartIdents = existsPartIdents ++ partialPartSpecs.flatMap(expandPartialSpec)
+    logInfo("Try to drop partitions: " + allExistsPartIdents.mkString(","))
+    val isTableAltered = if (allExistsPartIdents.nonEmpty) {
+      allExistsPartIdents
+        .map(
+          partIdent => {
+            if (purge) table.purgePartition(partIdent) else table.dropPartition(partIdent)
+          })
+        .reduce(_ || _)
+    } else false
+
+    if (isTableAltered) refreshCache()
+    Seq.empty
+  }
+
+  private def expandPartialSpec(partialSpec: ResolvedPartitionSpec): Seq[InternalRow] = {
+    assertSpec(partialSpec)
+    table.listPartitionIdentifiers(partialSpec.names.toArray, partialSpec.ident).toSeq
+  }
+
+  private def assertSpec(partialSpec: ResolvedPartitionSpec): Unit = {
+    val partitionSchema = table.asPartitionable.partitionSchema()
+    if (partialSpec.ident.numFields > partitionSchema.length) {
+      throw new IllegalArgumentException(
+        s"Partial partition spec should be part of $partitionSchema fields, but got $partialSpec")
+    }
+    if (!partitionSchema.names.toSeq.startsWith(partialSpec.names)) {
+      throw new IllegalArgumentException(
+        s"Partial partition spec should be prefix of $partitionSchema fields, but got $partialSpec")
+    }
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
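A minimal sketch of the partial-versus-full split that run() performs, with a toy stand-in for ResolvedPartitionSpec (illustrative only, not the Spark API):

    // Assume two partition columns, so a full spec has numFields == 2.
    case class ToySpec(numFields: Int)
    val schemaLen = 2
    val specs = Seq(ToySpec(1), ToySpec(2), ToySpec(2))
    val (partial, full) = specs.partition(_.numFields != schemaLen)
    // partial == Seq(ToySpec(1)): expanded via listPartitionIdentifiers before dropping
    // full    == Seq(ToySpec(2), ToySpec(2)): existence-checked, then dropped directly

Note that the map over allExistsPartIdents is eager, so every existing partition is dropped (or purged) and the per-partition results are OR-reduced into a single "was the table altered" flag.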

paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,22 @@
1818

1919
package org.apache.paimon.spark.execution
2020

21-
import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkUtils}
21+
import org.apache.paimon.spark.{SparkCatalog, SparkGenericCatalog, SparkTable, SparkUtils}
2222
import org.apache.paimon.spark.catalog.SupportView
2323
import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
2424
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand}
25+
import org.apache.paimon.spark.catalyst.plans.logical.PaimonDropPartitions
2526

2627
import org.apache.spark.sql.SparkSession
2728
import org.apache.spark.sql.catalyst.InternalRow
28-
import org.apache.spark.sql.catalyst.analysis.ResolvedNamespace
29+
import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable}
2930
import org.apache.spark.sql.catalyst.expressions.{Expression, GenericInternalRow, PredicateHelper}
3031
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, DescribeRelation, LogicalPlan, ShowCreateTable}
3132
import org.apache.spark.sql.connector.catalog.{Identifier, PaimonLookupCatalog, TableCatalog}
3233
import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
34+
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation}
3335
import org.apache.spark.sql.execution.shim.PaimonCreateTableAsSelectStrategy
36+
import org.apache.spark.sql.paimon.shims.SparkShimLoader
3437

3538
import scala.collection.JavaConverters._
3639

@@ -39,6 +42,7 @@ case class PaimonStrategy(spark: SparkSession)
3942
with PredicateHelper
4043
with PaimonLookupCatalog {
4144

45+
import DataSourceV2Implicits._
4246
protected lazy val catalogManager = spark.sessionState.catalogManager
4347

4448
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -107,6 +111,18 @@ case class PaimonStrategy(spark: SparkSession)
107111
case DescribeRelation(ResolvedPaimonView(viewCatalog, ident), _, isExtended, output) =>
108112
DescribePaimonViewExec(output, viewCatalog, ident, isExtended) :: Nil
109113

114+
case d @ PaimonDropPartitions(
115+
r @ ResolvedTable(_, _, table: SparkTable, _),
116+
parts,
117+
ifExists,
118+
purge) =>
119+
PaimonDropPartitionsExec(
120+
table,
121+
parts.asResolvedPartitionSpecs,
122+
ifExists,
123+
purge,
124+
recacheTable(r)) :: Nil
125+
110126
case _ => Nil
111127
}
112128

@@ -132,4 +148,9 @@ case class PaimonStrategy(spark: SparkSession)
132148
}
133149
}
134150
}
151+
152+
private def recacheTable(r: ResolvedTable)(): Unit = {
153+
val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
154+
SparkShimLoader.shim.classicApi.recacheByPlan(spark, v2Relation)
155+
}
135156
}
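One detail worth noting above: recacheTable is declared with an empty second parameter list, so the call recacheTable(r) does not recache anything by itself; it eta-expands to the () => Unit thunk that PaimonDropPartitionsExec invokes only after a partition was actually dropped. A minimal sketch of the same trick (names here are illustrative):

    // A def with an empty second parameter list partially applies to a thunk.
    def recache(name: String)(): Unit = println(s"recache $name")
    val thunk: () => Unit = recache("t") // nothing happens yet
    thunk()                              // prints "recache t"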

paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/PaimonUtils.scala

Lines changed: 8 additions & 0 deletions
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.expressions.FieldReference
 import org.apache.spark.sql.connector.expressions.filter.Predicate
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.datasources.DataSourceStrategy
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy.translateFilterV2WithMapping
 import org.apache.spark.sql.internal.connector.PredicateUtils

@@ -134,4 +135,11 @@ object PaimonUtils {
   def classForName(clazz: String): Class[_] = {
     SparkUtils.classForName(clazz)
   }
+
+  def invalidPartitionSpecError(
+      specKeys: String,
+      partitionColumnNames: Seq[String],
+      tableName: String): Throwable = {
+    QueryCompilationErrors.invalidPartitionSpecError(specKeys, partitionColumnNames, tableName)
+  }
 }

paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala

Lines changed: 2 additions & 1 deletion
@@ -79,7 +79,8 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterface)
   private def parserRules(sparkSession: SparkSession): Seq[Rule[LogicalPlan]] = {
     Seq(
       RewritePaimonViewCommands(sparkSession),
-      RewritePaimonFunctionCommands(sparkSession)
+      RewritePaimonFunctionCommands(sparkSession),
+      RewriteSparkDDLCommands(sparkSession)
     )
   }

paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/RewriteSparkDDLCommands.scala

Lines changed: 44 additions & 0 deletions

@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.paimon.spark.catalog.SupportView
+import org.apache.paimon.spark.catalyst.plans.logical.{PaimonDropPartitions, ResolvedIdentifier}
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.UnresolvedIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.{DropPartitions, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
+
+case class RewriteSparkDDLCommands(spark: SparkSession)
+  extends Rule[LogicalPlan]
+  with LookupCatalog {
+
+  protected lazy val catalogManager: CatalogManager = spark.sessionState.catalogManager
+
+  override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+
+    // A new member was added to CreatePaimonView since Spark 4.0; unapply pattern
+    // matching is not used there, to stay compatible across multiple Spark versions.
+    case DropPartitions(UnresolvedPaimonRelation(aliasedTable), parts, ifExists, purge) =>
+      PaimonDropPartitions(aliasedTable, parts, ifExists, purge)
+
+  }
+}
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/UnresolvedPaimonRelation.scala

Lines changed: 49 additions & 0 deletions

@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.parser.extensions
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedTable}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.{PaimonLookupCatalog, TableCatalog}
+
+import scala.util.Try
+
+object UnresolvedPaimonRelation extends PaimonLookupCatalog {
+  protected lazy val catalogManager = SparkSession.active.sessionState.catalogManager
+
+  def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
+    EliminateSubqueryAliases(plan) match {
+      case UnresolvedTable(multipartIdentifier, _, _) if isPaimonTable(multipartIdentifier) =>
+        Some(plan)
+      case _ => None
+    }
+  }
+
+  private def isPaimonTable(multipartIdentifier: Seq[String]): Boolean = {
+    multipartIdentifier match {
+      case CatalogAndIdentifier(catalog: TableCatalog, ident) =>
+        Try(catalog.loadTable(ident))
+          .map(t => t.isInstanceOf[org.apache.paimon.spark.SparkTable])
+          .getOrElse(false)
+      case _ => false
+    }
+  }
+
+}
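For readers unfamiliar with the extractor idiom used above, a toy sketch of how an unapply-based object is consumed in a pattern match (everything below is illustrative, not Paimon or Spark API):

    // A custom extractor lets a rule ask "is this a Paimon plan?" inline in a case pattern.
    sealed trait Plan
    case class Table(name: String) extends Plan
    object PaimonPlan {
      def unapply(p: Plan): Option[Plan] = p match {
        case t @ Table(n) if n.startsWith("paimon_") => Some(t)
        case _ => None
      }
    }
    val plan: Plan = Table("paimon_orders")
    plan match {
      case PaimonPlan(t) => println(s"matched $t") // the rewrite to PaimonDropPartitions fires here
      case _             => ()
    }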
