Skip to content

Commit 6464bf2

Browse files
authored
Merge pull request #540 from camunda-community-hub/add-logging-error-in-job-handler
fix: add logging a exception in job execution task
2 parents 3533397 + 1e42f6a commit 6464bf2

File tree

6 files changed

+49
-2
lines changed

6 files changed

+49
-2
lines changed

pyproject.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ profile = "black"
7171

7272
[tool.pytest.ini_options]
7373
asyncio_mode = "auto"
74+
markers = [
75+
"e2e: end to end tests",
76+
]
7477

7578
[tool.ruff]
7679
target-version = "py39"

pyzeebe/worker/job_executor.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import annotations
2+
13
import asyncio
24
import logging
35
from typing import Callable
@@ -14,7 +16,7 @@
1416

1517

1618
class JobExecutor:
17-
def __init__(self, task: Task, jobs: "asyncio.Queue[Job]", task_state: TaskState, zeebe_adapter: ZeebeAdapter):
19+
def __init__(self, task: Task, jobs: asyncio.Queue[Job], task_state: TaskState, zeebe_adapter: ZeebeAdapter):
1820
self.task = task
1921
self.jobs = jobs
2022
self.task_state = task_state
@@ -45,7 +47,11 @@ async def stop(self) -> None:
4547

4648

4749
def create_job_callback(job_executor: JobExecutor, job: Job) -> AsyncTaskCallback:
48-
def callback(_: "asyncio.Future[None]") -> None:
50+
def callback(fut: asyncio.Future[None]) -> None:
51+
err = fut.done() and not fut.cancelled() and fut.exception()
52+
if err:
53+
logger.exception("Error in job executor. Task: %s. Error: %s.", job.type, err, exc_info=err)
54+
4955
job_executor.jobs.task_done()
5056
job_executor.task_state.remove(job)
5157

tests/integration/cancel_process_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
1+
import pytest
2+
13
from pyzeebe import ZeebeClient
24

35

6+
@pytest.mark.e2e
47
async def test_cancel_process(zeebe_client: ZeebeClient, process_name: str, process_variables: dict):
58
response = await zeebe_client.run_process(process_name, process_variables)
69

tests/integration/publish_message_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1+
import pytest
2+
13
from pyzeebe import ZeebeClient
24
from tests.integration.utils import ProcessStats
35
from tests.integration.utils.wait_for_process import wait_for_process_with_variables
46

57

8+
@pytest.mark.e2e
69
async def test_publish_message(zeebe_client: ZeebeClient, process_stats: ProcessStats, process_variables: dict):
710
initial_amount_of_processes = process_stats.get_process_runs()
811

tests/integration/run_process_test.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
PROCESS_TIMEOUT_IN_MS = 60_000
1010

1111

12+
@pytest.mark.e2e
1213
async def test_run_process(
1314
zeebe_client: ZeebeClient, process_name: str, process_variables: dict, process_stats: ProcessStats
1415
):
@@ -20,11 +21,13 @@ async def test_run_process(
2021
assert process_stats.get_process_runs() == initial_amount_of_processes + 1
2122

2223

24+
@pytest.mark.e2e
2325
async def test_non_existent_process(zeebe_client: ZeebeClient):
2426
with pytest.raises(ProcessDefinitionNotFoundError):
2527
await zeebe_client.run_process(str(uuid4()))
2628

2729

30+
@pytest.mark.e2e
2831
async def test_run_process_with_result(zeebe_client: ZeebeClient, process_name: str, process_variables: dict):
2932
response = await zeebe_client.run_process_with_result(
3033
process_name, process_variables, timeout=PROCESS_TIMEOUT_IN_MS

tests/unit/worker/job_executor_test.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import json
23
from unittest.mock import AsyncMock, Mock
34

45
import pytest
@@ -80,3 +81,31 @@ def test_signals_that_job_is_done(self, job_executor: JobExecutor, job_from_task
8081

8182
task_done_mock.assert_called_once()
8283
remove_from_task_state_mock.assert_called_once_with(job_from_task)
84+
85+
def test_signals_that_job_is_done_with_exception(
86+
self, job_executor: JobExecutor, job_from_task: Job, caplog: pytest.LogCaptureFixture
87+
):
88+
task_done_mock = Mock()
89+
remove_from_task_state_mock = Mock()
90+
job_executor.jobs.task_done = task_done_mock
91+
job_executor.task_state.remove = remove_from_task_state_mock
92+
93+
callback = create_job_callback(job_executor, job_from_task)
94+
95+
exception = None
96+
try:
97+
json.dumps({"foo": object})
98+
except TypeError as err:
99+
exception = err
100+
101+
assert exception
102+
103+
fut = asyncio.Future()
104+
fut.set_exception(exception)
105+
callback(fut)
106+
107+
task_done_mock.assert_called_once()
108+
remove_from_task_state_mock.assert_called_once_with(job_from_task)
109+
110+
assert len(caplog.records) == 1
111+
assert caplog.records[0].getMessage().startswith("Error in job executor. Task:")

0 commit comments

Comments
 (0)