Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docker/development/files/requirements.lock
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ cryptography==46.0.3
distlib==0.4.0
# via virtualenv
doipclient==1.1.1
# via -r tools/UdsTool/requirements.txt
# via
# -r test/pyTest/requirements.txt
# -r tools/UdsTool/requirements.txt
dulwich==0.24.10
# via poetry
exceptiongroup==1.2.2
Expand Down
4 changes: 2 additions & 2 deletions test/pyTest/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pytest, os
import pytest
import os
from can.interfaces import socketcan
import isotp
from doipclient import DoIPClient
Expand Down Expand Up @@ -232,7 +233,6 @@ def pytest_generate_tests(metafunc):
fixture_names += ",uds_transport"

for name, target_info in TargetInfo.by_name.items():

if not need_hw_tester and not need_uds_transport:
all_targets_fixture_args.append(name)

Expand Down
18 changes: 0 additions & 18 deletions test/pyTest/helper/helper.py

This file was deleted.

70 changes: 68 additions & 2 deletions test/pyTest/uds/helpers/helper_functions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import binascii
import udsoncan
import subprocess
import time
import select
import os


class ByteCodec(udsoncan.DidCodec):
def encode(self, value: bytes) -> bytes:
Expand All @@ -8,7 +13,68 @@ def encode(self, value: bytes) -> bytes:
def decode(self, data: bytes) -> bytes:
return data


def hexlify(value):
response = binascii.hexlify(value).decode("ascii")
formatted_response = ' '.join(response[i:i+2] for i in range(0, len(response), 2))
return formatted_response
formatted_response = " ".join(
response[i : i + 2] for i in range(0, len(response), 2)
)
return formatted_response


def run_process(cmd, output_contains, timeout=5):
"""Run a shell command and wait until specific output appears.

The command is executed in ``/bin/bash`` using ``subprocess``. The function waits
until `` output_contains`` is matched in the process output and returns the captured
text up to and including the matched token, plus the remainder of that line
(if any).

Args:
cmd: Shell command to execute.
output_contains: Output text or pattern to wait for.
timeout: Maximum number of seconds to wait for `` output_contains``.

Returns:
Captured process output containing `` output_contains``.

Raises:
pytest.fail: If the process times out or exits before producing
`` output_contains``.
"""
proc = subprocess.Popen(
["/bin/bash", "-c", cmd],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=0,
)

poller = select.poll()
poller.register(proc.stdout, select.POLLIN)

output = []
deadline = time.monotonic() + timeout
try:
while True:
remaining_ms = max(0, int((deadline - time.monotonic()) * 1000))
events = poller.poll(remaining_ms)
if not events:
if proc.poll() is not None:
break
if remaining_ms <= 0:
break
continue
chunk = os.read(proc.stdout.fileno(), 4096).decode()
if not chunk:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to check if the process has finished?

if not chunk:
   if proc.poll() is not None:
      break

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

proc.poll() check is added after poller.poll()

break
output.append(chunk)
combined = "".join(output)

if output_contains in combined:
return combined

raise AssertionError(f"[TIMEOUT] Did not find {output_contains} within {timeout}s\n Output:\n{''.join(output)}")
finally:
proc.kill()
proc.wait()
13 changes: 5 additions & 8 deletions test/pyTest/uds/test_udsToolRDBI.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import os
import helper.helper as helper
from target_info import TargetInfo
from helpers.helper_functions import run_process


def create_uds_tool_command(channel, did):
Expand All @@ -14,13 +13,11 @@ def test_rdbi(target_session, did="CF01"):
command = create_uds_tool_command(
target_session.target_info.socketcan["channel"], did
)
print(command)
output = helper.run_process(command)
output = run_process(command, "PositiveResponse", timeout=5)
lines = output.strip().splitlines()

assert any(
"PositiveResponse" in line for line in output
), "Missing PositiveResponse"
assert any("PositiveResponse" in line for line in lines), "Missing PositiveResponse"
assert any(
"62cf01010200022202160f0100006d2f0000010600008fe0000001" in line
for line in output
for line in lines
), "Incorrect expected output"
Loading