@@ -31,13 +31,17 @@ Defines options for job execution behavior.
3131- `retry_delays::Union{Base.ExponentialBackOff, Nothing}`: Delay strategy for retries (defaults to exponential backoff if `retries > 0`).
3232- `retry_check`: Custom function to determine retry behavior (`check` argument from `Base.retry`).
3333- `on_fail_policy::Union{Tuple{Symbol, Int}, Nothing}`: Determines job failure handling (`:ignore`, `:disable`, `:unschedule`), with a threshold for consecutive failures.
34+ - `max_executions::Union{Int, Nothing}`: Maximum number of executions allowed for a job.
35+ - `expires_at::Union{DateTime, Nothing}`: Expiration time for a job.
3436"""
3537@kwdef struct JobOptions
3638 overlap_policy:: Union{Symbol, Nothing} = nothing # :skip, :queue, :concurrent
3739 retries:: Int = 0
3840 retry_delays:: Union{Base.ExponentialBackOff, Nothing} = retries > 0 ? Base. ExponentialBackOff (; n= retries) : nothing # see Base.ExponentialBackOff
3941 retry_check = nothing # see Base.retry `check` keyword argument
4042 on_fail_policy:: Union{Tuple{Symbol, Int}, Nothing} = nothing # :ignore, :disable, :unschedule + max consecutive failures for disable/unschedule
43+ max_executions:: Union{Int, Nothing} = nothing # max number of executions job is allowed to run
44+ expires_at:: Union{DateTime, Nothing} = nothing # expiration time for job
4145end
4246
4347"""
@@ -53,16 +57,15 @@ Represents a scheduled job in the Tempus scheduler.
5357- `disabledAt::Union{DateTime, Nothing}`: Timestamp when the job was disabled (if applicable).
5458"""
5559mutable struct Job
60+ const action:: Function
5661 const name:: String
5762 const schedule:: Cron
58- const action:: Function
5963 const options:: JobOptions
6064 # fields managed by scheduler
6165 disabledAt:: Union{DateTime, Nothing}
6266end
6367
64- Job (name, schedule, action:: Function ; kw... ) = Job (string (name), schedule isa Cron ? schedule : parseCron (schedule), action, JobOptions (kw... ), nothing )
65- Job (action:: Function , name, schedule; kw... ) = Job (name, schedule, action; kw... )
68+ Job (action:: Function , name, schedule; kw... ) = Job (action, string (name), schedule isa Cron ? schedule : parseCron (schedule), JobOptions (kw... ), nothing )
6669
6770"""
6871 disable!(job::Job)
@@ -325,6 +328,25 @@ function checkDisable!(scheduler::Scheduler, store::Store, job::Job, on_fail_pol
325328 return
326329end
327330
331+ function checkExpirationAndMaxExecutions! (scheduler:: Scheduler , job:: Job )
332+ max_executions = _some (job. options. max_executions, scheduler. jobOptions. max_executions)
333+ expires_at = _some (job. options. expires_at, scheduler. jobOptions. expires_at)
334+ if max_executions != = nothing
335+ execs = getNMostRecentJobExecutions (scheduler. store, job. name, max_executions)
336+ if length (execs) >= max_executions
337+ unschedule! (scheduler, job)
338+ @info " Unscheduling job $(job. name) after reaching maximum executions of $(max_executions) ."
339+ return true
340+ end
341+ end
342+ if expires_at != = nothing && expires_at < Dates. now (UTC)
343+ unschedule! (scheduler, job)
344+ @info " Unscheduling job $(job. name) after expiration at $(expires_at) ."
345+ return true
346+ end
347+ return false
348+ end
349+
328350"""
329351 run!(scheduler::Scheduler)
330352
@@ -371,8 +393,10 @@ function run!(scheduler::Scheduler)
371393 @warn " Job $(je. job. name) already executing, keeping scheduled execution queued until current execution finishes. There are $nexecs queued for this job."
372394 # check if we need to schedule the next execution
373395 next = getnext (je. job. schedule)
374- if any (j -> j. job. name == je. job. name && j. scheduledStart == next, scheduler. jobExecutions)
375- # we've already scheduled this execution, skip
396+ # check job expiration and max executions
397+ unscheduled = checkExpirationAndMaxExecutions! (scheduler, je. job)
398+ if unscheduled || any (j -> j. job. name == je. job. name && j. scheduledStart == next, scheduler. jobExecutions)
399+ # job is unscheduled or we've already scheduled this execution, skip
376400 else
377401 push! (scheduler. jobExecutions, JobExecution (je. job, next))
378402 end
@@ -390,6 +414,8 @@ function run!(scheduler::Scheduler)
390414 for (i, toSkip, je) in readyToExecute
391415 deleteat! (scheduler. jobExecutions, i)
392416 next = getnext (je. job. schedule)
417+ # check job expiration and max executions
418+ checkExpirationAndMaxExecutions! (scheduler, je. job) && continue
393419 newje = JobExecution (je. job, next)
394420 push! (scheduler. jobExecutions, newje)
395421 if je. job. disabledAt != = nothing
@@ -466,18 +492,23 @@ function executeJob!(scheduler::Scheduler, jobExecution::JobExecution)
466492 return
467493end
468494
469- currentlyExecutionJobs (sch:: Scheduler ) = [je. jobExecutionId for je in sch. executingJobExecutions]
470-
471495"""
472496 close(scheduler::Scheduler)
473497
474498Closes the scheduler, stopping job execution; waits for any currently executing jobs to finish.
499+ Will wait `timeout` seconds (5 by default) for any currently executing jobs to finish before returning.
475500"""
476- function Base. close (scheduler:: Scheduler )
477- @info " Closing scheduler and stopping job execution ."
501+ function Base. close (scheduler:: Scheduler ; timeout :: Real = 5 )
502+ @info " Closing scheduler and waiting $(timeout) s for job executions to stop ."
478503 @lock scheduler. lock begin
479504 scheduler. running = false
480505 end
506+ # we use a Timer here to notify jobExecutionFinished ourself if the scheduler
507+ # or last executing job doesn't do it themselves in time
508+ Timer (timeout) do t
509+ @warn " Scheduler closing timeout reached, returning without waiting for job executions to finish."
510+ notify (scheduler. jobExecutionFinished)
511+ end
481512 wait (scheduler. jobExecutionFinished)
482513 @info " Scheduler closed and job execution stopped."
483514 return
0 commit comments