Skip to content

Commit 05d0637

Browse files
ygjia authored and wForget committed
[KYUUBI #7186] Introduce RuleFunctionAuthorization for persistent function calls authorization
### Why are the changes needed? Close #7186 ### How was this patch tested? Add new UTs and verified in a cluster with ranger hive spark service. ### Was this patch authored or co-authored using generative AI tooling? No Closes #7187 from ygjia/hive-udf. Closes #7186 cfeb2e6 [Yaguang Jia] address review comments 4977783 [Yaguang Jia] Merge remote-tracking branch 'apache/master' into hive-udf 211da6d [Yaguang Jia] address review comments 249c321 [Yaguang Jia] change config to spark.kyuubi.authz.udf.enabled cbb0225 [Yaguang Jia] fix scalastyle check a99e2d2 [Yaguang Jia] add spark.kyuubi.conf.authz.udf.enabled 6afed7a [Yaguang Jia] update RuleFunctionAuthorization 85d19d2 [Yaguang Jia] mv test to HiveCatalogRangerSparkExtensionSuite 114aafe [Yaguang Jia] mv test to HiveCatalogRangerSparkExtensionSuite 05c7de0 [Yaguang Jia] revert mock 0096db0 [Yaguang Jia] test RuleFunctionAuthorization in RangerSparkExtensionSuite. 60df218 [Yaguang Jia] fix buildFunctions for command 5b20926 [Yaguang Jia] add ut for RuleFunctionAuthorization 38c2577 [Yaguang Jia] add built in and udf test 939dd98 [Yaguang Jia] add RuleFunctionAuthorization to support hive udf Authorization Authored-by: Yaguang Jia <[email protected]> Signed-off-by: wforget <[email protected]>
1 parent 0b1b2f2 commit 05d0637

File tree

8 files changed

+309
-23
lines changed

8 files changed

+309
-23
lines changed

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,9 +267,7 @@ object PrivilegesBuilder {
267267
val spec = getTableCommandSpec(command)
268268
val functionPrivAndOpType = spec.queries(plan)
269269
.map(plan => buildFunctions(plan, spark))
270-
functionPrivAndOpType.map(_._1)
271-
.reduce(_ ++ _)
272-
.foreach(functionPriv => inputObjs += functionPriv)
270+
inputObjs ++= functionPrivAndOpType.flatMap(_._1)
273271

274272
case plan => plan transformAllExpressions {
275273
case hiveFunction: Expression if isKnownFunction(hiveFunction) =>

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/AccessResource.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class AccessResource private (val objectType: ObjectType, val catalog: Option[St
3232
extends RangerAccessResourceImpl {
3333
implicit def asString(obj: Object): String = if (obj != null) obj.asInstanceOf[String] else null
3434
def getDatabase: String = getValue("database")
35+
def getUdf: String = getValue("udf")
3536
def getTable: String = getValue("table")
3637
def getColumn: String = getValue("column")
3738
def getColumns: Seq[String] = {

extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/ranger/RangerSparkExtension.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ class RangerSparkExtension extends (SparkSessionExtensions => Unit) {
4545

4646
override def apply(v1: SparkSessionExtensions): Unit = {
4747
v1.injectCheckRule(AuthzConfigurationChecker)
48+
// RuleFunctionAuthorization should use injectCheckRule instead of injectOptimizerRule,
49+
// because ConstantFolding will optimize deterministic UDFs with foldable
50+
// inputs (e.g., literals), replacing them with their results and bypassing permission checks.
51+
v1.injectCheckRule(RuleFunctionAuthorization)
4852
v1.injectResolutionRule(_ => RuleReplaceShowObjectCommands)
4953
v1.injectResolutionRule(_ => RuleApplyPermanentViewMarker)
5054
v1.injectResolutionRule(_ => RuleApplyTypeOfMarker)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.kyuubi.plugin.spark.authz.ranger
19+
20+
import scala.collection.mutable
21+
22+
import org.apache.ranger.plugin.policyengine.RangerAccessRequest
23+
import org.apache.spark.sql.SparkSession
24+
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
25+
26+
import org.apache.kyuubi.plugin.spark.authz._
27+
import org.apache.kyuubi.plugin.spark.authz.ranger.AccessType.AccessType
28+
import org.apache.kyuubi.plugin.spark.authz.ranger.SparkRangerAdminPlugin._
29+
import org.apache.kyuubi.plugin.spark.authz.util.AuthZUtils._
30+
31+
/**
 * A check rule that authorizes persistent-function (e.g. Hive UDF) calls found in a
 * logical plan against Ranger before execution.
 *
 * This must be injected via `injectCheckRule` rather than `injectOptimizerRule`:
 * `ConstantFolding` evaluates deterministic UDFs over foldable inputs (e.g. literals)
 * and replaces them with their results, which would bypass the permission check.
 *
 * @param spark the session whose config and UGI are used for authorization
 */
case class RuleFunctionAuthorization(spark: SparkSession) extends (LogicalPlan => Unit) {
  // Feature flag: function-call authorization is opt-in.
  final val AUTHZ_UDF_KEY: String = "spark.kyuubi.authz.udf.enabled"

  // Read once when the rule is constructed; the session config is not re-read per plan.
  private val authzUDFEnabled: Boolean =
    spark.conf.getOption(AUTHZ_UDF_KEY).exists(_.equalsIgnoreCase("true"))

  override def apply(plan: LogicalPlan): Unit = {
    // Guard the whole body instead of using an early `return` (non-idiomatic in Scala).
    if (authzUDFEnabled) {
      val auditHandler = new SparkRangerAuditHandler
      val ugi = getAuthzUgi(spark.sparkContext)
      // Only the input (read-side) function privileges matter for function calls.
      val (inputs, _, opType) = PrivilegesBuilder.buildFunctions(plan, spark)

      // Deduplicate on (resource, accessType) while keeping the first-seen order:
      // `requests` preserves order, `seen` tracks which pairs were already added.
      val requests = new mutable.ArrayBuffer[AccessRequest]()
      val seen = new mutable.HashSet[(AccessResource, AccessType)]()

      def addAccessRequest(objects: Iterable[PrivilegeObject], isInput: Boolean): Unit = {
        objects.foreach { obj =>
          val resource = AccessResource(obj, opType)
          val accessType = ranger.AccessType(obj, opType, isInput)
          // HashSet.add returns true only when the element was not present, so this
          // replaces the original contains-then-add pair (which also relied on
          // deprecated auto-tupling in `add(resource, accessType)`).
          if (accessType != AccessType.NONE && seen.add((resource, accessType))) {
            requests += AccessRequest(resource, ugi, opType, accessType)
          }
        }
      }

      addAccessRequest(inputs, isInput = true)

      // AccessRequest is a RangerAccessRequest, so a plain covariant upcast
      // suffices — no `asInstanceOf` needed.
      val requestSeq: Seq[RangerAccessRequest] = requests.toSeq

      if (authorizeInSingleCall) {
        // Batch every request into a single Ranger authorization call.
        verify(requestSeq, auditHandler)
      } else {
        requestSeq.foreach(req => verify(Seq(req), auditHandler))
      }
    }
  }
}

extensions/spark/kyuubi-spark-authz/src/test/gen/scala/org/apache/kyuubi/plugin/spark/authz/gen/PolicyJsonFileGenerator.scala

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ class PolicyJsonFileGenerator extends AnyFunSuite {
110110
policyAccessForPermViewAccessOnly,
111111
policyAccessForTable2AccessOnly,
112112
policyAccessForPaimonNsTable1SelectOnly,
113+
policyAccessForDefaultDbUDF,
113114
// row filter
114115
policyFilterForSrcTableKeyLessThan20,
115116
policyFilterForPermViewKeyLessThan20,
@@ -371,4 +372,20 @@ class PolicyJsonFileGenerator extends AnyFunSuite {
371372
users = List(table1OnlyUserForNs),
372373
accesses = allowTypes(select),
373374
delegateAdmin = true)))
375+
376+
private val policyAccessForDefaultDbUDF = KRangerPolicy(
377+
name = "defaultdb_udf",
378+
description = "Policy for default db udf",
379+
resources = Map(
380+
databaseRes(defaultDb),
381+
"udf" -> KRangerPolicyResource(values = List("kyuubi_func*"))),
382+
policyItems = List(
383+
KRangerPolicyItem(
384+
users = List(bob),
385+
accesses = allowTypes(select, update, create, drop, alter, index, lock, all, read, write),
386+
delegateAdmin = true),
387+
KRangerPolicyItem(
388+
users = List(kent),
389+
accesses = allowTypes(select),
390+
delegateAdmin = true)))
374391
}

extensions/spark/kyuubi-spark-authz/src/test/resources/sparkSql_hive_jenkins.json

Lines changed: 80 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,72 @@
510510
"isEnabled" : true,
511511
"version" : 1,
512512
"service" : "hive_jenkins",
513+
"name" : "defaultdb_udf",
514+
"policyType" : 0,
515+
"policyPriority" : 0,
516+
"description" : "Policy for default db udf",
517+
"isAuditEnabled" : true,
518+
"resources" : {
519+
"database" : {
520+
"values" : [ "default" ],
521+
"isExcludes" : false,
522+
"isRecursive" : false
523+
},
524+
"udf" : {
525+
"values" : [ "kyuubi_func*" ],
526+
"isExcludes" : false,
527+
"isRecursive" : false
528+
}
529+
},
530+
"policyItems" : [ {
531+
"accesses" : [ {
532+
"type" : "select",
533+
"isAllowed" : true
534+
}, {
535+
"type" : "update",
536+
"isAllowed" : true
537+
}, {
538+
"type" : "create",
539+
"isAllowed" : true
540+
}, {
541+
"type" : "drop",
542+
"isAllowed" : true
543+
}, {
544+
"type" : "alter",
545+
"isAllowed" : true
546+
}, {
547+
"type" : "index",
548+
"isAllowed" : true
549+
}, {
550+
"type" : "lock",
551+
"isAllowed" : true
552+
}, {
553+
"type" : "all",
554+
"isAllowed" : true
555+
}, {
556+
"type" : "read",
557+
"isAllowed" : true
558+
}, {
559+
"type" : "write",
560+
"isAllowed" : true
561+
} ],
562+
"users" : [ "bob" ],
563+
"delegateAdmin" : true
564+
}, {
565+
"accesses" : [ {
566+
"type" : "select",
567+
"isAllowed" : true
568+
} ],
569+
"users" : [ "kent" ],
570+
"delegateAdmin" : true
571+
} ],
572+
"isDenyAllElse" : false
573+
}, {
574+
"id" : 11,
575+
"guid" : "6512bd43-d9ca-36e0-ac99-0b0a82652dca",
576+
"isEnabled" : true,
577+
"version" : 1,
578+
"service" : "hive_jenkins",
513579
"name" : "src_key_less_than_20",
514580
"policyType" : 2,
515581
"policyPriority" : 0,
@@ -539,8 +605,8 @@
539605
} ],
540606
"isDenyAllElse" : false
541607
}, {
542-
"id" : 11,
543-
"guid" : "6512bd43-d9ca-36e0-ac99-0b0a82652dca",
608+
"id" : 12,
609+
"guid" : "c20ad4d7-6fe9-3759-aa27-a0c99bff6710",
544610
"isEnabled" : true,
545611
"version" : 1,
546612
"service" : "hive_jenkins",
@@ -573,8 +639,8 @@
573639
} ],
574640
"isDenyAllElse" : false
575641
}, {
576-
"id" : 12,
577-
"guid" : "c20ad4d7-6fe9-3759-aa27-a0c99bff6710",
642+
"id" : 13,
643+
"guid" : "c51ce410-c124-310e-8db5-e4b97fc2af39",
578644
"isEnabled" : true,
579645
"version" : 1,
580646
"service" : "hive_jenkins",
@@ -612,8 +678,8 @@
612678
} ],
613679
"isDenyAllElse" : false
614680
}, {
615-
"id" : 13,
616-
"guid" : "c51ce410-c124-310e-8db5-e4b97fc2af39",
681+
"id" : 14,
682+
"guid" : "aab32389-22bc-325a-af60-6eb525ffdc56",
617683
"isEnabled" : true,
618684
"version" : 1,
619685
"service" : "hive_jenkins",
@@ -651,8 +717,8 @@
651717
} ],
652718
"isDenyAllElse" : false
653719
}, {
654-
"id" : 14,
655-
"guid" : "aab32389-22bc-325a-af60-6eb525ffdc56",
720+
"id" : 15,
721+
"guid" : "9bf31c7f-f062-336a-96d3-c8bd1f8f2ff3",
656722
"isEnabled" : true,
657723
"version" : 1,
658724
"service" : "hive_jenkins",
@@ -690,8 +756,8 @@
690756
} ],
691757
"isDenyAllElse" : false
692758
}, {
693-
"id" : 15,
694-
"guid" : "9bf31c7f-f062-336a-96d3-c8bd1f8f2ff3",
759+
"id" : 16,
760+
"guid" : "c74d97b0-1eae-357e-84aa-9d5bade97baf",
695761
"isEnabled" : true,
696762
"version" : 1,
697763
"service" : "hive_jenkins",
@@ -729,8 +795,8 @@
729795
} ],
730796
"isDenyAllElse" : false
731797
}, {
732-
"id" : 16,
733-
"guid" : "c74d97b0-1eae-357e-84aa-9d5bade97baf",
798+
"id" : 17,
799+
"guid" : "70efdf2e-c9b0-3607-9795-c442636b55fb",
734800
"isEnabled" : true,
735801
"version" : 1,
736802
"service" : "hive_jenkins",
@@ -768,8 +834,8 @@
768834
} ],
769835
"isDenyAllElse" : false
770836
}, {
771-
"id" : 17,
772-
"guid" : "70efdf2e-c9b0-3607-9795-c442636b55fb",
837+
"id" : 18,
838+
"guid" : "6f4922f4-5568-361a-8cdf-4ad2299f6d23",
773839
"isEnabled" : true,
774840
"version" : 1,
775841
"service" : "hive_jenkins",

extensions/spark/kyuubi-spark-authz/src/test/scala/org/apache/kyuubi/plugin/spark/authz/FunctionPrivilegesBuilderSuite.scala

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,4 +193,89 @@ class HiveFunctionPrivilegesBuilderSuite extends FunctionPrivilegesBuilderSuite
193193
}
194194
}
195195

196+
test("Built in and UDF Function Call Query") {
  // A query mixing a permanent UDF (kyuubi_fun_0) with built-ins (abs, lower),
  // over both literal and column arguments: only the UDF invocations should
  // surface as FUNCTION privilege objects.
  val analyzed = sql(
    s"""
       |SELECT
       | kyuubi_fun_0('TESTSTRING') AS col1,
       | kyuubi_fun_0(value) AS col2,
       | abs(key) AS col3, abs(-100) AS col4,
       | lower(value) AS col5,lower('TESTSTRING') AS col6
       |FROM $reusedTable
       |""".stripMargin).queryExecution.analyzed
  val (funcInputs, _, _) = PrivilegesBuilder.buildFunctions(analyzed, spark)
  // The two kyuubi_fun_0 calls yield privileges; the built-ins yield none.
  assert(funcInputs.size === 2)
  funcInputs.foreach { privObj =>
    assert(privObj.actionType === PrivilegeObjectActionType.OTHER)
    assert(privObj.privilegeObjectType === PrivilegeObjectType.FUNCTION)
    assert(privObj.dbname startsWith reusedDb.toLowerCase)
    assert(privObj.objectName startsWith functionNamePrefix.toLowerCase)
    assert(ranger.AccessType(privObj, QUERY, isInput = true) === AccessType.SELECT)
  }
}
217+
218+
test("Function Call in Create Table/View") {
  // CTAS: UDF calls in the SELECT part must be collected as function privileges.
  val ctasPlan = sql(
    s"""
       |CREATE TABLE table1 AS
       |SELECT
       | kyuubi_fun_0('KYUUBI_TESTSTRING'),
       | kyuubi_fun_0(value)
       |FROM $reusedTable
       |""".stripMargin).queryExecution.analyzed
  val (ctasInputs, _, _) = PrivilegesBuilder.buildFunctions(ctasPlan, spark)
  assert(ctasInputs.size === 2)
  ctasInputs.foreach { privObj =>
    assert(privObj.actionType === PrivilegeObjectActionType.OTHER)
    assert(privObj.privilegeObjectType === PrivilegeObjectType.FUNCTION)
    assert(privObj.dbname startsWith reusedDb.toLowerCase)
    assert(privObj.objectName startsWith functionNamePrefix.toLowerCase)
    assert(ranger.AccessType(privObj, QUERY, isInput = true) === AccessType.SELECT)
  }

  // Dropping the table involves no function call, so no privileges are built.
  val dropTablePlan = sql("DROP TABLE IF EXISTS table1").queryExecution.analyzed
  val (dropTableInputs, _, _) = PrivilegesBuilder.buildFunctions(dropTablePlan, spark)
  assert(dropTableInputs.size === 0)

  // CREATE VIEW: the UDF calls inside the view definition must also be collected.
  val viewPlan = sql(
    s"""
       |CREATE VIEW view1 AS SELECT
       | kyuubi_fun_0('KYUUBI_TESTSTRING') AS fun1,
       | kyuubi_fun_0(value) AS fun2
       |FROM $reusedTable
       |""".stripMargin).queryExecution.analyzed
  val (viewInputs, _, _) = PrivilegesBuilder.buildFunctions(viewPlan, spark)
  assert(viewInputs.size === 2)
  viewInputs.foreach { privObj =>
    assert(privObj.actionType === PrivilegeObjectActionType.OTHER)
    assert(privObj.privilegeObjectType === PrivilegeObjectType.FUNCTION)
    assert(privObj.dbname startsWith reusedDb.toLowerCase)
    assert(privObj.objectName startsWith functionNamePrefix.toLowerCase)
    assert(ranger.AccessType(privObj, QUERY, isInput = true) === AccessType.SELECT)
  }

  // Dropping the view, like dropping the table, yields no function privileges.
  val dropViewPlan = sql("DROP VIEW IF EXISTS view1").queryExecution.analyzed
  val (dropViewInputs, _, _) = PrivilegesBuilder.buildFunctions(dropViewPlan, spark)
  assert(dropViewInputs.size === 0)
}
262+
263+
test("Function Call in INSERT OVERWRITE") {
  // The single UDF call in the SELECT side of an INSERT OVERWRITE must be
  // reported as one FUNCTION privilege object.
  val insertPlan = sql(
    s"""
       |INSERT OVERWRITE TABLE $reusedTable
       |SELECT key, kyuubi_fun_0(value)
       |FROM $reusedPartTable
       |""".stripMargin).queryExecution.analyzed
  val (funcInputs, _, _) = PrivilegesBuilder.buildFunctions(insertPlan, spark)
  assert(funcInputs.size === 1)
  funcInputs.foreach { privObj =>
    assert(privObj.actionType === PrivilegeObjectActionType.OTHER)
    assert(privObj.privilegeObjectType === PrivilegeObjectType.FUNCTION)
    assert(privObj.dbname startsWith reusedDb.toLowerCase)
    assert(privObj.objectName startsWith functionNamePrefix.toLowerCase)
    assert(ranger.AccessType(privObj, QUERY, isInput = true) === AccessType.SELECT)
  }
}
196281
}

0 commit comments

Comments
 (0)