diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml index 804918a63bd..eedca7ab026 100644 --- a/.github/workflows/master.yml +++ b/.github/workflows/master.yml @@ -51,6 +51,8 @@ jobs: java: - 8 - 17 + python: + - '3.9' spark: - '3.3' - '3.4' @@ -60,25 +62,41 @@ jobs: comment: ["normal"] include: - java: 21 + python: '3.11' spark: '4.0' spark-archive: '-Pscala-2.13' exclude-tags: '' comment: 'normal' + - java: 21 + python: '3.11' + spark: '4.1' + spark-archive: '-Pscala-2.13' + exclude-tags: '' + comment: 'normal' - java: 8 + python: '3.9' spark: '3.5' spark-archive: '-Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-3.3.3 -Dspark.archive.name=spark-3.3.3-bin-hadoop3.tgz -Pzookeeper-3.6' exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest' comment: 'verify-on-spark-3.3-binary' - java: 8 + python: '3.9' spark: '3.5' spark-archive: '-Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-3.4.3 -Dspark.archive.name=spark-3.4.3-bin-hadoop3.tgz -Pzookeeper-3.6' exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.SparkLocalClusterTest' comment: 'verify-on-spark-3.4-binary' - java: 17 + python: '3.11' spark: '3.5' spark-archive: '-Pscala-2.13 -Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-4.0.1 -Dspark.archive.name=spark-4.0.1-bin-hadoop3.tgz' exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.SparkLocalClusterTest' comment: 'verify-on-spark-4.0-binary' + - java: 17 + python: '3.11' + spark: 
'3.5' + spark-archive: '-Pscala-2.13 -Dspark.archive.mirror=https://www.apache.org/dyn/closer.lua/spark/spark-4.1.0-preview4 -Dspark.archive.name=spark-4.1.0-preview4-bin-hadoop3.tgz' + exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest' + comment: 'verify-on-spark-4.1-binary' env: SPARK_LOCAL_IP: localhost steps: @@ -100,7 +118,7 @@ jobs: - name: Setup Python uses: actions/setup-python@v5 with: - python-version: '3.9' + python-version: ${{ matrix.python }} - name: Build and test Kyuubi and Spark with maven w/o linters run: | if [[ "${{ matrix.java }}" == "8" && "${{ matrix.spark }}" == "3.5" && "${{ matrix.spark-archive }}" == "" ]]; then diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala index 3c8d670c053..b48863e3c8f 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala @@ -17,11 +17,14 @@ package org.apache.kyuubi.engine.spark.operation +import java.lang.{Boolean => JBoolean} + import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.apache.spark.kyuubi.SparkUtilsHelper import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.execution.CommandExecutionMode import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -29,11 +32,13 @@ import 
org.apache.spark.sql.types.StructType import org.apache.kyuubi.KyuubiSQLException import org.apache.kyuubi.config.KyuubiConf.{LINEAGE_PARSER_PLUGIN_PROVIDER, OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE} import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.getSessionConf +import org.apache.kyuubi.engine.spark.operation.PlanOnlyStatement._ import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, LineageMode, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle} import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError} import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError} import org.apache.kyuubi.operation.log.OperationLog import org.apache.kyuubi.session.Session +import org.apache.kyuubi.util.reflect.DynMethods /** * Perform the statement parsing, analyzing or optimizing only without executing it @@ -110,11 +115,8 @@ class PlanOnlyStatement( spark.sessionState.analyzer.checkAnalysis(analyzed) val optimized = spark.sessionState.optimizer.execute(analyzed) optimized.stats - iter = new IterableFetchIterator(Seq(Row(optimized.treeString( - verbose = true, - addSuffix = true, - SQLConf.get.maxToStringFields, - printOperatorId = false)))) + iter = new IterableFetchIterator( + Seq(Row(treeString(optimized, verbose = true, addSuffix = true)))) case PhysicalMode => val physical = spark.sessionState.executePlan(plan, CommandExecutionMode.SKIP).sparkPlan iter = new IterableFetchIterator(Seq(Row(physical.toString()))) @@ -184,3 +186,33 @@ class PlanOnlyStatement( } } + +object PlanOnlyStatement { + + private val unboundTreeStringMethod = DynMethods.builder("treeString") + .impl( // SPARK-52065 (4.1.0) + classOf[TreeNode[_]], + classOf[Boolean], + classOf[Boolean], + classOf[Int], + classOf[Boolean], + classOf[Boolean]) + .impl( + 
classOf[TreeNode[_]], + classOf[Boolean], + classOf[Boolean], + classOf[Int], + classOf[Boolean]) + .build() + + def treeString( + tree: TreeNode[_], + verbose: JBoolean, + addSuffix: JBoolean = false, + maxFields: Integer = SQLConf.get.maxToStringFields, + printOperatorId: JBoolean = false, + printOutputColumns: JBoolean = false): String = { + unboundTreeStringMethod.bind(tree) + .invoke(verbose, addSuffix, maxFields, printOperatorId, printOutputColumns) + } +} diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala index 106be3fc789..0cf8327867e 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/kyuubi/SparkUtilsHelper.scala @@ -23,6 +23,7 @@ import org.apache.spark.SparkConf import org.apache.spark.util.Utils import org.apache.kyuubi.Logging +import org.apache.kyuubi.util.reflect.{DynClasses, DynMethods} /** * A place to invoke non-public APIs of [[Utils]], anything to be added here need to @@ -37,11 +38,21 @@ object SparkUtilsHelper extends Logging { Utils.redact(regex, text) } + private val readOnlySparkConfCls = DynClasses.builder() + .impl("org.apache.spark.ReadOnlySparkConf") + .orNull() + .build() + + private val getLocalDirMethod = DynMethods.builder("getLocalDir") + .impl(Utils.getClass, readOnlySparkConfCls) // SPARK-53459 (4.1.0) + .impl(Utils.getClass, classOf[SparkConf]) + .build(Utils) + /** * Get the path of a temporary directory. 
*/ def getLocalDir(conf: SparkConf): String = { - Utils.getLocalDir(conf) + getLocalDirMethod.invoke(conf) } def classesArePresent(className: String): Boolean = { diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala index 3975b3780dc..84e297bf946 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilderSuite.scala @@ -64,8 +64,8 @@ class SparkProcessBuilderSuite extends KerberizedTestHelper with MockitoSugar { processBuilder.start eventually(timeout(90.seconds), interval(500.milliseconds)) { val error = processBuilder.getError - assert(error.getMessage.contains( - "java.lang.IllegalArgumentException: spark.ui.port should be int, but was abc")) + assert(error.getMessage.contains("spark.ui.port should be int") || + error.getMessage.contains("INVALID_CONF_VALUE.TYPE_MISMATCH")) assert(error.isInstanceOf[KyuubiSQLException]) } diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala index 7167ce230f9..4574d8b8c20 100644 --- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala +++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/rest/client/BatchCliSuite.scala @@ -259,7 +259,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat "--forward") result = testPrematureExitForControlCli(logArgs, "") assert(result.contains(s"Submitted application: $sparkBatchTestAppName")) - assert(result.contains("Shutdown hook called")) + assert(result.contains("Successfully stopped SparkContext")) } test("submit batch test") { @@ -272,7 +272,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit 
with Bat ldapUserPasswd) val result = testPrematureExitForControlCli(submitArgs, "") assert(result.contains(s"Submitted application: $sparkBatchTestAppName")) - assert(result.contains("Shutdown hook called")) + assert(result.contains("Successfully stopped SparkContext")) } test("submit batch test with waitCompletion=false") { @@ -289,7 +289,7 @@ class BatchCliSuite extends RestClientTestHelper with TestPrematureExit with Bat s"${CtlConf.CTL_BATCH_LOG_QUERY_INTERVAL.key}=100") val result = testPrematureExitForControlCli(submitArgs, "") assert(result.contains("bin/spark-submit")) - assert(!result.contains("Shutdown hook called")) + assert(!result.contains("Successfully stopped SparkContext")) } test("list batch test") { diff --git a/pom.xml b/pom.xml index 3f6acc0c351..50198adf81c 100644 --- a/pom.xml +++ b/pom.xml @@ -1980,7 +1980,7 @@ scala-2.13 2.13 - 2.13.16 + 2.13.17 -scala${scala.binary.version} @@ -2058,6 +2058,29 @@ + + spark-4.1 + + extensions/spark/kyuubi-spark-connector-hive + + + 17 + 17 + 4.1.0-preview4 + 4.0 + 4.13.1 + 4.0.0 + delta-spark_${scala.binary.version} + + hudi-spark3.5-bundle_${scala.binary.version} + + paimon-common + org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.PaimonTest,org.apache.kyuubi.tags.HudiTest + spark-${spark.version}-bin-hadoop3.tgz + + + spark-master