Skip to content

Commit c670824

Browse files
authored
Merge pull request #6818 from liaolecheng/spark
Implement default resource interpreter in third-party resourcecustomizations for SparkApplication
2 parents f26d76a + 3dd059a commit c670824

File tree

5 files changed

+530
-0
lines changed

5 files changed

+530
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
apiVersion: config.karmada.io/v1alpha1
2+
kind: ResourceInterpreterCustomization
3+
metadata:
4+
name: declarative-configuration-sparkapplication
5+
spec:
6+
target:
7+
apiVersion: sparkoperator.k8s.io/v1beta2
8+
kind: SparkApplication
9+
customizations:
10+
healthInterpretation:
11+
luaScript: >
12+
function InterpretHealth(observedObj)
  -- Health check for a SparkApplication: an object with no reported
  -- application state is not (yet) healthy, and only the terminal
  -- 'FAILED' state is treated as unhealthy — every other state is
  -- considered healthy or recoverable.
  local state = observedObj
      and observedObj.status
      and observedObj.status.applicationState
      and observedObj.status.applicationState.state
  if state == nil then
    return false
  end
  return state ~= 'FAILED'
end
28+
componentResource:
29+
luaScript: |
30+
local kube = require("kube")
31+
32+
-- True when a value is "empty": nil or the empty string.
local function isempty(s)
  if s == nil then
    return true
  end
  return s == ''
end
35+
36+
-- Safely walk a chain of nested table keys, returning nil as soon as any
-- intermediate value is missing. `path` is a sequence of keys.
local function get(obj, path)
  local node = obj
  for _, key in ipairs(path) do
    if node == nil then
      return nil
    end
    node = node[key]
  end
  return node
end
47+
48+
-- Coerce a possibly-string number to a number, falling back to `default`
-- when the value is nil, empty, or not parseable.
local function to_num(v, default)
  if v ~= nil and v ~= '' then
    local parsed = tonumber(v)
    if parsed ~= nil then
      return parsed
    end
  end
  return default
end
59+
60+
-- JSON-safe deep clone: keeps strings/numbers/booleans/tables and drops
-- everything else (functions, userdata, threads). A fresh table is built at
-- every level so the clone never shares references with the source. `seen`
-- guards against cycles: a table found on the current descent path clones
-- to nil; it is unmarked afterwards so non-cyclic shared subtables still
-- clone normally.
local function clone_plain(val, seen)
  local vt = type(val)
  if vt ~= "table" then
    if vt == "string" or vt == "number" or vt == "boolean" or vt == "nil" then
      return val
    end
    return nil -- unsupported scalar type: drop it
  end
  seen = seen or {}
  if seen[val] then
    return nil -- cycle detected on this path
  end
  seen[val] = true
  local copy = {}
  for key, child in pairs(val) do
    local kt = type(key)
    if kt == "string" or kt == "number" then
      local cloned = clone_plain(child, seen)
      if cloned ~= nil then
        copy[key] = cloned
      end
    end
  end
  seen[val] = nil -- leave the path so shared siblings may still be cloned
  return copy
end
83+
84+
-- Copy scheduling-related pod-template fields (nodeSelector, tolerations,
-- hard node affinity, priorityClassName) from a driver/executor spec into a
-- replicaRequirements table. Mutates `requires` in place; a nil spec is a
-- no-op.
local function apply_pod_template(pt_spec, requires)
  if pt_spec == nil then
    return
  end

  local selector = clone_plain(pt_spec.nodeSelector)
  local tols = clone_plain(pt_spec.tolerations)
  local hard_affinity
  if pt_spec.affinity and pt_spec.affinity.nodeAffinity then
    hard_affinity = clone_plain(pt_spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution)
  end

  -- Create nodeClaim only when at least one field has content.
  if selector ~= nil or tols ~= nil or hard_affinity ~= nil then
    local claim = requires.nodeClaim or {}
    claim.nodeSelector = selector
    claim.tolerations = tols
    claim.hardNodeAffinity = hard_affinity
    requires.nodeClaim = claim
  end

  local priority_class = pt_spec.priorityClassName
  if not isempty(priority_class) then
    requires.priorityClassName = priority_class
  end
end
109+
110+
-- Spark never requests less than 384MiB of memory overhead. Note "m" is a
-- Java-style suffix meaning mebibytes here (see parse_java_memory).
local MIN_MEMORY_OVERHEAD = "384m"
-- Default overhead factors mirror Spark's own defaults:
-- 10% for JVM (Java/Scala) jobs, 40% for non-JVM (Python/R) jobs.
local JVM_DEFAULT_OVERHEAD_FACTOR = 0.10
local NON_JVM_DEFAULT_OVERHEAD_FACTOR = 0.40

-- Relative order and byte scale of the Kubernetes binary suffixes we emit.
local kube_unit_order = { B = 0, Ki = 1, Mi = 2, Gi = 3, Ti = 4, Pi = 5 }
local kube_unit_scale = { B = 1, Ki = 1024, Mi = 1024^2, Gi = 1024^3, Ti = 1024^4, Pi = 1024^5 }

-- Parse a Java-style memory string ("512m", "2g", "1024") into a numeric
-- value and a Kubernetes binary suffix. Returns nil, nil when unparseable.
local function parse_java_memory(java_mem_str)
  local value_str, unit = tostring(java_mem_str):lower():match("^(%d+%.?%d*)([a-z]*)$")
  if value_str == nil then
    return nil, nil
  end
  local unit_map = {
    b = "B",
    kb = "Ki", k = "Ki",
    mb = "Mi", m = "Mi",
    gb = "Gi", g = "Gi",
    tb = "Ti", t = "Ti",
    pb = "Pi", p = "Pi",
    -- FIX: a bare number ("1024") means bytes in JVM notation; previously
    -- the unit came back nil and crashed downstream unit conversion.
    [""] = "B",
  }
  return tonumber(value_str), unit_map[unit]
end

-- Convert a value between two known binary units.
local function convert_unit(value, from_unit, to_unit)
  if from_unit == to_unit then return value end
  local scale = kube_unit_scale[from_unit] / kube_unit_scale[to_unit]
  return value * scale
end

-- Return whichever of the two units is smaller (finer-grained).
local function smaller_unit(u1, u2)
  return kube_unit_order[u1] < kube_unit_order[u2] and u1 or u2
end

-- Total memory request = heap + overhead, rendered as a Kubernetes quantity.
-- An explicit overhead wins outright (matching Spark, which does not floor an
-- explicit value at the minimum); otherwise overhead is
-- max(heap * factor, min_memory_overhead). All arithmetic is done in the
-- smaller of the units involved to avoid losing precision.
local function calculate_total_memory_request(memory_str, overhead_str, memory_overhead_factor, min_memory_overhead)
  local mem_value, mem_unit = parse_java_memory(memory_str)
  if mem_value == nil or mem_unit == nil then
    -- FIX: an unparseable request previously crashed with nil arithmetic;
    -- pass it through and let the apiserver validate it instead.
    return tostring(memory_str)
  end

  local over_value, over_unit
  if overhead_str then
    over_value, over_unit = parse_java_memory(overhead_str)
  end
  if over_value == nil or over_unit == nil then
    local overhead_calc = mem_value * memory_overhead_factor
    local min_over_value, min_over_unit = parse_java_memory(min_memory_overhead)

    local smaller_u = smaller_unit(mem_unit, min_over_unit)
    local overhead_calc_in_smaller = convert_unit(overhead_calc, mem_unit, smaller_u)
    local min_over_in_smaller = convert_unit(min_over_value, min_over_unit, smaller_u)

    over_value, over_unit = math.max(overhead_calc_in_smaller, min_over_in_smaller), smaller_u
  end

  local final_unit = smaller_unit(mem_unit, over_unit)
  local mem_in_final = convert_unit(mem_value, mem_unit, final_unit)
  local over_in_final = convert_unit(over_value, over_unit, final_unit)
  local total_value = mem_in_final + over_in_final

  -- FIX: "%f" rendered six trailing decimals ("1408.000000Mi"); strip the
  -- redundant fraction so we emit a clean quantity ("1408Mi", "1408.5Mi").
  local total_str = string.format("%f", total_value):gsub("%.?0+$", "")
  return total_str .. final_unit
end
165+
166+
-- Report the driver and executor pod components of a SparkApplication so
-- Karmada can estimate per-replica resource requirements for scheduling.
-- Returns a sequence of { name, replicas, replicaRequirements } tables.
function GetComponents(observedObj)
  local components = {}

  local job_type = get(observedObj, {"spec","type"})
  -- FIX: spec.type may be missing or non-string on the observed object;
  -- previously job_type:lower() raised "attempt to index a nil value".
  -- Unknown types fall back to the larger (non-JVM) overhead factor,
  -- which is the conservative choice.
  local job = ""
  if type(job_type) == "string" then
    job = job_type:lower()
  end
  local is_jvm_job = (job == "java" or job == "scala")
  local default_overhead_factor = is_jvm_job and JVM_DEFAULT_OVERHEAD_FACTOR or NON_JVM_DEFAULT_OVERHEAD_FACTOR
  local memory_overhead_factor = to_num(get(observedObj, {"spec", "memoryOverheadFactor"}), default_overhead_factor)

  -- Driver: always exactly one pod.
  local drv_requires = {
    resourceRequest = {}
  }

  local drv_cpu = get(observedObj, {"spec","driver","cores"}) or 1
  local drv_memory_str = get(observedObj, {"spec", "driver", "memory"}) or "1g"
  local drv_overhead_str = get(observedObj, {"spec", "driver", "memoryOverhead"})
  drv_requires.resourceRequest.cpu = drv_cpu
  drv_requires.resourceRequest.memory = calculate_total_memory_request(drv_memory_str, drv_overhead_str, memory_overhead_factor, MIN_MEMORY_OVERHEAD)

  -- Optional GPU request: only added when both a resource name and a
  -- positive quantity are present.
  local drv_gpu = get(observedObj, {"spec","driver","gpu"})
  if drv_gpu ~= nil then
    local gpu_name = drv_gpu.name
    local gpu_qty = to_num(drv_gpu.quantity, 0)
    if not isempty(gpu_name) and gpu_qty > 0 then
      drv_requires.resourceRequest[gpu_name] = gpu_qty
    end
  end

  apply_pod_template(get(observedObj, {"spec","driver"}), drv_requires)

  table.insert(components, {
    name = "driver",
    replicas = 1, -- the Spark driver always runs a single instance
    replicaRequirements = drv_requires
  })

  -- Executor: replica count honors dynamic-allocation lower bounds.
  local exe_replicas = to_num(get(observedObj, {"spec","executor","instances"}), 1)
  local dynamic_enabled = get(observedObj, {"spec", "dynamicAllocation", "enabled"}) or false
  if dynamic_enabled then
    local initial_exec = to_num(get(observedObj, {"spec", "dynamicAllocation", "initialExecutors"}), 1)
    local min_exec = to_num(get(observedObj, {"spec", "dynamicAllocation", "minExecutors"}), 1)
    if initial_exec > exe_replicas then
      exe_replicas = initial_exec
    end
    if min_exec > exe_replicas then
      exe_replicas = min_exec
    end
  end

  local exe_requires = {
    resourceRequest = {}
  }

  local exe_cpu = get(observedObj, {"spec","executor","cores"}) or 1
  local exe_memory_str = get(observedObj, {"spec", "executor", "memory"}) or "1g"
  local exe_overhead_str = get(observedObj, {"spec", "executor", "memoryOverhead"})
  exe_requires.resourceRequest.cpu = exe_cpu
  exe_requires.resourceRequest.memory = calculate_total_memory_request(exe_memory_str, exe_overhead_str, memory_overhead_factor, MIN_MEMORY_OVERHEAD)

  local exe_gpu = get(observedObj, {"spec","executor","gpu"})
  if exe_gpu ~= nil then
    local gpu_name = exe_gpu.name
    local gpu_qty = to_num(exe_gpu.quantity, 0)
    if not isempty(gpu_name) and gpu_qty > 0 then
      exe_requires.resourceRequest[gpu_name] = gpu_qty
    end
  end

  apply_pod_template(get(observedObj, {"spec","executor"}), exe_requires)

  table.insert(components, {
    name = "executor",
    replicas = exe_replicas,
    replicaRequirements = exe_requires
  })

  return components
end
249+
statusAggregation:
250+
luaScript: >
251+
-- Aggregate member-cluster SparkApplication statuses into the template
-- status: attempt counters are summed, timestamps take the latest value
-- (RFC3339 strings compare correctly lexically), executor states are merged,
-- and applicationState is taken from the highest-priority state observed.
function AggregateStatus(desiredObj, statusItems)
  if statusItems == nil then
    return desiredObj
  end
  if desiredObj.status == nil then
    desiredObj.status = {}
  end

  -- Fast path: a single member cluster's status is adopted verbatim.
  if #statusItems == 1 then
    desiredObj.status = statusItems[1].status
    return desiredObj
  end

  -- Higher number = more noteworthy; the highest-priority state wins.
  local statePriority = {
    ["UNKNOWN"] = 0,
    [""] = 0,
    ["SUBMITTED"] = 1,
    ["PENDING_RERUN"] = 1,
    ["COMPLETED"] = 2,
    ["SUSPENDING"] = 3,
    ["SUSPENDED"] = 3,
    ["RUNNING"] = 4,
    ["RESUMING"] = 4,
    ["SUCCEEDING"] = 4,
    ["INVALIDATING"] = 5,
    ["FAILING"] = 6,
    ["FAILED"] = 6,
    ["SUBMISSION_FAILED"] = 6,
  }

  local applicationState = {}
  local executionAttempts = 0
  local executorState = {}
  local submissionAttempts = 0
  local lastSubmissionAttemptTime = ""
  local terminationTime = ""
  local worstPriority = -1

  for i = 1, #statusItems do
    local currentStatus = statusItems[i].status
    if currentStatus ~= nil then
      if currentStatus.applicationState ~= nil then
        local s = currentStatus.applicationState.state
        -- FIX: a state missing from the table (e.g. one added by a newer
        -- operator release) yielded nil and crashed the comparison below
        -- ("attempt to compare nil with number"); treat it as lowest priority.
        local p = statePriority[s] or 0
        if p > worstPriority then
          worstPriority = p
          applicationState = currentStatus.applicationState
        end
      end
      executionAttempts = executionAttempts + (currentStatus.executionAttempts or 0)
      submissionAttempts = submissionAttempts + (currentStatus.submissionAttempts or 0)
      if currentStatus.lastSubmissionAttemptTime and (lastSubmissionAttemptTime == "" or currentStatus.lastSubmissionAttemptTime > lastSubmissionAttemptTime) then
        lastSubmissionAttemptTime = currentStatus.lastSubmissionAttemptTime
      end
      if currentStatus.terminationTime and (terminationTime == "" or currentStatus.terminationTime > terminationTime) then
        terminationTime = currentStatus.terminationTime
      end
      if currentStatus.executorState ~= nil then
        for exec, state in pairs(currentStatus.executorState) do
          executorState[exec] = state
        end
      end
    end
  end

  desiredObj.status.applicationState = applicationState
  desiredObj.status.executionAttempts = executionAttempts
  desiredObj.status.executorState = executorState
  desiredObj.status.submissionAttempts = submissionAttempts
  desiredObj.status.lastSubmissionAttemptTime = lastSubmissionAttemptTime
  desiredObj.status.terminationTime = terminationTime
  return desiredObj
end
324+
statusReflection:
325+
luaScript: >
326+
function ReflectStatus(observedObj)
  -- Project the member-cluster SparkApplication status down to the fields
  -- Karmada tracks. Returns an empty table when no status is available.
  local reflected = {}
  if observedObj == nil or observedObj.status == nil then
    return reflected
  end

  local src = observedObj.status
  local tracked_fields = {
    "applicationState",
    "driverInfo",
    "executorState",
    "sparkApplicationId",
    "lastSubmissionAttemptTime",
    "submissionAttempts",
    "executionAttempts",
    "submissionID",
    "terminationTime",
  }
  for _, field in ipairs(tracked_fields) do
    reflected[field] = src[field]
  end
  return reflected
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Interpreter test manifest: each entry feeds a fixture into one
# ResourceInterpreterCustomization operation defined for SparkApplication.
tests:
  # statusAggregation: merge member-cluster statuses into the template.
  - desiredInputPath: testdata/desired-sparkapplication.yaml
    statusInputPath: testdata/status-file.yaml
    operation: AggregateStatus
  # componentResource: derive driver/executor replica requirements.
  - desiredInputPath: testdata/desired-sparkapplication.yaml
    operation: InterpretComponent
  # healthInterpretation: FAILED is unhealthy, everything else healthy.
  - observedInputPath: testdata/observed-sparkapplication.yaml
    operation: InterpretHealth
  # statusReflection: project the observed status to tracked fields.
  - observedInputPath: testdata/observed-sparkapplication.yaml
    operation: InterpretStatus

0 commit comments

Comments
 (0)