Skip to content

Commit cedbe51

Browse files
committed
feat: Add exit job functionality and enhance job error handling with JobKillSignalError
1 parent 896a5cd commit cedbe51

File tree

5 files changed

+35
-13
lines changed

5 files changed

+35
-13
lines changed

examples/jobs/desktop_example.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,13 @@ jobs:
66
params:
77
message: "Rclone isnt running"
88

9+
- id: exit_job
10+
name: Exit Job
11+
enabled: false
12+
plugin: exit
13+
params:
14+
code: 1
15+
916
- id: check_rclone_health
1017
name: Check Rclone Health
1118
plugin: binary
@@ -19,3 +26,5 @@ jobs:
1926
"
2027
on_failure:
2128
- notification_on_failure
29+
# you can send kill switch to exit taskcrafter
30+
# - exit_job

taskcrafter/exceptions/job.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,7 @@ class JobFailedError(JobError):
1212

1313
class JobValidationError(JobError):
1414
pass
15+
16+
17+
class JobKillSignalError(JobError):
18+
pass

taskcrafter/job_loader.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
from datetime import datetime
44
from taskcrafter.exceptions.job import (
55
JobFailedError,
6+
JobKillSignalError,
67
JobNotFoundError,
78
)
8-
from taskcrafter.exceptions.plugin import PluginExecutionTimeoutError
9+
from taskcrafter.exceptions.plugin import (
10+
PluginExecutionError,
11+
PluginExecutionTimeoutError,
12+
)
913
from taskcrafter.plugin_loader import plugin_execute
1014
from taskcrafter.logger import app_logger
1115
from taskcrafter.container import run_job_in_docker
@@ -162,7 +166,11 @@ def run_job(self, job: Job, execution_stack: list[str] = [], force: bool = False
162166
queue_result = queue.get()
163167

164168
if isinstance(queue_result, Exception) or job.plugin == "exit":
165-
raise queue_result
169+
# add job to executed jobs since its kill switch afterwards.
170+
job.result.stop()
171+
job.result.set_status(JobStatus.ERROR)
172+
self.executed_jobs.append(deepcopy(job))
173+
raise JobKillSignalError(queue_result)
166174

167175
process.terminate()
168176

@@ -185,7 +193,7 @@ def run_job(self, job: Job, execution_stack: list[str] = [], force: bool = False
185193
job.result.set_status(JobStatus.ERROR)
186194
break
187195

188-
except Exception as e:
196+
except PluginExecutionError as e:
189197
app_logger.error(
190198
f"Job {job.id} executed with exception ({type(e)}): {e}"
191199
)

taskcrafter/preview.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ def result_table(jobs: list[Job]):
7979
case JobStatus.SUCCESS:
8080
status = Text(job.result.get_status().value, "green")
8181
case _:
82-
continue
82+
status = Text("n/a", "yellow")
8383

8484
table.add_row(
8585
str(job.id),

taskcrafter/scheduler.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
JobEvent,
1212
)
1313
from taskcrafter.exceptions.hook import HookNotFound
14+
from taskcrafter.exceptions.job import JobKillSignalError
1415
from taskcrafter.logger import app_logger
1516
from taskcrafter.job_loader import JobManager
1617
from taskcrafter.hook_loader import HookManager
@@ -36,7 +37,7 @@ def start_scheduler(self):
3637
self.scheduler.add_listener(self.event_listener_job, EVENT_ALL)
3738
self.scheduler.start()
3839

39-
app_logger.info("Scheduler started.")
40+
app_logger.debug("Scheduler started.")
4041

4142
# check and execute BEFORE_ALL hook
4243
self.schedule_hook_jobs(HookType.BEFORE_ALL)
@@ -47,7 +48,7 @@ def start_scheduler(self):
4748
time.sleep(1)
4849
except (KeyboardInterrupt, SystemExit):
4950
self.stop_scheduler()
50-
app_logger.info("Scheduler stopped.")
51+
app_logger.debug("Scheduler stopped.")
5152

5253
def event_listener_job(self, event):
5354
if isinstance(event, JobEvent):
@@ -57,6 +58,13 @@ def event_listener_job(self, event):
5758
self.schedule_hook_jobs(HookType.BEFORE_JOB, event)
5859
elif isinstance(event, JobExecutionEvent):
5960
if event.exception:
61+
if isinstance(event.exception, JobKillSignalError):
62+
app_logger.warning(
63+
f"Job {job_id} is exit job, scheduler will be stopped."
64+
)
65+
self._event.set()
66+
return
67+
6068
self.schedule_hook_jobs(HookType.ON_ERROR, event)
6169
app_logger.error(
6270
f"scheduler: {event.job_id} failed with exception: {event.exception}"
@@ -73,13 +81,6 @@ def event_listener_job(self, event):
7381

7482
self.schedule_hook_jobs(HookType.AFTER_JOB, event)
7583

76-
if self.job_manager.job_get_by_id(job_id).plugin == "exit":
77-
app_logger.warning(
78-
f"Job {job_id} is exit job, scheduler will be stopped."
79-
)
80-
self._event.set()
81-
return
82-
8384
if self.job_manager.get_in_progress() == 0:
8485
hook_executed = self.schedule_hook_jobs(HookType.AFTER_ALL, event)
8586

0 commit comments

Comments
 (0)