diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 40e070f..08e9cb8 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -62,6 +62,7 @@ jobs: test64.ext4 test64.ext4.tmp test_htree.ext4 + test_htree_multi.ext4 if-no-files-found: error test: diff --git a/_test_image.sh b/_test_image.sh index 259afd2..1df1b5c 100755 --- a/_test_image.sh +++ b/_test_image.sh @@ -45,7 +45,6 @@ mkimage test32 "$tmp_dir" 20 -O ^64bit mkimage test64 "$tmp_dir" 20 -O 64bit rm -f "$tmp_dir"/test*.txt "$tmp_dir"/symlink.txt -echo "[test] Generating files..." echo "[test] Making image test_htree..." chronic dd if=/dev/zero of=test_htree.ext4 count=20 bs=1048576 @@ -59,3 +58,15 @@ sudo umount "$tmp_dir" # shellcheck disable=SC2064 trap "rmdir \"$tmp_dir\"" EXIT chronic e2fsck -Dy test_htree.ext4 + +echo "[test] Making image test_htree_multi..." +chronic dd if=/dev/zero of=test_htree_multi.ext4 count=20 bs=1048576 +chronic mkfs.ext4 -g 1024 -b 1024 -O 64bit,dir_index -N 13000 test_htree_multi.ext4 +sudo mount -t ext4 test_htree_multi.ext4 "$tmp_dir" +# shellcheck disable=SC2064 +trap "sudo umount \"$tmp_dir\";rmdir \"$tmp_dir\"" EXIT +printf '%s\n' "$tmp_dir"/{1..12000} | xargs sudo touch +sudo umount "$tmp_dir" +# shellcheck disable=SC2064 +trap "rmdir \"$tmp_dir\"" EXIT +chronic e2fsck -Dy test_htree_multi.ext4 diff --git a/ext4/directory.py b/ext4/directory.py index be825d8..72145e7 100644 --- a/ext4/directory.py +++ b/ext4/directory.py @@ -36,12 +36,16 @@ def __init__(self, directory: "Directory", offset: int) -> None: @override def read_from_volume(self) -> None: - data = self.directory._open().read()[self.offset : self.offset + self.size] # pyright: ignore[reportPrivateUsage] - _ = memmove(addressof(self), data, self.size) + reader = self.directory._open() # pyright: ignore[reportPrivateUsage] + _ = reader.seek(self.offset) + data = reader.read(self.size) + # Do not guard against short reads, this happens for the last entry + _ = memmove(addressof(self), data, len(data)) class DirectoryEntryBase(DirectoryEntryStruct): __slots__: tuple[str, ...] = () + @property def name_bytes(self) -> bytes: return bytes(self.name)[: self.name_len] # pyright: ignore[reportAny] diff --git a/ext4/htree.py b/ext4/htree.py index bc0676b..dd53f54 100644 --- a/ext4/htree.py +++ b/ext4/htree.py @@ -1,5 +1,6 @@ +import errno import warnings -from collections.abc import Generator +from collections.abc import Iterator from ctypes import ( LittleEndianStructure, addressof, @@ -19,6 +20,7 @@ assert_cast, override, ) +from .block import BlockIO from .enum import DX_HASH from .struct import ( Ext4Struct, @@ -30,6 +32,10 @@ from .volume import Volume +class HtreeHashError(Exception): + pass + + class LittleEndianStructureWithVolume(LittleEndianStructure): __slots__: tuple[str, ...] = ("_volume",) @@ -104,6 +110,12 @@ def read_from_volume(self) -> None: reader = self.directory._open() # pyright: ignore[reportPrivateUsage] _ = reader.seek(self.offset) data = reader.read(sizeof(self)) + if len(data) != sizeof(self): + raise OSError( + errno.EIO, + f"Short read for {type(self).__name__} at offset {self.offset}", + ) + _ = memmove(addressof(self), data, sizeof(self)) @@ -130,34 +142,184 @@ def __init__(self, parent: "DXEntriesBase", index: int) -> None: ) -class DXEntriesBase(DXBase): - __slots__: tuple[str, ...] = () +@final +class DXEntries: + __slots__ = ( + "_cache", + "base", + ) - @override - def read_from_volume(self) -> None: - super().read_from_volume() + def __init__(self, base: "DXEntriesBase") -> None: + self.base: DXEntriesBase = base + self._cache: dict[int, DXEntry] = {} + + def __contains__(self, index: int) -> bool: + return 0 <= index < self.base.count # pyright: ignore[reportAny] + + def __len__(self) -> int: + """Length of entries minus the DXTail""" + return self.base.count - 1 # pyright: ignore[reportAny] + + def __iter__(self) -> Iterator[DXEntry]: + for index in range(len(self)): + yield self[index] + + def __getitem__(self, index: int) -> DXEntry: + if index not in self: + raise KeyError() + + entry = self._cache.get(index, None) + if entry is None: + entry = DXEntry(self.base, index) + self._cache[index] = entry + + return entry - @property - def entries(self) -> Generator[DXEntry, None, None]: - count = assert_cast(self.count, int) # pyright: ignore[reportAny] - for i in range(0, count - 1): - yield DXEntry(self, i) + +class DXEntriesBase(DXBase): + __slots__: tuple[str, ...] = ( + "entries", + "parent", + ) + + def __init__(self, directory: "Directory", offset: int) -> None: + super().__init__(directory, offset) + self.parent: DXRoot | None = directory.htree + self.entries: DXEntries = DXEntries(self) @property def info_length(self) -> int: parent = self while not isinstance(parent, DXRoot): - parent = assert_cast(parent.parent, DXEntriesBase) # pyright: ignore[reportAny] + parent = assert_cast(parent.parent, DXEntriesBase) dx_root_info = assert_cast(parent.dx_root_info, DXRootInfo) # pyright: ignore[reportAny] return assert_cast(dx_root_info.info_length, int) # pyright: ignore[reportAny] +def str2hashbuf( + name: bytes, length: int, num: int, signed_hash: bool = False +) -> list[int]: + """Convert name bytes to array of 32-bit words (LSB-first).""" + pad: int = length | (length << 8) + pad |= pad << 16 + pad &= 0xFFFFFFFF + + buf: list[int] = [] + val: int = pad + length = min(length, num * 4) + for i in range(length): + byte_val = sign_extended_byte(signed_hash, name[i]) + val = (byte_val + (val << 8)) & 0xFFFFFFFF + if i % 4 == 3: + buf.append(val) + val = pad + + if len(buf) < num: + buf.append(val) + + while len(buf) < num: + buf.append(pad) + + return buf[:num] + + +def sign_extended_byte(signed_hash: bool, value: int) -> int: + if signed_hash and value > 127: + return value - 256 + + return value + + +def half_md4_transform(buf: list[int], inp: list[int]) -> None: + """ + Args: + buf: Input/output [a, b, c, d] (modified in place) + inp: 8 input words from str2hashbuf + """ + + def F(x: int, y: int, z: int) -> int: + return (z) ^ ((x) & ((y) ^ (z))) + + def G(x: int, y: int, z: int) -> int: + return ((x) & (y)) + (((x) ^ (y)) & (z)) + + def H(x: int, y: int, z: int) -> int: + return (x) ^ (y) ^ (z) + + def rol32(val: int, shift: int) -> int: + """32-bit rotate left.""" + shift &= 31 + return ((val << shift) | (val >> (32 - shift))) & 0xFFFFFFFF + + a: int = buf[0] + b: int = buf[1] + c: int = buf[2] + d: int = buf[3] + + k: int = 0 + a = rol32((a + F(b, c, d) + inp[0] + k) & 0xFFFFFFFF, 3) + d = rol32((d + F(a, b, c) + inp[1] + k) & 0xFFFFFFFF, 7) + c = rol32((c + F(d, a, b) + inp[2] + k) & 0xFFFFFFFF, 11) + b = rol32((b + F(c, d, a) + inp[3] + k) & 0xFFFFFFFF, 19) + a = rol32((a + F(b, c, d) + inp[4] + k) & 0xFFFFFFFF, 3) + d = rol32((d + F(a, b, c) + inp[5] + k) & 0xFFFFFFFF, 7) + c = rol32((c + F(d, a, b) + inp[6] + k) & 0xFFFFFFFF, 11) + b = rol32((b + F(c, d, a) + inp[7] + k) & 0xFFFFFFFF, 19) + + k = 0x5A827999 + a = rol32((a + G(b, c, d) + inp[1] + k) & 0xFFFFFFFF, 3) + d = rol32((d + G(a, b, c) + inp[3] + k) & 0xFFFFFFFF, 5) + c = rol32((c + G(d, a, b) + inp[5] + k) & 0xFFFFFFFF, 9) + b = rol32((b + G(c, d, a) + inp[7] + k) & 0xFFFFFFFF, 13) + a = rol32((a + G(b, c, d) + inp[0] + k) & 0xFFFFFFFF, 3) + d = rol32((d + G(a, b, c) + inp[2] + k) & 0xFFFFFFFF, 5) + c = rol32((c + G(d, a, b) + inp[4] + k) & 0xFFFFFFFF, 9) + b = rol32((b + G(c, d, a) + inp[6] + k) & 0xFFFFFFFF, 13) + + k = 0x6ED9EBA1 + a = rol32((a + H(b, c, d) + inp[3] + k) & 0xFFFFFFFF, 3) + d = rol32((d + H(a, b, c) + inp[7] + k) & 0xFFFFFFFF, 9) + c = rol32((c + H(d, a, b) + inp[2] + k) & 0xFFFFFFFF, 11) + b = rol32((b + H(c, d, a) + inp[6] + k) & 0xFFFFFFFF, 15) + a = rol32((a + H(b, c, d) + inp[1] + k) & 0xFFFFFFFF, 3) + d = rol32((d + H(a, b, c) + inp[5] + k) & 0xFFFFFFFF, 9) + c = rol32((c + H(d, a, b) + inp[0] + k) & 0xFFFFFFFF, 11) + b = rol32((b + H(c, d, a) + inp[4] + k) & 0xFFFFFFFF, 15) + + buf[0] = (buf[0] + a) & 0xFFFFFFFF + buf[1] = (buf[1] + b) & 0xFFFFFFFF + buf[2] = (buf[2] + c) & 0xFFFFFFFF + buf[3] = (buf[3] + d) & 0xFFFFFFFF + + +def tea_transform(buf: list[int], inp: list[int]) -> None: + DELTA: int = 0x9E3779B9 + sum_val: int = 0 + b0: int = buf[0] + b1: int = buf[1] + a: int = inp[0] + b: int = inp[1] + c: int = inp[2] + d: int = inp[3] + + for _ in range(16): + sum_val = (sum_val + DELTA) & 0xFFFFFFFF + b0 = (b0 + (((b1 << 4) + a) ^ (b1 + sum_val) ^ ((b1 >> 5) + b))) & 0xFFFFFFFF + b1 = (b1 + (((b0 << 4) + c) ^ (b0 + sum_val) ^ ((b0 >> 5) + d))) & 0xFFFFFFFF + + buf[0] = (buf[0] + b0) & 0xFFFFFFFF + buf[1] = (buf[1] + b1) & 0xFFFFFFFF + + @final class DXRoot(DXEntriesBase): __slots__ = () - _pack_ = 1 + + def __init__(self, inode: "Directory") -> None: + super().__init__(inode, 0) + # _anonymous_ = ("") _fields_ = [ ("dot", DotDirectoryEntry2), @@ -169,8 +331,179 @@ class DXRoot(DXEntriesBase): # ("entries", DXEntry * self.count), ] - def __init__(self, inode: "Directory") -> None: - super().__init__(inode, 0) + def compute_dx_hash(self, name: bytes, hash_version: int) -> tuple[int, int]: + seed = self.directory.volume.superblock.s_hash_seed # pyright: ignore[reportAny] + buf: list[int] = [0x67452301, 0xEFCDAB89, 0x98BADCFE, 0x10325476] + if seed is not None and any(seed): # pyright: ignore[reportAny] + buf = list(seed) # pyright: ignore[reportAny] + + match hash_version: + case DX_HASH.LEGACY | DX_HASH.LEGACY_UNSIGNED: + hash0: int = 0x12A3FE2D + hash1: int = 0x37ABE8F9 + for i in range(len(name)): + byte_val = sign_extended_byte( + hash_version == DX_HASH.LEGACY, name[i] + ) + hash_val: int = hash1 + (hash0 ^ (byte_val * 7152373)) + if hash_val & 0x80000000: + hash_val -= 0x7FFFFFFF + + hash1 = hash0 + hash0 = hash_val & 0xFFFFFFFF + + hash_val = (hash0 << 1) & 0xFFFFFFFF + minor_hash = 0 + + case DX_HASH.HALF_MD4 | DX_HASH.HALF_MD4_UNSIGNED: + signed_hash = hash_version == DX_HASH.HALF_MD4 + p = 0 + length = len(name) + while length > 0: + inp = str2hashbuf(name[p:], length, 8, signed_hash) + half_md4_transform(buf, inp) + length -= 32 + p += 32 + + hash_val = buf[1] + minor_hash = buf[2] + + case DX_HASH.TEA | DX_HASH.TEA_UNSIGNED: + signed_hash = hash_version == DX_HASH.TEA + p = 0 + length = len(name) + while length > 0: + inp = str2hashbuf(name[p:], length, 4, signed_hash) + tea_transform(buf, inp) + length -= 16 + p += 16 + + hash_val = buf[0] + minor_hash = buf[1] + + case DX_HASH.SIPHASH: + raise HtreeHashError("SipHash not yet supported for htree lookup") + + case _: + raise HtreeHashError(f"Unknown hash version: {hash_version}") + + hash_val &= ~1 + if hash_val == 0xFFFFFFFE: + hash_val = 0xFFFFFFFC + + return hash_val, minor_hash + + def lookup(self, name: str | bytes) -> int | None: + if isinstance(name, str): + name = name.encode("utf-8") + + hash_version = assert_cast(self.dx_root_info.hash_version, int) # pyright: ignore[reportAny] + hash_val, _minor_hash = self.compute_dx_hash(name, hash_version) + block_num: int = assert_cast(self.block, int) # pyright: ignore[reportAny] # default block + entries = self.entries + lo: int = 0 + hi: int = len(entries) - 1 + while lo <= hi: + mid: int = (lo + hi) // 2 + if entries[mid].hash <= hash_val: # pyright: ignore[reportAny] + lo = mid + 1 + + else: + hi = mid - 1 + + leaf_entries: DXEntries | None = None + leaf_idx: int = -1 + if hi >= 0: + block_num = entries[hi].block # pyright: ignore[reportAny] + leaf_entries = entries + leaf_idx = hi + + for _ in range(assert_cast(self.dx_root_info.indirect_levels, int)): # pyright: ignore[reportAny] + node = DXNode( + self.directory, + block_num * self.directory.block_size, + ) + node_entries = node.entries + + lo = 0 + hi = len(node_entries) - 1 + while lo <= hi: + mid = (lo + hi) // 2 + if node_entries[mid].hash <= hash_val: # pyright: ignore[reportAny] + lo = mid + 1 + + else: + hi = mid - 1 + + block_num = ( + node_entries[hi].block # pyright: ignore[reportAny] + if hi >= 0 + else assert_cast(node.block, int) # pyright: ignore[reportAny] + ) + + if hi >= 0: + leaf_entries = node_entries + leaf_idx = hi + + else: + leaf_entries = None + leaf_idx = -1 + + blocks_to_scan: list[int] = [] + if leaf_entries is not None and leaf_idx >= 0: + current_idx = leaf_idx + while current_idx < len(leaf_entries): + entry = leaf_entries[current_idx] + blocks_to_scan.append(entry.block) # pyright: ignore[reportAny] + next_idx = current_idx + 1 + if ( + next_idx >= len(leaf_entries) + or not (leaf_entries[next_idx].hash & 1) # pyright: ignore[reportAny] + or (leaf_entries[next_idx].hash & ~1) != (hash_val & ~1) # pyright: ignore[reportAny] + ): + break + + current_idx = next_idx + else: + blocks_to_scan = [block_num] + + has_filetype = self.directory.has_filetype + with BlockIO(self.directory) as blockio: + for scan_block in blocks_to_scan: + leaf_data = blockio.blocks[scan_block] + offset: int = 0 + while offset + 8 <= len(leaf_data): + inode_val = int.from_bytes(leaf_data[offset : offset + 4], "little") + rec_len = int.from_bytes( + leaf_data[offset + 4 : offset + 6], "little" + ) + + if rec_len == 0: + break + + name_len = ( + leaf_data[offset + 6] + if has_filetype + else int.from_bytes( + leaf_data[offset + 6 : offset + 8], "little" + ) + ) + + if inode_val == 0 or name_len == 0: + offset += rec_len + continue + + name_start = offset + 8 + if name_start + name_len > len(leaf_data): + break + + entry_name = leaf_data[name_start : name_start + name_len] + if entry_name == name: + return inode_val + + offset += rec_len + + return None @final diff --git a/ext4/inode.py b/ext4/inode.py index 0ca8c6d..0a9ce59 100644 --- a/ext4/inode.py +++ b/ext4/inode.py @@ -48,7 +48,10 @@ ExtentIndex, ExtentTree, ) -from .htree import DXRoot +from .htree import ( + DXRoot, + HtreeHashError, +) from .struct import ( Ext4Struct, MagicError, @@ -527,6 +530,7 @@ class Directory(Inode): "_dirents", "_inode_at_cache", "htree", + "__dict__", ) def __init__(self, volume: Volume, offset: int, i_no: int) -> None: @@ -634,18 +638,20 @@ def opendir( self, ) -> Generator[tuple[DirectoryEntry | DirectoryEntry2, EXT4_FT], Any, None]: # pyright: ignore[reportExplicitAny] for dirent in self._opendir(): - if isinstance(dirent, DirectoryEntry2): - file_type = EXT4_FT(dirent.file_type) # pyright: ignore[reportAny] - if file_type == EXT4_FT.DIR_CSUM: - continue + if not isinstance(dirent, DirectoryEntry2): + file_type = self._get_file_type(dirent) + yield dirent, file_type + continue - if not self._is_valid_file_type(file_type): - raise OpenDirectoryError(f"Unexpected file type: {file_type}") + file_type = EXT4_FT(dirent.file_type) # pyright: ignore[reportAny] + if file_type == EXT4_FT.DIR_CSUM: + continue - else: - file_type = self._get_file_type(dirent) + if self._is_valid_file_type(file_type): + yield dirent, file_type + continue - yield dirent, file_type + raise OpenDirectoryError(f"Unexpected file type: {file_type}") @cachedmethod(lambda self: self._inode_at_cache) # pyright: ignore[reportAny] def inode_at(self, path: str | bytes) -> Inode: @@ -668,11 +674,22 @@ def inode_at(self, path: str | bytes) -> Inode: name = paths.pop(0) inode = None - for dirent, _ in cwd.opendir(): - if dirent.name_bytes == name: - dirent_inode = assert_cast(dirent.inode, int) # pyright: ignore[reportAny] - inode = self.volume.inodes[dirent_inode] - break + + if cwd.is_htree and cwd.htree is not None: + try: + inode_no = cwd.htree.lookup(name) + if inode_no is not None: + inode = self.volume.inodes[inode_no] + + except HtreeHashError: + pass + + if inode is None: + for dirent, _ in cwd.opendir(): + if dirent.name_bytes == name: + dirent_inode = assert_cast(dirent.inode, int) # pyright: ignore[reportAny] + inode = self.volume.inodes[dirent_inode] + break if inode is None: raise FileNotFoundError(path) diff --git a/ext4/volume.py b/ext4/volume.py index bd22269..8b76610 100644 --- a/ext4/volume.py +++ b/ext4/volume.py @@ -34,6 +34,7 @@ class Inodes: "_group_cache", "_offset_cache", "volume", + "__dict__", ) def __init__(self, volume: Volume) -> None: @@ -74,7 +75,6 @@ def __getitem__(self, index: int) -> Inode: class Volume: __slots__: tuple[str, ...] = ( - "_inode_at_cache", "cursor", "group_descriptors", "ignore_attr_name_index", @@ -134,7 +134,6 @@ def __init__( self.group_descriptors.insert(index, descriptor) self.inodes: Inodes = Inodes(self) - self._inode_at_cache: LRUCache[str | bytes, Inode] = LRUCache(maxsize=32) def __len__(self) -> int: _ = self.stream.seek(0, io.SEEK_END) @@ -237,28 +236,5 @@ def path_tuple(path: str | bytes) -> tuple[bytes, ...]: return tuple(x.encode("utf-8") for x in PurePosixPath(path).parts[1:]) - @cachedmethod(lambda self: self._inode_at_cache) # pyright: ignore[reportAny] def inode_at(self, path: str | bytes) -> Inode: - paths = list(self.path_tuple(path)) - cwd = self.root - if not paths: - return cwd - - while paths: - if not isinstance(cwd, Directory): - raise OSError(errno.ENOTDIR, os.strerror(errno.ENOTDIR)) - - name = paths.pop(0) - inode = None - for dirent, _ in cwd.opendir(): - if dirent.name_bytes == name: - dirent_inode = assert_cast(dirent.inode, int) # pyright: ignore[reportAny] - inode = self.inodes[dirent_inode] - break - - if inode is None: - raise FileNotFoundError(path) - - cwd = inode - - return cwd + return self.root.inode_at(b"/".join(self.path_tuple(path))) diff --git a/pyproject.toml b/pyproject.toml index 440ecf0..2c2daf8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ext4" -version = "1.3.3" +version = "1.4" authors = [ { name="Eeems", email="eeems@eeems.email" }, ] @@ -25,7 +25,7 @@ classifiers = [ license = "MIT" dynamic = ["readme"] dependencies = [ - "cachetools==6.0.0", + "cachetools==7.1.1", "crcmod==1.7", "typing-extensions==4.15.0; python_version<\"3.12\"", ] diff --git a/test.py b/test.py index dc4e62a..43c360c 100644 --- a/test.py +++ b/test.py @@ -17,17 +17,17 @@ def test_path_tuple(path: str | bytes, expected: tuple[bytes, ...]) -> None: global FAILED # noqa: PLW0603 - print(f"check Volume.path_tuple({path}): ", end="") + print(f"check Volume.path_tuple({path}): ", end="", flush=True) try: t = ext4.Volume.path_tuple(path) if t != expected: raise ValueError(f"Result is unexpected {t}") - print("pass") + print("pass", flush=True) except Exception as e: FAILED = True # pyright: ignore[reportConstantRedefinition] - print("fail") + print("fail", flush=True) print(" ", end="", file=sys.stderr) print(e, file=sys.stderr) @@ -43,27 +43,27 @@ def _eval_or_False(source: str) -> Any: # pyright: ignore[reportExplicitAny, re def _assert(source: str, debug: Callable[[], Any] | None = None) -> None: # pyright: ignore[reportExplicitAny] global FAILED # noqa: PLW0603 - print(f"check {source}: ", end="") + print(f"check {source}: ", end="", flush=True) if _eval_or_False(source): - print("pass") + print("pass", flush=True) return FAILED = True # pyright: ignore[reportConstantRedefinition] - print("fail") + print("fail", flush=True) if debug is not None: print(f" {debug()}", file=sys.stderr) def _not_raises(source: str, debug: Callable[[], Any] | None = None) -> None: # pyright: ignore[reportExplicitAny] global FAILED # noqa: PLW0603 - print(f"check {source} does not raise exception: ", end="") + print(f"check {source} does not raise exception: ", end="", flush=True) try: _ = eval(source) # noqa: S307 # pyright: ignore[reportAny] - print("pass") + print("pass", flush=True) except Exception: FAILED = True # pyright: ignore[reportConstantRedefinition] - print("fail") + print("fail", flush=True) if debug is not None: print(f" {debug()}", file=sys.stderr) @@ -71,17 +71,17 @@ def _not_raises(source: str, debug: Callable[[], Any] | None = None) -> None: # def test_magic_error(f: BufferedReader) -> None: global FAILED # noqa: PLW0603 try: - print("check MagicError: ", end="") + print("check MagicError: ", end="", flush=True) _ = ext4.Volume(f, offset=0) FAILED = True # pyright: ignore[reportConstantRedefinition] - print("fail") + print("fail", flush=True) print(" MagicError not raised") except ext4.struct.MagicError: - print("pass") + print("pass", flush=True) except Exception as e: FAILED = True # pyright: ignore[reportConstantRedefinition] - print("fail") + print("fail", flush=True) print(" ", end="", file=sys.stderr) print(e, file=sys.stderr) @@ -89,29 +89,29 @@ def test_magic_error(f: BufferedReader) -> None: def test_root_inode(volume: ext4.Volume) -> None: global FAILED # noqa: PLW0603 try: - print("Validate root inode: ", end="") + print("Validate root inode: ", end="", flush=True) volume.root.validate() - print("pass") + print("pass", flush=True) except ext4.struct.ChecksumError as e: FAILED = True # pyright: ignore[reportConstantRedefinition] - print("fail") + print("fail", flush=True) print(" ", end="", file=sys.stderr) print(e, file=sys.stderr) -print("check ext4.Volume stream validation: ", end="") +print("check ext4.Volume stream validation: ", end="", flush=True) try: _ = ext4.Volume(1) # pyright: ignore[reportArgumentType] FAILED = True # pyright: ignore[reportConstantRedefinition] - print("fail") + print("fail", flush=True) except ext4.InvalidStreamException: - print("pass") + print("pass", flush=True) except Exception as e: FAILED = True # pyright: ignore[reportConstantRedefinition] - print("fail") + print("fail", flush=True) print(" ", end="", file=sys.stderr) print(e, file=sys.stderr) @@ -238,7 +238,7 @@ def test_root_inode(volume: ext4.Volume) -> None: lambda: dx_root_info.indirect_levels, # pyright: ignore[reportAny] ) - entries = list(htree.entries) + entries = htree.entries _assert("len(entries) > 0", lambda: len(entries)) _assert( "len(entries) == htree.count - 1", @@ -264,5 +264,87 @@ def test_root_inode(volume: ext4.Volume) -> None: non_htree_dir = cast(ext4.Directory, volume.inode_at("/empty")) _assert("not non_htree_dir.is_htree") +img_file = "test_htree_multi.ext4" +print(f"Testing image: {img_file}") +with open(img_file, "rb") as f: + volume = None + try: + volume = ext4.Volume(f) + + except Exception: + FAILED = True # pyright: ignore[reportConstantRedefinition] + traceback.print_exc() + + if volume is not None: + _assert("volume.superblock is not None") + _assert("volume.bad_blocks is not None") + _assert("volume.boot_loader is not None") + _assert("volume.journal is not None") + test_root_inode(volume) + _assert("volume.root.is_htree == True") + _assert("volume.root.htree is not None") + + htree = volume.root.htree + _assert("htree is not None") + if htree is not None: + _assert( + "isinstance(htree.dot, ext4.DotDirectoryEntry2)", + lambda: htree.dot, # pyright: ignore[reportOptionalMemberAccess, reportAny] + ) + _assert( + "isinstance(htree.dotdot, ext4.DotDirectoryEntry2)", + lambda: htree.dotdot, # pyright: ignore[reportOptionalMemberAccess, reportAny] + ) + _assert("htree.limit > 0", lambda: htree.limit) # pyright: ignore[reportOptionalMemberAccess, reportAny] + _assert("htree.count > 0", lambda: htree.count) # pyright: ignore[reportOptionalMemberAccess, reportAny] + _assert("htree.count <= htree.limit") + _assert("htree.block >= 0", lambda: htree.block) # pyright: ignore[reportOptionalMemberAccess, reportAny] + _assert("htree.dx_root_info is not None") + + _assert("htree.dot.verify() is None") + _assert("htree.dotdot.verify() is None") + + _assert("htree.dot.name == b'.'", lambda: htree.dot.name) # pyright: ignore[reportAny, reportOptionalMemberAccess] + _assert("htree.dotdot.name == b'..'", lambda: htree.dotdot.name) # pyright: ignore[reportAny, reportOptionalMemberAccess] + + dx_root_info = htree.dx_root_info # pyright: ignore[reportAny] + _assert( + "isinstance(dx_root_info.hash_version, int)", + lambda: dx_root_info.hash_version, # pyright: ignore[reportAny] + ) + _not_raises( + "ext4.DX_HASH(dx_root_info.hash_version)", + lambda: dx_root_info.hash_version, # pyright: ignore[reportAny] + ) + _assert("dx_root_info.info_length == 8", lambda: dx_root_info.info_length) # pyright: ignore[reportAny] + _assert( + "dx_root_info.indirect_levels > 0", + lambda: dx_root_info.indirect_levels, # pyright: ignore[reportAny] + ) + entries = htree.entries + _assert("len(entries) > 0", lambda: len(entries)) + _assert( + "len(entries) == htree.count - 1", + lambda: f"{len(entries)} != {htree.count - 1}", # pyright: ignore[reportAny, reportOptionalMemberAccess] + ) + for entry in entries: + _assert("isinstance(entry.hash, int)", lambda: entry.hash) # pyright: ignore[reportAny] + _assert("isinstance(entry.block, int)", lambda: entry.block) # pyright: ignore[reportAny] + + if entries: + first_entry = entries[0] + _assert("first_entry.hash >= 0", lambda: first_entry.hash) # pyright: ignore[reportAny] + _assert("first_entry.block > 0", lambda: first_entry.block) # pyright: ignore[reportAny] + + block_io = ext4.BlockIO(volume.root) + block = block_io.blocks[first_entry.block] # pyright: ignore[reportAny] + _assert("len(block) > 0", lambda: len(block)) + _assert(f"len(block) == {volume.block_size}", lambda: len(block)) + + inode_no = htree.lookup("12000") + _assert("inode_no is not None") + if inode_no is not None: + inode = volume.inodes[inode_no] + _assert("isinstance(inode, ext4.File)", lambda: inode) if FAILED: sys.exit(1) diff --git a/test.sh b/test.sh index 6d1632d..6f4d7ab 100755 --- a/test.sh +++ b/test.sh @@ -14,7 +14,12 @@ else echo "venv missing" exit 1 fi -if [ ! -f test32.ext4 ] || [ ! -f test32.ext4.tmp ] || [ ! -f test64.ext4 ] || [ ! -f test64.ext4.tmp ] || [ ! -f test_htree.ext4 ]; then +if [ ! -f test32.ext4 ] || + [ ! -f test32.ext4.tmp ] || + [ ! -f test64.ext4 ] || + [ ! -f test64.ext4.tmp ] || + [ ! -f test_htree.ext4 ] || + [ ! -f test_htree_multi.ext4 ]; then ./_test_image.sh trap "rm -f test{32,64,_htree}.ext4{,.tmp}" EXIT fi