diff --git a/src/defib/cli/app.py b/src/defib/cli/app.py index 442419d..c6df023 100644 --- a/src/defib/cli/app.py +++ b/src/defib/cli/app.py @@ -1644,7 +1644,7 @@ def install( host_ip: str = typer.Option("192.168.1.10", "--host-ip", help="IP to assign to host NIC for TFTP"), device_ip: str = typer.Option("192.168.1.20", "--device-ip", help="IP for camera in U-Boot"), tftp_port: int = typer.Option(69, "--tftp-port", help="TFTP server port"), - nor_size: int = typer.Option(8, "--nor-size", help="NOR flash size in MB (8 or 16)"), + nor_size: int = typer.Option(8, "--nor-size", help="NOR flash size in MB (8, 16, or 32)"), nand: bool = typer.Option(False, "--nand", help="Use NAND flash instead of NOR"), output: str = typer.Option("human", "--output", help="Output mode: human, json"), debug: bool = typer.Option(False, "-d", "--debug", help="Enable debug logging"), @@ -1678,6 +1678,16 @@ def install( "rootfs": (0x350000, 0xA00000), # 10240KB } +# 32MB NOR flash layout — OpenIPC U-Boot has no setnor32m env var, so we +# send mtdparts directly. Pattern continues 8m/16m: same boot/env/kernel +# offsets, larger rootfs to use the extra space. +_NOR32M_LAYOUT = { + "boot": (0x000000, 0x40000), # 256KB + "env": (0x040000, 0x10000), # 64KB + "kernel": (0x050000, 0x300000), # 3MB + "rootfs": (0x350000, 0x1800000), # 24MB +} + # NAND flash layout: 1M(boot),1M(env),8M(kernel),-(ubi) _NAND_LAYOUT = { "boot": (0x000000, 0x100000), # 1MB @@ -1753,7 +1763,12 @@ async def _install_async( flash_cmd = "nand" flash_label = "NAND" else: - layout = _NOR16M_LAYOUT if nor_size >= 16 else _NOR8M_LAYOUT + if nor_size >= 32: + layout = _NOR32M_LAYOUT + elif nor_size >= 16: + layout = _NOR16M_LAYOUT + else: + layout = _NOR8M_LAYOUT flash_cmd = "sf" flash_label = f"NOR {nor_size}MB" @@ -2198,13 +2213,19 @@ async def tftp_and_flash( bootargs = _nand_bootargs(rootfs_is_ubi=is_ubi_image(rootfs_data)) await _cmd(f"setenv bootargs {bootargs}", timeout=3.0) else: - nor_cmd = "setnor8m" if nor_size < 16 else "setnor16m" if output == "human": - console.print(f"\n [bold]Setting boot environment[/bold] (run {nor_cmd})") - # setnor8m does: set mtdparts, set bootcmd, saveenv, reset - # We do it manually to avoid the auto-reset - mtdparts_var = f"mtdpartsnor{nor_size}m" - await _cmd(f"run {mtdparts_var}", timeout=3.0) + console.print("\n [bold]Setting boot environment[/bold]") + # OpenIPC U-Boot defines mtdpartsnor{8,16}m env vars but + # not 32m — for 32MB, send the raw mtdparts string. + if nor_size >= 32: + mtdparts = ( + "hi_sfc:256k(boot),64k(env),3072k(kernel)," + "24576k(rootfs),-(rootfs_data)" + ) + await _cmd(f"setenv mtdparts {mtdparts}", timeout=3.0) + else: + mtdparts_var = f"mtdpartsnor{nor_size}m" + await _cmd(f"run {mtdparts_var}", timeout=3.0) await _cmd("setenv bootcmd ${bootcmdnor}", timeout=3.0) resp = await _cmd("saveenv", timeout=10.0) if output == "human": diff --git a/src/defib/protocol/hisilicon_standard.py b/src/defib/protocol/hisilicon_standard.py index 6bd7327..2a5d71d 100644 --- a/src/defib/protocol/hisilicon_standard.py +++ b/src/defib/protocol/hisilicon_standard.py @@ -195,8 +195,8 @@ async def _send_frame_with_retry( for attempt in range(retries): await transport.flush_input() await transport.flush_output() - await transport.write(frame_data) try: + await transport.write(frame_data) ack = await transport.read(1, timeout=timeout) if ack == ACK_BYTE: logger.debug("TX %s ACKed (attempt %d/%d)", label, attempt + 1, retries) @@ -394,28 +394,38 @@ def _detect_spl_size(firmware: bytes, profile_max: int) -> int: """Detect actual SPL code size from firmware binary. HiSilicon mini-boot layout: vector table + .reg + executable code + - LZMA-compressed U-Boot payload. The SPL only needs the code region. - If the code is larger than the profile default (e.g. when SVB is - enabled), use the actual size so the bootrom receives all of it. + compressed U-Boot payload (LZMA or gzip, depending on the build). + The SPL only needs the code region — bytes past the compressed + payload boundary land in SRAM that the bootrom uses for its own + stack/state, so writing them corrupts the bootrom. + + When a compressed-payload boundary is found we trust it absolutely + and use it instead of profile_max — even if it's smaller. profile_max + comes from HiTool's reference SPL which fills the full window; an + OpenIPC build that's more compact must NOT be padded to that size. """ - # Search for LZMA header (0x5D + 4-byte LE dictionary size) after - # the .reg region. Common dict sizes: 64K, 128K, 256K, 512K, 1M, 8M. - VALID_DICT_SIZES = { - 1 << n for n in range(16, 25) # 64K .. 16M - } + # LZMA: 0x5D + 4-byte LE dictionary size (64K..16M) + VALID_LZMA_DICT = {1 << n for n in range(16, 25)} + # gzip: 1f 8b 08 (deflate method) for i in range(0x4000, min(len(firmware), 0x10000)): - if firmware[i] == 0x5D: + b = firmware[i] + detected: int | None = None + if b == 0x5D: ds = int.from_bytes(firmware[i + 1 : i + 5], "little") - if ds in VALID_DICT_SIZES: - # Round up to 1 KB boundary - detected = (i + 0x3FF) & ~0x3FF - if detected > profile_max: - logger.info( - "SPL code extends to 0x%X (%d bytes), " - "profile default was 0x%X (%d bytes)", - detected, detected, profile_max, profile_max, - ) - return max(detected, profile_max) + if ds in VALID_LZMA_DICT: + detected = i & ~0x3FF + label = "LZMA" + elif b == 0x1F and firmware[i + 1] == 0x8B and firmware[i + 2] == 0x08: + detected = i & ~0x3FF + label = "gzip" + if detected is not None: + if detected != profile_max: + logger.info( + "SPL boundary detected (%s) at 0x%X (%d bytes); " + "profile default was 0x%X (%d bytes)", + label, detected, detected, profile_max, profile_max, + ) + return detected return profile_max async def _send_spl( @@ -457,7 +467,11 @@ async def _send_spl( )) if not await self._send_tail(transport, len(chunks) + 1): - return False + # av200/av300 SPL detaches the bootrom protocol handler as + # soon as all declared bytes arrive — no TAIL ACK is sent. + if profile.prestep_data is None: + return False + logger.debug("SPL TAIL not ACKed (non-fatal for av200/av300, all data sent)") # DDR training delay: HiTool sleeps 300ms after SPL transfer. # Always apply — the SPL runs DDR training which needs time. @@ -471,6 +485,38 @@ async def _send_spl( )) return True + @staticmethod + def _zero_long_ff_runs(firmware: bytes, threshold: int = 12) -> bytes: + """Zero out long runs of 0xFF bytes. + + The hi3516cv500-family bootrom (av300, dv300, cv500) hangs mid-DATA + frame when the payload contains >=12 consecutive 0xFF bytes — almost + certainly a quirk in the bootrom's UART receive path. These runs + only appear as inert padding between SPL code and the compressed + U-Boot payload, so zeroing them is safe. + """ + if firmware.count(b"\xff" * threshold) == 0: + return firmware + out = bytearray(firmware) + run_start = -1 + for i, b in enumerate(out): + if b == 0xFF: + if run_start < 0: + run_start = i + else: + if run_start >= 0 and i - run_start >= threshold: + logger.debug( + "_zero_long_ff_runs: zeroed %d 0xFF bytes at offset 0x%X", + i - run_start, run_start, + ) + for j in range(run_start, i): + out[j] = 0 + run_start = -1 + if run_start >= 0 and len(out) - run_start >= threshold: + for j in range(run_start, len(out)): + out[j] = 0 + return bytes(out) + async def _send_uboot( self, transport: Transport, @@ -480,6 +526,7 @@ async def _send_uboot( label: str = "U-Boot", ) -> bool: """Send U-Boot (or agent) image to DDR.""" + firmware = self._zero_long_ff_runs(firmware) total = len(firmware) logger.debug( "=== %s === address=0x%08X total=%d chunks=%d", diff --git a/src/defib/transport/serial.py b/src/defib/transport/serial.py index 6d9116b..871b05d 100644 --- a/src/defib/transport/serial.py +++ b/src/defib/transport/serial.py @@ -82,9 +82,25 @@ async def read(self, size: int, timeout: float | None = None) -> bytes: self._port.timeout = old_timeout async def write(self, data: bytes) -> None: - await asyncio.get_event_loop().run_in_executor( - None, self._port.write, data - ) + # Pyserial honours write_timeout inside Serial.write(), so the worker + # thread actually returns instead of blocking in pselect6 forever. + # asyncio.wait_for can't help here — cancelling a run_in_executor + # future leaves the underlying thread still blocked. + # 5s ceiling: a 1KB write at 115200 baud is ~89ms; >5s means the + # kernel TX buffer isn't draining (USB-serial hung, flow control, + # cable unplugged). + old_wt = self._port.write_timeout + self._port.write_timeout = 5.0 + try: + await asyncio.get_event_loop().run_in_executor( + None, self._port.write, data + ) + except serial.SerialTimeoutException as exc: + raise TransportTimeout( + f"Write timeout (5.0s, {len(data)} bytes)" + ) from exc + finally: + self._port.write_timeout = old_wt async def flush_input(self) -> None: self._port.reset_input_buffer() diff --git a/tests/test_protocol_standard.py b/tests/test_protocol_standard.py index 961ac1c..86b34b5 100644 --- a/tests/test_protocol_standard.py +++ b/tests/test_protocol_standard.py @@ -8,6 +8,7 @@ from defib.protocol.hisilicon_standard import HiSiliconStandard from defib.profiles.loader import load_profile from defib.recovery.events import Stage +from defib.transport.base import TransportTimeout from defib.transport.mock import MockTransport PROFILES_DIR = __import__("pathlib").Path(__file__).parent.parent / "src" / "defib" / "profiles" / "data" @@ -286,3 +287,181 @@ async def test_send_ddr_step(self): assert Stage.UBOOT in result.stages_completed assert Stage.COMPLETE in result.stages_completed assert len(progress_events) > 0 + + +class TestDetectSplSize: + """Regression tests for _detect_spl_size — see PR #55 + gzip-detection fix. + + Bug history: + - PR #55 added LZMA-header detection so SVB-enabled av200 SPL (>profile_max) + wasn't truncated. Used max(detected, profile_max). + - hi3516av300 OpenIPC builds use gzip (not LZMA) for the embedded U-Boot + and the SPL is more compact than HiTool's reference. With max(), defib + sent profile_max (24KB) which overshoots the 21KB code into bootrom RAM + and hangs the SPL transfer mid-chunk. + - Fix: also detect gzip; trust the detected boundary regardless of + profile_max; round DOWN to 1KB so we never spill into compressed bytes. + """ + + @staticmethod + def _detect(firmware: bytes, profile_max: int = 0x6000) -> int: + return HiSiliconStandard._detect_spl_size(firmware, profile_max) + + def test_gzip_header_detected_below_profile_max(self): + # Real av300 layout: gzip at 0x52F0 inside a profile_max=0x6000 window. + firmware = bytes(0x52F0) + b"\x1f\x8b\x08" + b"\x00" * 100 + # Round DOWN to 1 KB — never include any byte of the gzip payload. + assert self._detect(firmware, profile_max=0x6000) == 0x5000 + + def test_gzip_header_above_profile_max(self): + firmware = bytes(0x6800) + b"\x1f\x8b\x08" + b"\x00" * 100 + assert self._detect(firmware, profile_max=0x6000) == 0x6800 + + def test_lzma_header_detected_below_profile_max(self): + # 0x5D + dict_size 0x10000 (64K) is a valid LZMA header. + firmware = bytes(0x5400) + b"\x5d\x00\x00\x01\x00" + b"\x00" * 100 + assert self._detect(firmware, profile_max=0x6000) == 0x5400 + + def test_lzma_header_rounds_down(self): + # Header at 0x53F0 rounds DOWN to 0x5000 (not up to 0x5400). + firmware = bytes(0x53F0) + b"\x5d\x00\x00\x01\x00" + b"\x00" * 100 + assert self._detect(firmware, profile_max=0x6000) == 0x5000 + + def test_lzma_invalid_dict_size_ignored(self): + # 0x5D followed by garbage dict size is not a real LZMA header. + firmware = bytes(0x5400) + b"\x5d\xff\xff\xff\xff" + b"\x00" * 100 + # Falls through to profile_max because no valid marker found. + assert self._detect(firmware, profile_max=0x6000) == 0x6000 + + def test_no_marker_falls_back_to_profile_max(self): + firmware = b"\xa5" * 0x10000 + assert self._detect(firmware, profile_max=0x6000) == 0x6000 + + def test_marker_before_search_window_ignored(self): + # The scan starts at 0x4000 — markers in the .reg region don't count. + firmware = bytes(0x100) + b"\x1f\x8b\x08" + bytes(0x6000) + assert self._detect(firmware, profile_max=0x6000) == 0x6000 + + +class TestZeroLongFfRuns: + """Regression tests for _zero_long_ff_runs. + + The hi3516cv500-family bootrom hangs mid-DATA frame when payload contains + >=12 consecutive 0xFF bytes (likely a UART RX-path quirk in the bootrom). + These runs only appear as inert padding between SPL code and the + compressed U-Boot payload, so zeroing them is safe. + """ + + @staticmethod + def _zero(firmware: bytes, threshold: int = 12) -> bytes: + return HiSiliconStandard._zero_long_ff_runs(firmware, threshold) + + def test_run_at_threshold_is_zeroed(self): + firmware = b"\xaa" + b"\xff" * 12 + b"\xbb" + out = self._zero(firmware) + assert out == b"\xaa" + b"\x00" * 12 + b"\xbb" + + def test_run_below_threshold_preserved(self): + firmware = b"\xaa" + b"\xff" * 11 + b"\xbb" + assert self._zero(firmware) == firmware + + def test_run_at_end_of_buffer_zeroed(self): + firmware = b"\xaa" + b"\xff" * 16 + out = self._zero(firmware) + assert out == b"\xaa" + b"\x00" * 16 + + def test_no_runs_returns_unchanged(self): + firmware = bytes(range(256)) * 4 + assert self._zero(firmware) is firmware # short-circuits, no copy + + def test_multiple_runs_all_zeroed(self): + firmware = b"\xff" * 12 + b"\xaa" + b"\xff" * 13 + b"\xbb" + out = self._zero(firmware) + assert out == b"\x00" * 12 + b"\xaa" + b"\x00" * 13 + b"\xbb" + + def test_av300_padding_pattern(self): + # The exact pattern from hi3516av300 u-boot.bin at offset 0x52E0: + # 4 SPL bytes, 12 0xFF padding, gzip header. + firmware = bytes([0x04, 0x00, 0x02, 0x12]) + b"\xff" * 12 + b"\x1f\x8b\x08" + out = self._zero(firmware) + assert out == bytes([0x04, 0x00, 0x02, 0x12]) + b"\x00" * 12 + b"\x1f\x8b\x08" + + +class TestSplTailNonFatalForFrameBlast: + """Regression test for av200/av300 SPL TAIL handling. + + On chips with PRESTEP0 (frame-blast handshake), the SPL detaches the + bootrom protocol handler as soon as it has received all declared bytes, + so the TAIL frame is never ACKed. Treating that as fatal stalls the + install at SPL completion. (Mirrors the existing U-Boot TAIL behavior.) + """ + + @pytest.mark.asyncio + async def test_spl_tail_no_ack_succeeds_for_av300(self): + transport = MockTransport(flush_clears_buffer=False) + profile = load_profile("hi3516av300", PROFILES_DIR) + protocol = HiSiliconStandard() + protocol.set_profile(profile) + + # av300 profile_max is 0x6000 = 24576 bytes = 24 chunks of 1024. + # No marker in zeros, so _detect_spl_size falls back to profile_max. + firmware = b"\x00" * profile.spl_max_size + spl_chunks = (profile.spl_max_size + 1023) // 1024 + + # ACKs for SPL HEAD + chunks. Deliberately NO ACK for TAIL — + # TAIL retries time out, then fall through (because prestep_data is set). + transport.enqueue_rx(ACK_BYTE * (1 + spl_chunks)) + + ok = await protocol._send_spl(transport, firmware, profile) + assert ok, "av300 SPL must succeed even without TAIL ACK" + + @pytest.mark.asyncio + async def test_spl_tail_no_ack_fails_for_chip_without_prestep(self): + # On chips WITHOUT prestep_data, TAIL ACK is required — verify the + # legacy strict behavior is preserved. + transport = MockTransport(flush_clears_buffer=False) + profile = load_profile("gk7201v300", PROFILES_DIR) # no PRESTEP0 + assert profile.prestep_data is None + protocol = HiSiliconStandard() + protocol.set_profile(profile) + + firmware = b"\x00" * profile.spl_max_size + spl_chunks = (profile.spl_max_size + 1023) // 1024 + # ACKs for HEAD + chunks, no TAIL ACK. + transport.enqueue_rx(ACK_BYTE * (1 + spl_chunks)) + + ok = await protocol._send_spl(transport, firmware, profile) + assert not ok, "chips without prestep_data must still treat SPL TAIL as fatal" + + +class TestWriteTimeoutRetry: + """Regression test for write-timeout handling in _send_frame_with_retry. + + Previously, transport.write() blocked outside the try/except, so a hung + write (e.g. PL2303 TX buffer not draining) would propagate a raw + TransportTimeout up to the caller — bypassing the retry loop entirely. + Now the write is inside the try block: a transient write failure is + treated like an ACK timeout and retried. + """ + + @pytest.mark.asyncio + async def test_transient_write_timeout_is_retried(self): + class FlakeyTransport(MockTransport): + """Times out the first N writes, then behaves normally.""" + def __init__(self, fail_writes: int) -> None: + super().__init__(flush_clears_buffer=False) + self._remaining_failures = fail_writes + + async def write(self, data: bytes) -> None: + if self._remaining_failures > 0: + self._remaining_failures -= 1 + raise TransportTimeout("simulated write hang") + await super().write(data) + + transport = FlakeyTransport(fail_writes=2) + transport.enqueue_rx(ACK_BYTE) + + protocol = HiSiliconStandard() + # _send_head goes through _send_frame_with_retry. + ok = await protocol._send_head(transport, length=64, address=0x04017000) + assert ok, "retry loop must recover from transient write timeouts"