Skip to content

Commit b209e5a

Browse files
wForgetbowenliang123
authored andcommitted
[KYUUBI #6688] [SPARK] Avoid trigger execution when getting result schema
# 🔍 Description ## Issue References 🔗 `DataFrame.isEmpty` may trigger execution again, we should avoid it. ## Describe Your Solution 🔧 ## Types of changes 🔖 - [X] Bugfix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) ## Test Plan 🧪 #### Behavior Without This Pull Request ⚰️ #### Behavior With This Pull Request 🎉 #### Related Unit Tests --- # Checklist 📝 - [X] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html) **Be nice. Be informative.** Closes #6688 from wForget/planonly_schema. Closes #6688 265f0ec [wforget] fix style d71cc4a [wforget] refactor resultSchema for spark operation 0c36b3d [wforget] Avoid trigger execution when getting result schema Authored-by: wforget <[email protected]> Signed-off-by: Bowen Liang <[email protected]> (cherry picked from commit da2401c) Signed-off-by: Bowen Liang <[email protected]>
1 parent 92675b1 commit b209e5a

File tree

7 files changed

+16
-28
lines changed

7 files changed

+16
-28
lines changed

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,14 @@ class ExecutePython(
5858
override protected def supportProgress: Boolean = true
5959

6060
override protected def resultSchema: StructType = {
61-
if (result == null || result.schema.isEmpty) {
61+
if (result == null) {
6262
new StructType().add("output", "string")
6363
.add("status", "string")
6464
.add("ename", "string")
6565
.add("evalue", "string")
6666
.add("traceback", "array<string>")
6767
} else {
68-
result.schema
68+
super.resultSchema
6969
}
7070
}
7171

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ class ExecuteScala(
6161
override protected def supportProgress: Boolean = true
6262

6363
override protected def resultSchema: StructType = {
64-
if (result == null || result.schema.isEmpty) {
64+
if (result == null) {
6565
new StructType().add("output", "string")
6666
} else {
67-
result.schema
67+
super.resultSchema
6868
}
6969
}
7070

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
2525
import org.apache.hadoop.fs.Path
2626
import org.apache.spark.sql.DataFrame
2727
import org.apache.spark.sql.kyuubi.SparkDatasetHelper._
28-
import org.apache.spark.sql.types._
2928

3029
import org.apache.kyuubi.{KyuubiSQLException, Logging}
3130
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT_CANCEL_JOB_GROUP, OPERATION_RESULT_MAX_ROWS, OPERATION_RESULT_SAVE_TO_FILE, OPERATION_RESULT_SAVE_TO_FILE_MIN_ROWS, OPERATION_RESULT_SAVE_TO_FILE_MINSIZE}
@@ -50,13 +49,6 @@ class ExecuteStatement(
5049

5150
private var fetchOrcStatement: Option[FetchOrcStatement] = None
5251
private var saveFilePath: Option[Path] = None
53-
override protected def resultSchema: StructType = {
54-
if (result == null || result.schema.isEmpty) {
55-
new StructType().add("Result", "string")
56-
} else {
57-
result.schema
58-
}
59-
}
6052

6153
override protected def beforeRun(): Unit = {
6254
OperationLog.setCurrentOperationLog(operationLog)

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,9 @@ class PlanOnlyStatement(
5858
override protected def resultSchema: StructType = {
5959
if (result == null) {
6060
new StructType().add("plan", "string")
61-
} else if (result.isEmpty) {
62-
new StructType().add("result", "string")
63-
} else result.schema
61+
} else {
62+
super.resultSchema
63+
}
6464
}
6565

6666
override protected def beforeRun(): Unit = {

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentCatalog.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.kyuubi.engine.spark.operation
1919

20-
import org.apache.spark.sql.types.StructType
21-
2220
import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils
2321
import org.apache.kyuubi.operation.log.OperationLog
2422
import org.apache.kyuubi.session.Session
@@ -29,10 +27,6 @@ class SetCurrentCatalog(session: Session, catalog: String) extends SparkOperatio
2927

3028
override def getOperationLog: Option[OperationLog] = Option(operationLog)
3129

32-
override protected def resultSchema: StructType = {
33-
new StructType()
34-
}
35-
3630
override protected def runInternal(): Unit = {
3731
try {
3832
SparkCatalogUtils.setCurrentCatalog(spark, catalog)

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SetCurrentDatabase.scala

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.kyuubi.engine.spark.operation
1919

20-
import org.apache.spark.sql.types.StructType
21-
2220
import org.apache.kyuubi.operation.log.OperationLog
2321
import org.apache.kyuubi.session.Session
2422

@@ -29,10 +27,6 @@ class SetCurrentDatabase(session: Session, database: String)
2927

3028
override def getOperationLog: Option[OperationLog] = Option(operationLog)
3129

32-
override protected def resultSchema: StructType = {
33-
new StructType()
34-
}
35-
3630
override protected def runInternal(): Unit = {
3731
try {
3832
spark.sessionState.catalogManager.setCurrentNamespace(Array(database))

externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,15 @@ abstract class SparkOperation(session: Session)
6060

6161
protected var result: DataFrame = _
6262

63-
protected def resultSchema: StructType
63+
protected def resultSchema: StructType = {
64+
if (!hasResultSet) {
65+
new StructType()
66+
} else if (result == null || result.schema.isEmpty) {
67+
new StructType().add("Result", "string")
68+
} else {
69+
result.schema
70+
}
71+
}
6472

6573
override def redactedStatement: String =
6674
redact(spark.sessionState.conf.stringRedactionPattern, statement)

0 commit comments

Comments
 (0)