Commit 04f4437

wankunde authored and bowenliang123 committed
[KYUUBI #6754][AUTHZ] Improve the performance of Ranger access requests deduplication
# 🔍 Description

## Issue References 🔗

This pull request fixes #6754

## Describe Your Solution 🔧

Currently, `RuleAuthorization` collects access requests in an `ArrayBuffer`, which is slow because each new `PrivilegeObject` must be compared against every previously collected access request.

## Types of changes 🔖

- [x] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

#### Behavior Without This Pull Request ⚰️

Add a benchmark. Before:

```sh
Java HotSpot(TM) 64-Bit Server VM 17.0.12+8-LTS-286 on Mac OS X 14.6
Apple M3
Collecting files ranger access request:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
50000 files benchmark                            181863         189434        NaN        -0.0  -181863368958.0      1.0X
```

#### Behavior With This Pull Request 🎉

After:

```sh
Java HotSpot(TM) 64-Bit Server VM 17.0.12+8-LTS-286 on Mac OS X 14.6
Apple M3
Collecting files ranger access request:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
50000 files benchmark                              1281           1310         33        -0.0    -1280563000.0      1.0X
```

#### Related Unit Tests

Existing UTs.

---

# Checklist 📝

- [x] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6758 from wankunde/ranger2.
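The core of the fix is replacing a linear scan over already-collected requests with a hash-based membership check, turning an O(n²) loop into O(n). A minimal sketch of that pattern in Java, using plain strings as stand-ins for the request objects (hypothetical types, not the Kyuubi code itself):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupSketch {
    // Quadratic: each candidate is compared against every collected element.
    static List<String> dedupWithScan(List<String> candidates) {
        List<String> out = new ArrayList<>();
        for (String c : candidates) {
            if (!out.contains(c)) { // O(n) scan per candidate
                out.add(c);
            }
        }
        return out;
    }

    // Linear: a HashSet answers membership in O(1) on average, while the
    // list preserves first-seen order, matching the original semantics.
    static List<String> dedupWithSet(List<String> candidates) {
        List<String> out = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (String c : candidates) {
            if (seen.add(c)) { // add() returns false if already present
                out.add(c);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> in = List.of("a", "b", "a", "c", "b");
        System.out.println(dedupWithScan(in)); // [a, b, c]
        System.out.println(dedupWithSet(in));  // [a, b, c]
    }
}
```

Both variants return the same deduplicated list in first-seen order; only the membership-check cost differs, which is what the benchmark numbers above reflect.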
Closes #6754

- 9d7d196 [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
- 88b9c04 [wankun] Update extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/spark/sql/RuleAuthorizationBenchmark.scala
- 20c55fb [wankun] Update extensions/spark/kyuubi-spark-authz/pom.xml
- f5a3b6c [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
- 9793249 [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
- d86b01f [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
- b904b49 [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
- aad08a6 [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
- 1374604 [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests
- 01e15c1 [wankun] Update extensions/spark/kyuubi-spark-authz/pom.xml
- 805e8a9 [wankun] Update extensions/spark/kyuubi-spark-authz/pom.xml
- e198179 [wankunde] [KYUUBI #6754] Improve the performance of ranger access requests

Lead-authored-by: wankunde <[email protected]>
Co-authored-by: wankun <[email protected]>
Signed-off-by: Bowen Liang <[email protected]>
1 parent 1e9d68b commit 04f4437

File tree

5 files changed: +187 −4 lines changed

Lines changed: 6 additions & 0 deletions

```diff
@@ -0,0 +1,6 @@
+Java HotSpot(TM) 64-Bit Server VM 17.0.12+8-LTS-286 on Mac OS X 14.6
+Apple M3
+Collecting files ranger access request:   Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+50000 files benchmark                              1281           1310         33        -0.0    -1280563000.0      1.0X
+
```

extensions/spark/kyuubi-spark-authz/pom.xml

Lines changed: 7 additions & 0 deletions

```diff
@@ -380,6 +380,13 @@
       <artifactId>${hudi.artifact}</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>

   <build>
```

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RuleAuthorization.scala

Lines changed: 9 additions & 4 deletions

```diff
@@ -17,14 +17,15 @@

 package org.apache.kyuubi.plugin.spark.authz.ranger

-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable

 import org.apache.ranger.plugin.policyengine.RangerAccessRequest
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

 import org.apache.kyuubi.plugin.spark.authz._
 import org.apache.kyuubi.plugin.spark.authz.ObjectType._
+import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType.AccessType
 import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._
 import org.apache.kyuubi.plugin.spark.authz.rule.Authorization
 import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
@@ -34,15 +35,19 @@ case class RuleAuthorization(spark: SparkSession) extends Authorization(spark) {
     val auditHandler = new SparkRangerAuditHandler
     val ugi = getAuthzUgi(spark.sparkContext)
     val (inputs, outputs, opType) = PrivilegesBuilder.build(plan, spark)
-    val requests = new ArrayBuffer[AccessRequest]()
+
+    // Use a HashSet to deduplicate on (AccessResource, AccessType); requests keeps
+    // all the non-duplicate requests in the same order as the input requests.
+    val requests = new mutable.ArrayBuffer[AccessRequest]()
+    val requestsSet = new mutable.HashSet[(AccessResource, AccessType)]()

     def addAccessRequest(objects: Iterable[PrivilegeObject], isInput: Boolean): Unit = {
       objects.foreach { obj =>
         val resource = AccessResource(obj, opType)
         val accessType = ranger.AccessType(obj, opType, isInput)
-        if (accessType != AccessType.NONE && !requests.exists(o =>
-          o.accessType == accessType && o.getResource == resource)) {
+        if (accessType != AccessType.NONE && !requestsSet.contains((resource, accessType))) {
           requests += AccessRequest(resource, ugi, opType, accessType)
+          requestsSet.add((resource, accessType))
         }
       }
     }
```
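The change pairs an ordered buffer with a set keyed on the (AccessResource, AccessType) tuple. The same pattern, sketched in Java with a record standing in for that key pair (hypothetical `RequestKey` type, not the actual Kyuubi classes):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RequestDedup {
    // Hypothetical stand-in for the (AccessResource, AccessType) pair;
    // records provide value-based equals/hashCode, so they work as set keys.
    record RequestKey(String resource, String accessType) {}

    private final List<RequestKey> requests = new ArrayList<>();
    private final Set<RequestKey> seen = new HashSet<>();

    // Append the request only if an equal (resource, accessType) pair has not
    // been collected yet; first-occurrence insertion order is preserved.
    public void add(String resource, String accessType) {
        RequestKey key = new RequestKey(resource, accessType);
        if (seen.add(key)) {
            requests.add(key);
        }
    }

    public List<RequestKey> requests() {
        return List.copyOf(requests);
    }
}
```

Note this relies on the key type having consistent `equals`/`hashCode`, which is also the precondition for keying the Scala `mutable.HashSet` on `(AccessResource, AccessType)` tuples.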
Lines changed: 71 additions & 0 deletions

```diff
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.plugin.spark.authz.benchmark
+
+import java.io.{File, FileOutputStream, OutputStream}
+
+import scala.collection.JavaConverters._
+
+import com.google.common.reflect.ClassPath
+import org.scalatest.Assertions._
+
+trait KyuubiBenchmarkBase {
+  var output: Option[OutputStream] = None
+
+  private val prefix = {
+    val benchmarkClasses = ClassPath.from(Thread.currentThread.getContextClassLoader)
+      .getTopLevelClassesRecursive("org.apache.spark.sql").asScala.toArray
+    assert(benchmarkClasses.nonEmpty)
+    val benchmark = benchmarkClasses.find(_.load().getName.endsWith("Benchmark"))
+    val targetDirOrProjDir =
+      new File(benchmark.get.load().getProtectionDomain.getCodeSource.getLocation.toURI)
+        .getParentFile.getParentFile
+    if (targetDirOrProjDir.getName == "target") {
+      targetDirOrProjDir.getParentFile.getCanonicalPath + "/"
+    } else {
+      targetDirOrProjDir.getCanonicalPath + "/"
+    }
+  }
+
+  def withHeader(func: => Unit): Unit = {
+    val version = System.getProperty("java.version").split("\\D+")(0).toInt
+    val jdkString = if (version > 8) s"-jdk$version" else ""
+    val resultFileName =
+      s"${this.getClass.getSimpleName.replace("$", "")}$jdkString-results.txt"
+    val dir = new File(s"${prefix}benchmarks/")
+    if (!dir.exists()) {
+      // scalastyle:off println
+      println(s"Creating ${dir.getAbsolutePath} for benchmark results.")
+      // scalastyle:on println
+      dir.mkdirs()
+    }
+    val file = new File(dir, resultFileName)
+    if (!file.exists()) {
+      file.createNewFile()
+    }
+    output = Some(new FileOutputStream(file))
+
+    func
+
+    output.foreach { o =>
+      if (o != null) {
+        o.close()
+      }
+    }
+  }
+}
```
Lines changed: 94 additions & 0 deletions

```diff
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+import java.nio.file.Files
+
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.concurrent.duration.Duration
+import scala.reflect.io.Path.jfile2path
+
+import org.apache.spark.benchmark.Benchmark
+import org.scalatest.BeforeAndAfterAll
+// scalastyle:off
+import org.scalatest.funsuite.AnyFunSuite
+
+import org.apache.kyuubi.plugin.spark.authz.SparkSessionProvider
+import org.apache.kyuubi.plugin.spark.authz.benchmark.KyuubiBenchmarkBase
+import org.apache.kyuubi.plugin.spark.authz.ranger.RuleAuthorization
+import org.apache.kyuubi.util.ThreadUtils
+
+/**
+ * Benchmark to measure performance of collecting ranger access requests.
+ *
+ * {{{
+ *   RUN_BENCHMARK=1 ./build/mvn clean test \
+ *   -pl extensions/spark/kyuubi-spark-authz -am \
+ *   -Dtest=none -DwildcardSuites=org.apache.spark.sql.RuleAuthorizationBenchmark
+ * }}}
+ */
+class RuleAuthorizationBenchmark extends AnyFunSuite
+  with SparkSessionProvider with BeforeAndAfterAll
+  with KyuubiBenchmarkBase {
+  // scalastyle:on
+
+  override protected val catalogImpl: String = "hive"
+  private val runBenchmark = sys.env.contains("RUN_BENCHMARK")
+
+  private val fileNumbers = 50000
+
+  private var tempDir: File = _
+
+  override def beforeAll(): Unit = {
+    tempDir = Files.createTempDirectory("kyuubi-test-").toFile
+  }
+
+  override def afterAll(): Unit = {
+    if (tempDir != null) {
+      tempDir.deleteRecursively()
+    }
+    spark.stop()
+    super.afterAll()
+  }
+
+  test("Collecting files ranger access request") {
+    assume(runBenchmark)
+
+    val futures = (1 to fileNumbers).map { i =>
+      Future {
+        val file = new File(tempDir, s"file_$i.txt")
+        file.createNewFile()
+      }
+    }
+    val allFutures = Future.sequence(futures)
+    ThreadUtils.awaitResult(allFutures, Duration.Inf)
+
+    val df = spark.read.text(tempDir + "/file_*.txt")
+    val plan = df.queryExecution.optimizedPlan
+
+    withHeader {
+      val benchmark = new Benchmark(s"Collecting files ranger access request", -1, output = output)
+      benchmark.addCase(s"$fileNumbers files benchmark", 3) { _ =>
+        RuleAuthorization(spark).checkPrivileges(spark, plan)
+      }
+      benchmark.run()
+    }
+  }
+}
```
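The benchmark above goes through Spark's `Benchmark` harness against a real query plan over 50,000 files. Outside Spark, the same before/after comparison can be sketched with a plain timing loop over a synthetic workload (hypothetical file names; not the Kyuubi benchmark):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupTiming {
    // Quadratic variant: linear scan of the output list per element.
    static List<String> scanDedup(List<String> in) {
        List<String> out = new ArrayList<>();
        for (String s : in) if (!out.contains(s)) out.add(s);
        return out;
    }

    // Linear variant: hash-set membership check per element.
    static List<String> setDedup(List<String> in) {
        List<String> out = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (String s : in) if (seen.add(s)) out.add(s);
        return out;
    }

    public static void main(String[] args) {
        // Synthetic workload: 50,000 distinct file names, mirroring the
        // benchmark's file count (the real benchmark exercises Spark plans).
        List<String> files = new ArrayList<>();
        for (int i = 1; i <= 50_000; i++) files.add("file_" + i + ".txt");

        long t0 = System.nanoTime();
        List<String> a = scanDedup(files);
        long t1 = System.nanoTime();
        List<String> b = setDedup(files);
        long t2 = System.nanoTime();

        // Both approaches must agree on the result; only the cost differs.
        System.out.println("equal=" + a.equals(b)
            + " scanMs=" + (t1 - t0) / 1_000_000
            + " setMs=" + (t2 - t1) / 1_000_000);
    }
}
```

Absolute timings will vary by machine and JIT warm-up, so this is only a rough illustration of the asymptotic gap, not a substitute for the committed benchmark.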
