
Commit 2431435

[KYUUBI #7198] Remove support for Spark 3.2
### Why are the changes needed?

Discussion in the mailing list: https://lists.apache.org/thread/qrbknzjj3jcwjc9nd64qmtohrwfk1kjp

Spark 3.2 reached EOL in April 2023, and Kyuubi deprecated support for Spark 3.2 in 1.10 (#6545). This PR fully drops support for Spark 3.2.

### How was this patch tested?

Pass GHA.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #7198 from pan3793/remove-spark-3.2.

Closes #7198

7d7e21c [Cheng Pan] address comments
219fb79 [Cheng Pan] fix
b4c8ae7 [Cheng Pan] fix
6ffd9c2 [Cheng Pan] address comments
1d0dc2d [Cheng Pan] Update extensions/spark/kyuubi-spark-lineage/src/test/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParserHelperSuite.scala
343b658 [Cheng Pan] Remove support of Spark 3.2

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
1 parent 8b56295 commit 2431435

34 files changed: +123 additions, -384 deletions


.github/workflows/master.yml

Lines changed: 0 additions & 5 deletions
@@ -60,11 +60,6 @@ jobs:
           spark-archive: '-Pscala-2.13'
           exclude-tags: ''
           comment: 'normal'
-        - java: 8
-          spark: '3.5'
-          spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
-          exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
-          comment: 'verify-on-spark-3.2-binary'
         - java: 8
           spark: '3.5'
           spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.3.3 -Dspark.archive.name=spark-3.3.3-bin-hadoop3.tgz -Pzookeeper-3.6'

docs/deployment/migration-guide.md

Lines changed: 1 addition & 0 deletions
@@ -19,6 +19,7 @@
 
 ## Upgrading from Kyuubi 1.10 to 1.11
 
+* Since Kyuubi 1.11, the support of Spark engine for Spark 3.2 is removed.
 * Since Kyuubi 1.11, the support of Flink engine for Flink 1.17 and 1.18 are deprecated, and will be removed in the future.
 * Since Kyuubi 1.11, the configuration `spark.sql.watchdog.forcedMaxOutputRows` provided by Kyuubi Spark extension is removed, consider using `kyuubi.operation.result.max.rows` instead. Note, the latter works without requirement of installing Kyuubi Spark extension.
 * Since Kyuubi 1.11, if the engine is running in cluster mode, Kyuubi will respect the `kyuubi.session.engine.startup.waitCompletion` config to determine whether to wait for the engine completion or not. If the engine is running in client mode, Kyuubi will always wait for the engine completion. And for Spark engine, Kyuubi will append the `spark.yarn.submit.waitAppCompletion` and `spark.kubernetes.submission.waitAppCompletion` configs to the engine conf based on the value of `kyuubi.session.engine.startup.waitCompletion`.
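The last migration note describes conf forwarding for cluster-mode engines. As a rough illustration of that behavior, the sketch below derives the two Spark submit confs from the `kyuubi.session.engine.startup.waitCompletion` value; the helper name and the exact mapping are assumptions based only on the note above, not Kyuubi's real implementation.

```scala
// Hedged sketch: append Spark "wait for app completion" confs based on the
// kyuubi.session.engine.startup.waitCompletion value, as described in the
// migration note. sparkWaitConfs is an illustrative helper, not Kyuubi code.
object WaitCompletionConfSketch {
  def sparkWaitConfs(waitCompletion: Boolean): Map[String, String] = Map(
    "spark.yarn.submit.waitAppCompletion" -> waitCompletion.toString,
    "spark.kubernetes.submission.waitAppCompletion" -> waitCompletion.toString)

  def main(args: Array[String]): Unit =
    println(sparkWaitConfs(waitCompletion = false)) // both confs set to "false"
}
```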

docs/quick_start/quick_start.rst

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ pre-installed and the ``JAVA_HOME`` is correctly set to each component.
 **Kyuubi**   Gateway      \ |release| \     - Kyuubi Server
              Engine lib   -                 - Kyuubi Engine
              Beeline      -                 - Kyuubi Beeline
-**Spark**    Engine       3.2 to 3.5, 4.0   A Spark distribution
+**Spark**    Engine       3.3 to 3.5, 4.0   A Spark distribution
 **Flink**    Engine       1.17 to 1.20      A Flink distribution
 **Trino**    Engine       N/A               A Trino cluster allows to access via trino-client v411
 **Doris**    Engine       N/A               A Doris cluster

extensions/spark/kyuubi-spark-authz/README.md

Lines changed: 2 additions & 2 deletions
@@ -37,8 +37,8 @@ build/mvn clean package -DskipTests -pl :kyuubi-spark-authz_2.12 -am -Dspark.ver
 - [x] 3.5.x (default)
 - [x] 3.4.x
 - [x] 3.3.x
-- [x] 3.2.x
-- [x] 3.1.x
+- [ ] 3.2.x
+- [ ] 3.1.x
 - [ ] 3.0.x
 - [ ] 2.4.x and earlier
 

extensions/spark/kyuubi-spark-authz/src/main/resources/table_command_spec.json

Lines changed: 0 additions & 27 deletions
@@ -840,17 +840,6 @@
     "isInput" : true,
     "comment" : ""
   } ]
-}, {
-  "classname" : "org.apache.spark.sql.execution.command.AddFileCommand",
-  "tableDescs" : [ ],
-  "opType" : "ADD",
-  "queryDescs" : [ ],
-  "uriDescs" : [ {
-    "fieldName" : "path",
-    "fieldExtractor" : "StringURIExtractor",
-    "isInput" : true,
-    "comment" : ""
-  } ]
 }, {
   "classname" : "org.apache.spark.sql.execution.command.AddFilesCommand",
   "tableDescs" : [ ],
@@ -969,22 +958,6 @@
   "opType" : "ALTERTABLE_DROPPARTS",
   "queryDescs" : [ ],
   "uriDescs" : [ ]
-}, {
-  "classname" : "org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand",
-  "tableDescs" : [ {
-    "fieldName" : "tableName",
-    "fieldExtractor" : "TableIdentifierTableExtractor",
-    "columnDesc" : null,
-    "actionTypeDesc" : null,
-    "tableTypeDesc" : null,
-    "catalogDesc" : null,
-    "isInput" : false,
-    "setCurrentDatabaseIfMissing" : false,
-    "comment" : ""
-  } ],
-  "opType" : "MSCK",
-  "queryDescs" : [ ],
-  "uriDescs" : [ ]
 }, {
   "classname" : "org.apache.spark.sql.execution.command.AlterTableRenameCommand",
   "tableDescs" : [ {

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala

Lines changed: 1 addition & 18 deletions
@@ -224,24 +224,7 @@ object PrivilegesBuilder {
         }
       }
       spec.queries(plan).foreach { p =>
-        if (p.resolved) {
-          buildQuery(Project(p.output, p), inputObjs, spark = spark)
-        } else {
-          try {
-            // For spark 3.1, Some command such as CreateTableASSelect, its query was unresolved,
-            // Before this pr, we just ignore it, now we support this.
-            val analyzed = spark.sessionState.analyzer.execute(p)
-            buildQuery(Project(analyzed.output, analyzed), inputObjs, spark = spark)
-          } catch {
-            case e: Exception =>
-              LOG.debug(
-                s"""
-                   |Failed to analyze unresolved
-                   |$p
-                   |due to ${e.getMessage}""".stripMargin,
-                e)
-          }
-        }
+        buildQuery(Project(p.output, p), inputObjs, spark = spark)
       }
       spec.operationType
 
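The simplification above relies on the command's query already being resolved, which holds on the remaining Spark 3.3+ line; the unresolved-query fallback was only needed for Spark 3.1. Below is a minimal, stand-alone sketch of the surviving path, runnable against a local Spark session; the object and variable names are illustrative, not the plugin's real helpers.

```scala
// Minimal sketch: wrap an already-resolved query in a Project over its own
// output before walking it for input privileges, as the simplified code does.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}

object ResolvedQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("resolved-demo").getOrCreate()
    val query: LogicalPlan = spark.range(5).selectExpr("id AS key").queryExecution.analyzed
    assert(query.resolved) // on Spark 3.3+, the command's query arrives resolved
    val projected = Project(query.output, query)
    println(projected.output.map(_.name)) // projected column names, e.g. "key"
    spark.stop()
  }
}
```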

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/Authorization.scala

Lines changed: 2 additions & 7 deletions
@@ -18,7 +18,7 @@
 package org.apache.kyuubi.plugin.spark.authz.rule
 
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, View}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
@@ -52,12 +52,7 @@ object Authorization {
 
   def markAuthChecked(plan: LogicalPlan): LogicalPlan = {
     plan.setTagValue(KYUUBI_AUTHZ_TAG, ())
-    plan transformDown {
-      // TODO: Add this line Support for spark3.1, we can remove this
-      // after spark 3.2 since https://issues.apache.org/jira/browse/SPARK-34269
-      case view: View =>
-        markAllNodesAuthChecked(view.child)
-    }
+    plan
   }
 
   protected def isAuthChecked(plan: LogicalPlan): Boolean = {
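With the Spark 3.1-only view traversal removed, markAuthChecked now just tags the root of the plan. The sketch below shows how such a TreeNodeTag round-trips on a logical plan, assuming a local Spark session; the tag name and helper functions are placeholders rather than the plugin's KYUUBI_AUTHZ_TAG machinery.

```scala
// Sketch of marking a plan as "already checked" via a TreeNodeTag, mirroring
// the shape of the simplified markAuthChecked above. Names are illustrative.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.TreeNodeTag

object AuthTagSketch {
  private val AUTH_CHECKED_TAG = TreeNodeTag[Unit]("__example_authz_checked")

  def markChecked(plan: LogicalPlan): LogicalPlan = {
    plan.setTagValue(AUTH_CHECKED_TAG, ()) // tag only the root node
    plan
  }

  def isChecked(plan: LogicalPlan): Boolean =
    plan.getTagValue(AUTH_CHECKED_TAG).nonEmpty

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("tag-demo").getOrCreate()
    val plan = spark.range(3).queryExecution.analyzed
    println(isChecked(plan))              // false before marking
    println(isChecked(markChecked(plan))) // true after marking
    spark.stop()
  }
}
```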

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/rule/rowfilter/FilterDataSourceV2Strategy.scala

Lines changed: 1 addition & 8 deletions
@@ -17,18 +17,11 @@
 package org.apache.kyuubi.plugin.spark.authz.rule.rowfilter
 
 import org.apache.spark.sql.{SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 
 case class FilterDataSourceV2Strategy(spark: SparkSession) extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    // For Spark 3.1 and below, `ColumnPruning` rule will set `ObjectFilterPlaceHolder#child` to
-    // `Project`
-    case ObjectFilterPlaceHolder(Project(_, child)) if child.nodeName == "ShowNamespaces" =>
-      spark.sessionState.planner.plan(child)
-        .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq
-
-    // For Spark 3.2 and above
     case ObjectFilterPlaceHolder(child) if child.nodeName == "ShowNamespaces" =>
       spark.sessionState.planner.plan(child)
         .map(FilteredShowNamespaceExec(_, spark.sparkContext)).toSeq
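For context on the class being simplified here: a planner Strategy maps a logical plan to candidate physical plans, and custom strategies can be attached to a session. The sketch below registers a trivial strategy through the experimental hook purely for illustration; the authz plugin itself is wired in through SparkSessionExtensions, so treat the registration path and names as assumptions.

```scala
// Minimal sketch of a custom planner Strategy and one way to register it.
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

case class NoOpStrategy(spark: SparkSession) extends Strategy {
  // Returning Nil defers planning of every node to the built-in strategies.
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

object StrategyRegistrationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("strategy-demo").getOrCreate()
    spark.experimental.extraStrategies = Seq(NoOpStrategy(spark))
    spark.sql("SHOW NAMESPACES").show() // planned as usual; NoOpStrategy declines every node
    spark.stop()
  }
}
```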

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/util/AuthZUtils.scala

Lines changed: 0 additions & 2 deletions
@@ -84,8 +84,6 @@ private[authz] object AuthZUtils {
   }
 
   lazy val SPARK_RUNTIME_VERSION: SemanticVersion = SemanticVersion(SPARK_VERSION)
-  lazy val isSparkV32OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.2"
-  lazy val isSparkV33OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.3"
   lazy val isSparkV34OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.4"
   lazy val isSparkV35OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "3.5"
   lazy val isSparkV40OrGreater: Boolean = SPARK_RUNTIME_VERSION >= "4.0"
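Only the 3.4, 3.5, and 4.0 guards remain after this change. For readers curious how a guard such as isSparkV34OrGreater can be evaluated against the runtime version string, here is a minimal major.minor comparison; Kyuubi's actual SemanticVersion supports richer semantics, so this is an assumption-laden illustration only.

```scala
// A minimal sketch of a "major.minor" runtime version guard.
// Kyuubi's real SemanticVersion is more complete; this is illustration only.
final case class SimpleVersion(major: Int, minor: Int) {
  def >=(other: SimpleVersion): Boolean =
    major > other.major || (major == other.major && minor >= other.minor)
}

object SimpleVersion {
  def parse(version: String): SimpleVersion = {
    val parts = version.split('.')
    SimpleVersion(parts(0).toInt, parts(1).toInt)
  }
}

object VersionGuardSketch {
  def main(args: Array[String]): Unit = {
    val runtime = SimpleVersion.parse("3.5.1") // stand-in for org.apache.spark.SPARK_VERSION
    val isSparkV34OrGreater = runtime >= SimpleVersion.parse("3.4")
    println(isSparkV34OrGreater) // true for a 3.5.x runtime
  }
}
```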

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilderSuite.scala

Lines changed: 5 additions & 46 deletions
@@ -244,41 +244,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assert(accessType === AccessType.ALTER)
   }
 
-  test("AlterTableRecoverPartitionsCommand") {
-    // AlterTableRecoverPartitionsCommand exists in the version below 3.2
-    assume(!isSparkV32OrGreater)
-    val tableName = reusedDb + "." + "TableToMsck"
-    withTable(tableName) { _ =>
-      sql(
-        s"""
-           |CREATE TABLE $tableName
-           |(key int, value string, pid string)
-           |USING parquet
-           |PARTITIONED BY (pid)""".stripMargin)
-      val sqlStr =
-        s"""
-           |MSCK REPAIR TABLE $tableName
-           |""".stripMargin
-      val plan = sql(sqlStr).queryExecution.analyzed
-      val (inputs, outputs, operationType) = PrivilegesBuilder.build(plan, spark)
-      assert(operationType === MSCK)
-      assert(inputs.isEmpty)
-
-      assert(outputs.size === 1)
-      outputs.foreach { po =>
-        assert(po.actionType === PrivilegeObjectActionType.OTHER)
-        assert(po.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
-        assert(po.catalog.isEmpty)
-        assertEqualsIgnoreCase(reusedDb)(po.dbname)
-        assertEqualsIgnoreCase(tableName.split("\\.").last)(po.objectName)
-        assert(po.columns.isEmpty)
-        checkTableOwner(po)
-        val accessType = ranger.AccessType(po, operationType, isInput = false)
-        assert(accessType === AccessType.ALTER)
-      }
-    }
-  }
-
   // ALTER TABLE default.StudentInfo PARTITION (age='10') RENAME TO PARTITION (age='15');
   test("AlterTableRenamePartitionCommand") {
     sql(s"ALTER TABLE $reusedPartTable ADD IF NOT EXISTS PARTITION (pid=1)")
@@ -367,11 +332,8 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assert(po0.privilegeObjectType === PrivilegeObjectType.TABLE_OR_VIEW)
     assertEqualsIgnoreCase(reusedDb)(po0.dbname)
     assertEqualsIgnoreCase(reusedPartTableShort)(po0.objectName)
-    if (isSparkV32OrGreater) {
-      // Query in AlterViewAsCommand can not be resolved before SPARK-34698
-      assert(po0.columns === Seq("key", "pid", "value"))
-      checkTableOwner(po0)
-    }
+    assert(po0.columns === Seq("key", "pid", "value"))
+    checkTableOwner(po0)
     val accessType0 = ranger.AccessType(po0, operationType, isInput = true)
     assert(accessType0 === AccessType.SELECT)
 
@@ -482,7 +444,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
   }
 
   test("AnalyzeTablesCommand") {
-    assume(isSparkV32OrGreater)
     val plan = sql(s"ANALYZE TABLES IN $reusedDb COMPUTE STATISTICS")
       .queryExecution.analyzed
     val (in, out, operationType) = PrivilegesBuilder.build(plan, spark)
@@ -626,7 +587,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assert(po.actionType === PrivilegeObjectActionType.OTHER)
     assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
     assert(po.catalog.isEmpty)
-    val db = if (isSparkV33OrGreater) defaultDb else null
+    val db = defaultDb
     assertEqualsIgnoreCase(db)(po.dbname)
     assertEqualsIgnoreCase("CreateFunctionCommand")(po.objectName)
     assert(po.columns.isEmpty)
@@ -658,7 +619,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assert(po.actionType === PrivilegeObjectActionType.OTHER)
     assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
     assert(po.catalog.isEmpty)
-    val db = if (isSparkV33OrGreater) defaultDb else null
+    val db = defaultDb
     assertEqualsIgnoreCase(db)(po.dbname)
     assertEqualsIgnoreCase("DropFunctionCommand")(po.objectName)
     assert(po.columns.isEmpty)
@@ -678,7 +639,7 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
     assert(po.actionType === PrivilegeObjectActionType.OTHER)
     assert(po.privilegeObjectType === PrivilegeObjectType.FUNCTION)
     assert(po.catalog.isEmpty)
-    val db = if (isSparkV33OrGreater) defaultDb else null
+    val db = defaultDb
     assertEqualsIgnoreCase(db)(po.dbname)
     assertEqualsIgnoreCase("RefreshFunctionCommand")(po.objectName)
     assert(po.columns.isEmpty)
@@ -927,8 +888,6 @@ abstract class PrivilegesBuilderSuite extends AnyFunSuite
   }
 
   test("RepairTableCommand") {
-    // only spark 3.2 or greater has RepairTableCommand
-    assume(isSparkV32OrGreater)
     val tableName = reusedDb + "." + "TableToRepair"
     withTable(tableName) { _ =>
       sql(
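The assume(...) calls removed above used ScalaTest's ability to cancel, rather than fail, a test when a precondition such as the minimum Spark version does not hold; the remaining 3.4+/3.5+ guards still follow this pattern. A small generic sketch of the idiom, with illustrative values rather than the suite's real helpers:

```scala
// Sketch of a version-gated test: assume() cancels (skips) the test instead of
// failing it when the precondition does not hold. Values are illustrative.
import org.scalatest.funsuite.AnyFunSuite

class VersionGatedSuiteSketch extends AnyFunSuite {
  // Stand-in for a guard like AuthZUtils.isSparkV34OrGreater.
  private val isSparkV34OrGreater: Boolean = true

  test("behavior that only exists on Spark 3.4 or greater") {
    assume(isSparkV34OrGreater) // cancels the test on older runtimes
    assert(Seq(1, 2, 3).sum === 6)
  }
}
```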
