diff --git a/docker/development/files/requirements.lock b/docker/development/files/requirements.lock index 102164142d1..bf419073064 100644 --- a/docker/development/files/requirements.lock +++ b/docker/development/files/requirements.lock @@ -36,7 +36,9 @@ cryptography==46.0.3 distlib==0.4.0 # via virtualenv doipclient==1.1.1 - # via -r tools/UdsTool/requirements.txt + # via + # -r test/pyTest/requirements.txt + # -r tools/UdsTool/requirements.txt dulwich==0.24.10 # via poetry exceptiongroup==1.2.2 diff --git a/test/pyTest/conftest.py b/test/pyTest/conftest.py index d0afa5c5bab..044bbc24978 100644 --- a/test/pyTest/conftest.py +++ b/test/pyTest/conftest.py @@ -1,4 +1,5 @@ -import pytest, os +import pytest +import os from can.interfaces import socketcan import isotp from doipclient import DoIPClient @@ -232,7 +233,6 @@ def pytest_generate_tests(metafunc): fixture_names += ",uds_transport" for name, target_info in TargetInfo.by_name.items(): - if not need_hw_tester and not need_uds_transport: all_targets_fixture_args.append(name) diff --git a/test/pyTest/helper/helper.py b/test/pyTest/helper/helper.py deleted file mode 100644 index 0b55c4bef66..00000000000 --- a/test/pyTest/helper/helper.py +++ /dev/null @@ -1,18 +0,0 @@ -import subprocess -import time -import os - - -def run_process(cmd): - session_name = f"RefAppTest{os.getpid()}" - subprocess.run(["tmux", "new-session", "-d", "-s", session_name]) - subprocess.run(["tmux", "send-keys", "-t", session_name, cmd, "C-m"]) - time.sleep(1) - - result = subprocess.run( - ["tmux", "capture-pane", "-t", session_name, "-p"], - capture_output=True, - text=True, - ) - subprocess.run(["tmux", "kill-session", "-t", session_name]) - return result.stdout.strip().splitlines()[-4:-1] diff --git a/test/pyTest/uds/helpers/helper_functions.py b/test/pyTest/uds/helpers/helper_functions.py index 7af0666bf1f..3bd32a7c089 100644 --- a/test/pyTest/uds/helpers/helper_functions.py +++ b/test/pyTest/uds/helpers/helper_functions.py @@ -1,5 +1,10 @@ import binascii import udsoncan +import subprocess +import time +import select +import os + class ByteCodec(udsoncan.DidCodec): def encode(self, value: bytes) -> bytes: @@ -8,7 +13,68 @@ def encode(self, value: bytes) -> bytes: def decode(self, data: bytes) -> bytes: return data + def hexlify(value): response = binascii.hexlify(value).decode("ascii") - formatted_response = ' '.join(response[i:i+2] for i in range(0, len(response), 2)) - return formatted_response \ No newline at end of file + formatted_response = " ".join( + response[i : i + 2] for i in range(0, len(response), 2) + ) + return formatted_response + + +def run_process(cmd, output_contains, timeout=5): + """Run a shell command and wait until specific output appears. + + The command is executed in ``/bin/bash`` using ``subprocess``. The function waits + until `` output_contains`` is matched in the process output and returns the captured + text up to and including the matched token, plus the remainder of that line + (if any). + + Args: + cmd: Shell command to execute. + output_contains: Output text or pattern to wait for. + timeout: Maximum number of seconds to wait for `` output_contains``. + + Returns: + Captured process output containing `` output_contains``. + + Raises: + AssertionError: If the process times out or exits before producing + `` output_contains``. + """ + proc = subprocess.Popen( + ["/bin/bash", "-c", cmd], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=False, + bufsize=0, + ) + + poller = select.poll() + poller.register(proc.stdout, select.POLLIN) + + output = [] + deadline = time.monotonic() + timeout + try: + while True: + remaining_ms = max(0, int((deadline - time.monotonic()) * 1000)) + if remaining_ms < 0: + raise subprocess.TimeoutExpired(cmd=cmd, timeout=timeout) + events = poller.poll(remaining_ms) + if not events: + if remaining_ms == 0: + raise subprocess.TimeoutExpired(cmd=cmd, timeout=timeout) + continue + chunk = os.read(proc.stdout.fileno(), 4096).decode() + if not chunk: + if proc.poll() is not None: + break + output.append(chunk) + combined = "".join(output) + + if output_contains in combined: + return combined + raise AssertionError(f"Did not find {output_contains} within {timeout}s\n Output:\n{''.join(output)}") + finally: + proc.kill() + proc.wait() diff --git a/test/pyTest/uds/test_udsToolRDBI.py b/test/pyTest/uds/test_udsToolRDBI.py index e6520f86b3f..39fa968faf3 100644 --- a/test/pyTest/uds/test_udsToolRDBI.py +++ b/test/pyTest/uds/test_udsToolRDBI.py @@ -1,6 +1,5 @@ import os -import helper.helper as helper -from target_info import TargetInfo +from helpers.helper_functions import run_process def create_uds_tool_command(channel, did): @@ -14,13 +13,11 @@ def test_rdbi(target_session, did="CF01"): command = create_uds_tool_command( target_session.target_info.socketcan["channel"], did ) - print(command) - output = helper.run_process(command) + output = run_process(command, "PositiveResponse", timeout=5) + lines = output.strip().splitlines() - assert any( - "PositiveResponse" in line for line in output - ), "Missing PositiveResponse" + assert any("PositiveResponse" in line for line in lines), "Missing PositiveResponse" assert any( "62cf01010200022202160f0100006d2f0000010600008fe0000001" in line - for line in output + for line in lines ), "Incorrect expected output"