|
| 1 | +apiVersion: config.karmada.io/v1alpha1 |
| 2 | +kind: ResourceInterpreterCustomization |
| 3 | +metadata: |
| 4 | + name: declarative-configuration-sparkapplication |
| 5 | +spec: |
| 6 | + target: |
| 7 | + apiVersion: sparkoperator.k8s.io/v1beta2 |
| 8 | + kind: SparkApplication |
| 9 | + customizations: |
| 10 | + healthInterpretation: |
| 11 | + luaScript: > |
| 12 | + function InterpretHealth(observedObj) |
| 13 | + if not observedObj or |
| 14 | + not observedObj.status or |
| 15 | + not observedObj.status.applicationState or |
| 16 | + not observedObj.status.applicationState.state then |
| 17 | + return false |
| 18 | + end |
| 19 | + |
| 20 | + -- Only the 'FAILED' state is considered unhealthy. All other states are treated |
| 21 | + -- as healthy or recoverable. |
| 22 | + local state = observedObj.status.applicationState.state |
| 23 | + if state == 'FAILED' then |
| 24 | + return false |
| 25 | + end |
| 26 | + return true |
| 27 | + end |
| 28 | + componentResource: |
| 29 | + luaScript: | |
| 30 | + local kube = require("kube") |
| 31 | +
|
| 32 | + local function isempty(s) |
| 33 | + return s == nil or s == '' |
| 34 | + end |
| 35 | +
|
| 36 | + -- Safe fetch of deeply nested table fields. |
| 37 | + local function get(obj, path) |
| 38 | + local cur = obj |
| 39 | + for i = 1, #path do |
| 40 | + if cur == nil then |
| 41 | + return nil |
| 42 | + end |
| 43 | + cur = cur[path[i]] |
| 44 | + end |
| 45 | + return cur |
| 46 | + end |
| 47 | +
|
| 48 | + -- Normalize possibly-string numbers with a default. |
| 49 | + local function to_num(v, default) |
| 50 | + if v == nil or v == '' then |
| 51 | + return default |
| 52 | + end |
| 53 | + local n = tonumber(v) |
| 54 | + if n ~= nil then |
| 55 | + return n |
| 56 | + end |
| 57 | + return default |
| 58 | + end |
| 59 | +
|
| 60 | + -- JSON-safe deep clone: strings/numbers/booleans/tables. Needed to prevent shared table references. |
| 61 | + local function clone_plain(val, seen) |
| 62 | + local tv = type(val) |
| 63 | + if tv ~= "table" then |
| 64 | + if tv == "string" or tv == "number" or tv == "boolean" or tv == "nil" then |
| 65 | + return val |
| 66 | + end |
| 67 | + return nil |
| 68 | + end |
| 69 | + seen = seen or {} |
| 70 | + if seen[val] then return nil end |
| 71 | + seen[val] = true |
| 72 | + local out = {} |
| 73 | + for k, v in pairs(val) do |
| 74 | + local tk = type(k) |
| 75 | + if tk == "string" or tk == "number" then |
| 76 | + local cv = clone_plain(v, seen) |
| 77 | + if cv ~= nil then out[k] = cv end |
| 78 | + end |
| 79 | + end |
| 80 | + seen[val] = nil |
| 81 | + return out |
| 82 | + end |
| 83 | +
|
| 84 | + local function apply_pod_template(pt_spec, requires) |
| 85 | + if pt_spec == nil then |
| 86 | + return |
| 87 | + end |
| 88 | +
|
| 89 | + local nodeSelector = clone_plain(pt_spec.nodeSelector) |
| 90 | + local tolerations = clone_plain(pt_spec.tolerations) |
| 91 | + local priority = pt_spec.priorityClassName |
| 92 | + local hardNodeAffinity = nil |
| 93 | + if pt_spec.affinity and pt_spec.affinity.nodeAffinity then |
| 94 | + hardNodeAffinity = clone_plain(pt_spec.affinity.nodeAffinity.requiredDuringSchedulingIgnoredDuringExecution) |
| 95 | + end |
| 96 | +
|
| 97 | + -- Only create nodeClaim if there is content |
| 98 | + if nodeSelector ~= nil or tolerations ~= nil or hardNodeAffinity ~= nil then |
| 99 | + requires.nodeClaim = requires.nodeClaim or {} |
| 100 | + requires.nodeClaim.nodeSelector = nodeSelector |
| 101 | + requires.nodeClaim.tolerations = tolerations |
| 102 | + requires.nodeClaim.hardNodeAffinity = hardNodeAffinity |
| 103 | + end |
| 104 | +
|
| 105 | + if not isempty(priority) then |
| 106 | + requires.priorityClassName = priority |
| 107 | + end |
| 108 | + end |
| 109 | +
|
| 110 | + local MIN_MEMORY_OVERHEAD = "384m" |
| 111 | + local JVM_DEFAULT_OVERHEAD_FACTOR = 0.10 |
| 112 | + local NON_JVM_DEFAULT_OVERHEAD_FACTOR = 0.40 |
| 113 | +
|
| 114 | + local kube_unit_order = { B = 0, Ki = 1, Mi = 2, Gi = 3, Ti = 4, Pi = 5 } |
| 115 | + local kube_unit_scale = { B = 1, Ki = 1024, Mi = 1024^2, Gi = 1024^3, Ti = 1024^4, Pi = 1024^5 } |
| 116 | +
|
| 117 | + local function parse_java_memory(java_mem_str) |
| 118 | + local value_str, unit = tostring(java_mem_str):lower():match("^(%d+%.?%d*)([a-z]*)$") |
| 119 | + local unit_map = { |
| 120 | + b = "B", |
| 121 | + kb = "Ki", k = "Ki", |
| 122 | + mb = "Mi", m = "Mi", |
| 123 | + gb = "Gi", g = "Gi", |
| 124 | + tb = "Ti", t = "Ti", |
| 125 | + pb = "Pi", p = "Pi", |
| 126 | + } |
| 127 | + local value = tonumber(value_str) |
| 128 | + return value, unit_map[unit] |
| 129 | + end |
| 130 | + |
| 131 | + local function convert_unit(value, from_unit, to_unit) |
| 132 | + if from_unit == to_unit then return value end |
| 133 | + local scale = kube_unit_scale[from_unit] / kube_unit_scale[to_unit] |
| 134 | + return value * scale |
| 135 | + end |
| 136 | +
|
| 137 | + local function smaller_unit(u1, u2) |
| 138 | + return kube_unit_order[u1] < kube_unit_order[u2] and u1 or u2 |
| 139 | + end |
| 140 | +
|
| 141 | + local function calculate_total_memory_request(memory_str, overhead_str, memory_overhead_factor, min_memory_overhead) |
| 142 | + local mem_value, mem_unit = parse_java_memory(memory_str) |
| 143 | + local over_value, over_unit |
| 144 | +
|
| 145 | + if overhead_str then |
| 146 | + over_value, over_unit = parse_java_memory(overhead_str) |
| 147 | + else |
| 148 | + local overhead_calc = mem_value * memory_overhead_factor |
| 149 | + local min_over_value, min_over_unit = parse_java_memory(min_memory_overhead) |
| 150 | +
|
| 151 | + local smaller_u = smaller_unit(mem_unit, min_over_unit) |
| 152 | + local overhead_calc_in_smaller = convert_unit(overhead_calc, mem_unit, smaller_u) |
| 153 | + local min_over_in_smaller = convert_unit(min_over_value, min_over_unit, smaller_u) |
| 154 | +
|
| 155 | + over_value, over_unit = math.max(overhead_calc_in_smaller, min_over_in_smaller), smaller_u |
| 156 | + end |
| 157 | +
|
| 158 | + local final_unit = smaller_unit(mem_unit, over_unit) |
| 159 | + local mem_in_final = convert_unit(mem_value, mem_unit, final_unit) |
| 160 | + local over_in_final = convert_unit(over_value, over_unit, final_unit) |
| 161 | + local total_value = mem_in_final + over_in_final |
| 162 | +
|
| 163 | + return string.format("%f%s", total_value, final_unit) |
| 164 | + end |
| 165 | +
|
| 166 | + function GetComponents(observedObj) |
| 167 | + local components = {} |
| 168 | +
|
| 169 | + local job_type = get(observedObj, {"spec","type"}) |
| 170 | + local job = job_type:lower() |
| 171 | + local is_jvm_job = (job == "java" or job == "scala") |
| 172 | + local default_overhead_factor = is_jvm_job and JVM_DEFAULT_OVERHEAD_FACTOR or NON_JVM_DEFAULT_OVERHEAD_FACTOR |
| 173 | + local memory_overhead_factor = to_num(get(observedObj, {"spec", "memoryOverheadFactor"}), default_overhead_factor) |
| 174 | +
|
| 175 | + -- Driver |
| 176 | + local drv_replicas = 1 -- Spark Driver always has 1 instance |
| 177 | + local drv_requires = { |
| 178 | + resourceRequest = {} |
| 179 | + } |
| 180 | +
|
| 181 | + local drv_cpu = get(observedObj, {"spec","driver","cores"}) or 1 |
| 182 | + local drv_memory_str = get(observedObj, {"spec", "driver", "memory"}) or "1g" |
| 183 | + local drv_overhead_str = get(observedObj, {"spec", "driver", "memoryOverhead"}) |
| 184 | + drv_requires.resourceRequest.cpu = drv_cpu |
| 185 | + drv_requires.resourceRequest.memory = calculate_total_memory_request(drv_memory_str, drv_overhead_str, memory_overhead_factor, MIN_MEMORY_OVERHEAD) |
| 186 | +
|
| 187 | + local drv_gpu = get(observedObj, {"spec","driver","gpu"}) |
| 188 | + if drv_gpu ~= nil then |
| 189 | + local gpu_name = drv_gpu.name |
| 190 | + local gpu_qty = to_num(drv_gpu.quantity, 0) |
| 191 | + if not isempty(gpu_name) and gpu_qty > 0 then |
| 192 | + drv_requires.resourceRequest[gpu_name] = gpu_qty |
| 193 | + end |
| 194 | + end |
| 195 | + |
| 196 | + apply_pod_template(get(observedObj, {"spec","driver"}), drv_requires) |
| 197 | + |
| 198 | + local driverComponent = { |
| 199 | + name = "driver", |
| 200 | + replicas = drv_replicas, |
| 201 | + replicaRequirements = drv_requires |
| 202 | + } |
| 203 | + table.insert(components, driverComponent) |
| 204 | + |
| 205 | + -- Executor |
| 206 | + local exe_replicas = to_num(get(observedObj, {"spec","executor","instances"}), 1) |
| 207 | + local dynamic_enabled = get(observedObj, {"spec", "dynamicAllocation", "enabled"}) or false |
| 208 | + if dynamic_enabled then |
| 209 | + local initial_exec = to_num(get(observedObj, {"spec", "dynamicAllocation", "initialExecutors"}), 1) |
| 210 | + local min_exec = to_num(get(observedObj, {"spec", "dynamicAllocation", "minExecutors"}), 1) |
| 211 | + if initial_exec > exe_replicas then |
| 212 | + exe_replicas = initial_exec |
| 213 | + end |
| 214 | + if min_exec > exe_replicas then |
| 215 | + exe_replicas = min_exec |
| 216 | + end |
| 217 | + end |
| 218 | + |
| 219 | + local exe_requires = { |
| 220 | + resourceRequest = {} |
| 221 | + } |
| 222 | + |
| 223 | + local exe_cpu = get(observedObj, {"spec","executor","cores"}) or 1 |
| 224 | + local exe_memory_str = get(observedObj, {"spec", "executor", "memory"}) or "1g" |
| 225 | + local exe_overhead_str = get(observedObj, {"spec", "executor", "memoryOverhead"}) |
| 226 | + exe_requires.resourceRequest.cpu = exe_cpu |
| 227 | + exe_requires.resourceRequest.memory = calculate_total_memory_request(exe_memory_str, exe_overhead_str, memory_overhead_factor, MIN_MEMORY_OVERHEAD) |
| 228 | + |
| 229 | + local exe_gpu = get(observedObj, {"spec","executor","gpu"}) |
| 230 | + if exe_gpu ~= nil then |
| 231 | + local gpu_name = exe_gpu.name |
| 232 | + local gpu_qty = to_num(exe_gpu.quantity, 0) |
| 233 | + if not isempty(gpu_name) and gpu_qty > 0 then |
| 234 | + exe_requires.resourceRequest[gpu_name] = gpu_qty |
| 235 | + end |
| 236 | + end |
| 237 | + |
| 238 | + apply_pod_template(get(observedObj, {"spec","executor"}), exe_requires) |
| 239 | + |
| 240 | + local executorComponent = { |
| 241 | + name = "executor", |
| 242 | + replicas = exe_replicas, |
| 243 | + replicaRequirements = exe_requires |
| 244 | + } |
| 245 | + table.insert(components, executorComponent) |
| 246 | + |
| 247 | + return components |
| 248 | + end |
| 249 | + statusAggregation: |
| 250 | + luaScript: > |
| 251 | + function AggregateStatus(desiredObj, statusItems) |
| 252 | + if statusItems == nil then |
| 253 | + return desiredObj |
| 254 | + end |
| 255 | + if desiredObj.status == nil then |
| 256 | + desiredObj.status = {} |
| 257 | + end |
| 258 | +
|
| 259 | + if #statusItems == 1 then |
| 260 | + desiredObj.status = statusItems[1].status |
| 261 | + return desiredObj |
| 262 | + end |
| 263 | +
|
| 264 | + local statePriority = { |
| 265 | + ["UNKNOWN"] = 0, |
| 266 | + [""] = 0, |
| 267 | + ["SUBMITTED"] = 1, |
| 268 | + ["PENDING_RERUN"] = 1, |
| 269 | + ["COMPLETED"] = 2, |
| 270 | + ["SUSPENDING"] = 3, |
| 271 | + ["SUSPENDED"] = 3, |
| 272 | + ["RUNNING"] = 4, |
| 273 | + ["RESUMING"] = 4, |
| 274 | + ["SUCCEEDING"] = 4, |
| 275 | + ["INVALIDATING"] = 5, |
| 276 | + ["FAILING"] = 6, |
| 277 | + ["FAILED"] = 6, |
| 278 | + ["SUBMISSION_FAILED"] = 6, |
| 279 | + } |
| 280 | +
|
| 281 | + local applicationState = {} |
| 282 | + local executionAttempts = 0 |
| 283 | + local executorState = {} |
| 284 | + local submissionAttempts = 0 |
| 285 | + local lastSubmissionAttemptTime = "" |
| 286 | + local terminationTime = "" |
| 287 | + local worstPriority = -1 |
| 288 | +
|
| 289 | + for i = 1, #statusItems do |
| 290 | + local currentStatus = statusItems[i].status |
| 291 | + if currentStatus ~= nil then |
| 292 | + if currentStatus.applicationState ~= nil then |
| 293 | + local s = currentStatus.applicationState.state |
| 294 | + local p = statePriority[s] |
| 295 | + if p > worstPriority then |
| 296 | + worstPriority = p |
| 297 | + applicationState = currentStatus.applicationState |
| 298 | + end |
| 299 | + end |
| 300 | + executionAttempts = executionAttempts + (currentStatus.executionAttempts or 0) |
| 301 | + submissionAttempts = submissionAttempts + (currentStatus.submissionAttempts or 0) |
| 302 | + if currentStatus.lastSubmissionAttemptTime and (lastSubmissionAttemptTime == "" or currentStatus.lastSubmissionAttemptTime > lastSubmissionAttemptTime) then |
| 303 | + lastSubmissionAttemptTime = currentStatus.lastSubmissionAttemptTime |
| 304 | + end |
| 305 | + if currentStatus.terminationTime and (terminationTime == "" or currentStatus.terminationTime > terminationTime) then |
| 306 | + terminationTime = currentStatus.terminationTime |
| 307 | + end |
| 308 | + if currentStatus.executorState ~= nil then |
| 309 | + for exec, state in pairs(currentStatus.executorState) do |
| 310 | + executorState[exec] = state |
| 311 | + end |
| 312 | + end |
| 313 | + end |
| 314 | + end |
| 315 | +
|
| 316 | + desiredObj.status.applicationState = applicationState |
| 317 | + desiredObj.status.executionAttempts = executionAttempts |
| 318 | + desiredObj.status.executorState = executorState |
| 319 | + desiredObj.status.submissionAttempts = submissionAttempts |
| 320 | + desiredObj.status.lastSubmissionAttemptTime = lastSubmissionAttemptTime |
| 321 | + desiredObj.status.terminationTime = terminationTime |
| 322 | + return desiredObj |
| 323 | + end |
| 324 | + statusReflection: |
| 325 | + luaScript: > |
| 326 | + function ReflectStatus(observedObj) |
| 327 | + local status = {} |
| 328 | + if observedObj == nil or observedObj.status == nil then |
| 329 | + return status |
| 330 | + end |
| 331 | +
|
| 332 | + status.applicationState = observedObj.status.applicationState |
| 333 | + status.driverInfo = observedObj.status.driverInfo |
| 334 | + status.executorState = observedObj.status.executorState |
| 335 | + status.sparkApplicationId = observedObj.status.sparkApplicationId |
| 336 | + status.lastSubmissionAttemptTime = observedObj.status.lastSubmissionAttemptTime |
| 337 | + status.submissionAttempts = observedObj.status.submissionAttempts |
| 338 | + status.executionAttempts = observedObj.status.executionAttempts |
| 339 | + status.submissionID = observedObj.status.submissionID |
| 340 | + status.terminationTime = observedObj.status.terminationTime |
| 341 | + return status |
| 342 | + end |
0 commit comments