Skip to content

Commit b3ed289

Browse files
committed
Add run_forward_models
1 parent b4c4666 commit b3ed289

File tree

1 file changed

+124
-43
lines changed

1 file changed

+124
-43
lines changed

src/backends.jl

Lines changed: 124 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ function calibrate(
150150
end
151151

152152
function calibrate(
153-
::Type{JuliaBackend},
153+
b::Type{JuliaBackend},
154154
ekp::EKP.EnsembleKalmanProcess,
155155
n_iterations,
156156
prior,
@@ -159,27 +159,9 @@ function calibrate(
159159
ekp = initialize(ekp, prior, output_dir)
160160
ensemble_size = EKP.get_N_ens(ekp)
161161

162-
on_error(e::InterruptException) = rethrow(e)
163-
on_error(e) =
164-
@error "Single ensemble member has errored. See stacktrace" exception =
165-
(e, catch_backtrace())
166-
167162
first_iter = last_completed_iteration(output_dir) + 1
168163
for iter in first_iter:(n_iterations - 1)
169-
errors = 0
170-
@info "Running iteration $iter"
171-
foreach(1:ensemble_size) do m
172-
try
173-
forward_model(iter, m)
174-
@info "Completed member $m"
175-
catch e
176-
errors += 1
177-
on_error(e)
178-
end
179-
end
180-
if errors == ensemble_size
181-
error("Full ensemble has failed, aborting calibration.")
182-
end
164+
run_forward_models(b, iter, ensemble_size)
183165
ekp = load_ekp_struct(output_dir, iter)
184166
terminate = observation_map_and_update!(ekp, output_dir, iter, prior)
185167
!isnothing(terminate) && break
@@ -275,21 +257,14 @@ function calibrate(
275257

276258
first_iter = last_completed_iteration(output_dir) + 1
277259
for iter in first_iter:(n_iterations - 1)
278-
@info "Running Iteration $iter"
279-
(; time) = @timed run_worker_iteration(
260+
run_forward_models(
261+
b,
280262
iter,
281-
ensemble_size,
282-
output_dir;
263+
ensemble_size;
264+
output_dir,
283265
worker_pool,
284266
failure_rate,
285267
)
286-
formatted_time =
287-
Dates.canonicalize(Dates.Millisecond(round(time * 1000)))
288-
if isempty(Dates.periods(formatted_time))
289-
@info "Iteration $iter time: 0 second"
290-
else
291-
@info "Iteration $iter time: $formatted_time"
292-
end
293268
ekp = load_ekp_struct(output_dir, iter)
294269
terminate = observation_map_and_update!(ekp, output_dir, iter, prior)
295270
!isnothing(terminate) && break
@@ -314,25 +289,20 @@ function calibrate(
314289
ensemble_size = EKP.get_N_ens(ekp)
315290
@info "Initializing calibration" n_iterations ensemble_size output_dir
316291
ekp = initialize(ekp, prior, output_dir)
317-
module_load_str = module_load_string(b)
318292
first_iter = last_completed_iteration(output_dir) + 1
319293

320294
for iter in first_iter:(n_iterations - 1)
321-
run_hpc_iteration(
295+
run_forward_models(
322296
b,
323-
ekp,
324297
iter,
325-
ensemble_size,
298+
ensemble_size;
326299
output_dir,
300+
hpc_kwargs,
327301
experiment_dir,
328302
model_interface,
329-
module_load_str,
330-
prior;
331-
hpc_kwargs = hpc_kwargs,
332-
verbose = verbose,
303+
verbose,
333304
exeflags,
334305
)
335-
@info "Completed iteration $iter, updating ensemble"
336306
ekp = load_ekp_struct(output_dir, iter)
337307
terminate = observation_map_and_update!(ekp, output_dir, iter, prior)
338308
!isnothing(terminate) && break
@@ -343,14 +313,12 @@ end
343313

344314
function run_hpc_iteration(
345315
b::Type{<:HPCBackend},
346-
ekp::EKP.EnsembleKalmanProcess,
347316
iter,
348317
ensemble_size,
349318
output_dir,
350319
experiment_dir,
351320
model_interface,
352-
module_load_str,
353-
prior;
321+
module_load_str;
354322
hpc_kwargs,
355323
verbose = false,
356324
exeflags = "",
@@ -460,3 +428,116 @@ backend_worker_kwargs(::Type{DerechoBackend}) = (; q = "main", A = "UCIT0011")
460428
backend_worker_kwargs(::Type{GCPBackend}) = (; partition = "a3")
461429

462430
backend_worker_kwargs(::Type{<:AbstractBackend}) = (;)
431+
432+
433+
"""
434+
run_forward_models(
435+
b::Type{WorkerBackend},
436+
iter,
437+
ensemble_size;
438+
output_dir
439+
worker_pool,
440+
failure_rate,
441+
)
442+
443+
Run the forward models for a single iteration for the `WorkerBackend`.
444+
"""
445+
function run_forward_models(
446+
b::Type{WorkerBackend},
447+
iter,
448+
ensemble_size;
449+
output_dir,
450+
failure_rate = DEFAULT_FAILURE_RATE,
451+
worker_pool = default_worker_pool(),
452+
)
453+
@info "Running Iteration $iter"
454+
(; time) = @timed run_worker_iteration(
455+
iter,
456+
ensemble_size,
457+
output_dir;
458+
worker_pool,
459+
failure_rate,
460+
)
461+
formatted_time = Dates.canonicalize(Dates.Millisecond(round(time * 1000)))
462+
if isempty(Dates.periods(formatted_time))
463+
@info "Iteration $iter time: 0 second"
464+
else
465+
@info "Iteration $iter time: $formatted_time"
466+
end
467+
return nothing
468+
end
469+
470+
"""
471+
run_forward_models(b::Type{JuliaBackend}, iter, ensemble_size)
472+
473+
Run the forward models for a single iteration for the `JuliaBackend`.
474+
"""
475+
# TODO: I don't understand why this doesn't require an output_dir
476+
function run_forward_models(b::Type{JuliaBackend}, iter, ensemble_size)
477+
on_error(e::InterruptException) = rethrow(e)
478+
on_error(e) =
479+
@error "Single ensemble member has errored. See stacktrace" exception =
480+
(e, catch_backtrace())
481+
errors = 0
482+
@info "Running iteration $iter"
483+
foreach(1:ensemble_size) do m
484+
try
485+
forward_model(iter, m)
486+
@info "Completed member $m"
487+
catch e
488+
errors += 1
489+
on_error(e)
490+
end
491+
end
492+
if errors == ensemble_size
493+
error("Full ensemble has failed, aborting calibration.")
494+
end
495+
return nothing
496+
end
497+
498+
"""
499+
run_forward_models(
500+
b::Type{<:HPCBackend},
501+
iter,
502+
ensemble_size;
503+
output_dir,
504+
experiment_dir,
505+
model_interface,
506+
module_load_str,
507+
hpc_kwargs,
508+
verbose,
509+
exeflags,
510+
)
511+
512+
Run the forward models for a single iteration for the `HPCBackend`.
513+
"""
514+
function run_forward_models(
515+
b::Type{<:HPCBackend},
516+
iter,
517+
ensemble_size;
518+
output_dir,
519+
hpc_kwargs,
520+
experiment_dir = project_dir(),
521+
model_interface = abspath(
522+
joinpath(experiment_dir, "..", "..", "model_interface.jl"),
523+
),
524+
verbose = false,
525+
exeflags = "",
526+
)
527+
module_load_str = module_load_string(b)
528+
run_hpc_iteration(
529+
b,
530+
iter,
531+
ensemble_size,
532+
output_dir,
533+
experiment_dir,
534+
model_interface,
535+
module_load_str;
536+
hpc_kwargs = hpc_kwargs,
537+
verbose = verbose,
538+
exeflags = exeflags,
539+
)
540+
541+
@info "Completed iteration $iter, updating ensemble"
542+
return nothing
543+
end

0 commit comments

Comments
 (0)