diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 804918a63bd..eedca7ab026 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -51,6 +51,8 @@ jobs:
java:
- 8
- 17
+ python:
+ - '3.9'
spark:
- '3.3'
- '3.4'
@@ -60,25 +62,41 @@ jobs:
comment: ["normal"]
include:
- java: 21
+ python: '3.11'
spark: '4.0'
spark-archive: '-Pscala-2.13'
exclude-tags: ''
comment: 'normal'
+ - java: 21
+ python: '3.11'
+ spark: '4.1'
+ spark-archive: '-Pscala-2.13'
+ exclude-tags: ''
+ comment: 'normal'
- java: 8
+ python: '3.9'
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-3.3.3 -Dspark.archive.name=spark-3.3.3-bin-hadoop3.tgz -Pzookeeper-3.6'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.3-binary'
- java: 8
+ python: '3.9'
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-3.4.3 -Dspark.archive.name=spark-3.4.3-bin-hadoop3.tgz -Pzookeeper-3.6'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.4-binary'
- java: 17
+ python: '3.11'
spark: '3.5'
spark-archive: '-Pscala-2.13 -Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-4.0.1 -Dspark.archive.name=spark-4.0.1-bin-hadoop3.tgz'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-4.0-binary'
+ - java: 17
+ python: '3.11'
+ spark: '3.5'
+ spark-archive: '-Pscala-2.13 -Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-4.1.0-preview4 -Dspark.archive.name=spark-4.1.0-preview4-bin-hadoop3.tgz'
+ exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
+ comment: 'verify-on-spark-4.1-binary'
env:
SPARK_LOCAL_IP: localhost
steps:
@@ -100,7 +118,7 @@ jobs:
- name: Setup Python
uses: actions/setup-python@v5
with:
- python-version: '3.9'
+ python-version: ${{ matrix.python }}
- name: Build and test Kyuubi and Spark with maven w/o linters
run: |
if [[ "${{ matrix.java }}" == "8" && "${{ matrix.spark }}" == "3.5" && "${{ matrix.spark-archive }}" == "" ]]; then
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index 3c8d670c053..b48863e3c8f 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -17,11 +17,14 @@
package org.apache.kyuubi.engine.spark.operation
+import java.lang.{Boolean => JBoolean}
+
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.kyuubi.SparkUtilsHelper
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.execution.CommandExecutionMode
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
@@ -29,11 +32,13 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf
+import org.apache.kyuubi.engine.spark.operation.PlanOnlyStatement._
import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
+import org.apache.kyuubi.util.reflect.DynMethods
/**
* Perform the statement parsing, analyzing or optimizing only without executing it
@@ -110,11 +115,8 @@ class PlanOnlyStatement(
spark.sessionState.analyzer.checkAnalysis(analyzed)
val optimized = spark.sessionState.optimizer.execute(analyzed)
optimized.stats
- iter = new IterableFetchIterator(Seq(Row(optimized.treeString(
- verbose = true,
- addSuffix = true,
- SQLConf.get.maxToStringFields,
- printOperatorId = false))))
+ iter = new IterableFetchIterator(
+ Seq(Row(treeString(optimized, verbose = true, addSuffix = true))))
case PhysicalMode =>
val physical = spark.sessionState.executePlan(plan, CommandExecutionMode.SKIP).sparkPlan
iter = new IterableFetchIterator(Seq(Row(physical.toString())))
@@ -184,3 +186,33 @@ class PlanOnlyStatement(
}
}
+
+object PlanOnlyStatement {
+
+  private val unboundTreeStringMethod = DynMethods.builder("treeString")
+    .impl( // SPARK-52065 (4.1.0)
+      classOf[TreeNode[_]],
+      classOf[Boolean],
+      classOf[Boolean],
+      classOf[Int],
+      classOf[Boolean],
+      classOf[Boolean])
+    .impl(
+      classOf[TreeNode[_]],
+      classOf[Boolean],
+      classOf[Boolean],
+      classOf[Int],
+      classOf[Boolean])
+    .build()
+
+  def treeString(
+      tree: TreeNode[_],
+      verbose: JBoolean,
+      addSuffix: JBoolean = false,
+      maxFields: Integer = SQLConf.get.maxToStringFields,
+      printOperatorId: JBoolean = false,
+      printOutputColumns: JBoolean = false): String = {
+    unboundTreeStringMethod.bind(tree)
+      .invoke(verbose, addSuffix, maxFields, printOperatorId, printOutputColumns)
+ }
+}
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
index 106be3fc789..0cf8327867e 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala
@@ -23,6 +23,7 @@ import org.apache.spark.SparkConf
import org.apache.spark.util.Utils
import org.apache.kyuubi.Logging
+import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods}
/**
* A place to invoke non-public APIs of [[Utils]], anything to be added here need to
@@ -37,11 +38,21 @@ object SparkUtilsHelper extends Logging {
Utils.redact(regex, text)
}
+ private val readOnlySparkConfCls = DynClasses.builder()
+ .impl("org.apache.spark.ReadOnlySparkConf")
+ .orNull()
+ .build()
+
+ private val getLocalDirMethod = DynMethods.builder("getLocalDir")
+ .impl(Utils.getClass, readOnlySparkConfCls) // SPARK-53459 (4.1.0)
+ .impl(Utils.getClass, classOf[SparkConf])
+ .build(Utils)
+
/**
* Get the path of a temporary directory.
*/
def getLocalDir(conf: SparkConf): String = {
- Utils.getLocalDir(conf)
+ getLocalDirMethod.invoke(conf)
}
def classesArePresent(className: String): Boolean = {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
index 3975b3780dc..84e297bf946 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala
@@ -64,8 +64,8 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar {
processBuilder.start
eventually(timeout(90.seconds), interval(500.milliseconds)) {
val error = processBuilder.getError
- assert(error.getMessage.contains(
- "java.lang.IllegalArgumentException: spark.ui.port should be int, but was abc"))
+ assert(error.getMessage.contains("spark.ui.port should be int") ||
+ error.getMessage.contains("INVALID_CONF_VALUE.TYPE_MISMATCH"))
assert(error.isInstanceOf[KyuubiSQLException])
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
index 7167ce230f9..4574d8b8c20 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala
@@ -259,7 +259,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
"--forward")
result = testPrematureExitForControlCli(logArgs, "")
assert(result.contains(s"Submitted application: $sparkBatchTestAppName"))
- assert(result.contains("Shutdown hook called"))
+ assert(result.contains("Successfully stopped SparkContext"))
}
test("submit batch test") {
@@ -272,7 +272,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
ldapUserPasswd)
val result = testPrematureExitForControlCli(submitArgs, "")
assert(result.contains(s"Submitted application: $sparkBatchTestAppName"))
- assert(result.contains("Shutdown hook called"))
+ assert(result.contains("Successfully stopped SparkContext"))
}
test("submit batch test with waitCompletion=false") {
@@ -289,7 +289,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat
s"${CtlConf.CTL_BATCH_LOG_QUERY_INTERVAL.key}=100")
val result = testPrematureExitForControlCli(submitArgs, "")
assert(result.contains("bin/spark-submit"))
- assert(!result.contains("Shutdown hook called"))
+ assert(!result.contains("Successfully stopped SparkContext"))
}
test("list batch test") {
diff --git a/pom.xml b/pom.xml
index 3f6acc0c351..50198adf81c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1980,7 +1980,7 @@
scala-2.13
2.13
- 2.13.16
+ 2.13.17
-scala${scala.binary.version}
@@ -2058,6 +2058,29 @@
+
+ spark-4.1
+
+ extensions/spark/kyuubi-spark-connector-hive
+
+
+ 17
+ 17
+ 4.1.0-preview4
+ 4.0
+ 4.13.1
+ 4.0.0
+ delta-spark_${scala.binary.version}
+
+ hudi-spark3.5-bundle_${scala.binary.version}
+
+ paimon-common
+ org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest
+ spark-${spark.version}-bin-hadoop3.tgz
+
+
+
spark-master