From cd55a145172da2e9788e6db02314e94b42b8aa6a Mon Sep 17 00:00:00 2001 From: Chao Wang <26245345+ChaoWao@users.noreply.github.com> Date: Fri, 20 Mar 2026 21:35:23 +0800 Subject: [PATCH 1/4] Add: AscendC kernel compilation support Integrate AscendC operators into PTO runtime via single-TU compile + link: - Add AscendCToolchain to toolchain.py with --cce-aicore-lang dialect, SDK include paths from bisheng_intf.cmake, and auto-sync flags - Add ascendc_compiler.py with single-TU approach: merge kernel_entry wrapper + user source into one translation unit, compile with AscendC flags, link with ld.lld to resolve block-local relocations - Dispatch compiler='ascendc' kernels in code_runner.py - Add ascendc_vector_example device test (z=x+y via AscendC, w=z*z via PTO) under tests/device_tests/ to avoid sim CI discovery - Add unit tests for wrapper generation, merged source generation, artifact extraction, and toolchain flags --- examples/scripts/code_runner.py | 37 +- python/ascendc_compiler.py | 538 ++++++++++++++++++ python/toolchain.py | 65 +++ .../ascendc_vector_example/README.md | 96 ++++ .../ascendc_vector_example/golden.py | 52 ++ .../kernels/aiv/kernel_mul.cpp | 79 +++ .../kernels/ascendc/add_custom.cpp | 90 +++ .../kernels/kernel_config.py | 52 ++ .../kernels/orchestration/ascendc_orch.cpp | 89 +++ tests/test_ascendc_compiler.py | 371 ++++++++++++ 10 files changed, 1468 insertions(+), 1 deletion(-) create mode 100644 python/ascendc_compiler.py create mode 100644 tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/README.md create mode 100644 tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/golden.py create mode 100644 tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/aiv/kernel_mul.cpp create mode 100644 tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp create mode 100644 
tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py create mode 100644 tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/orchestration/ascendc_orch.cpp create mode 100644 tests/test_ascendc_compiler.py diff --git a/examples/scripts/code_runner.py b/examples/scripts/code_runner.py index b4d93c8a6..8c35debb6 100644 --- a/examples/scripts/code_runner.py +++ b/examples/scripts/code_runner.py @@ -794,7 +794,13 @@ def _compile_orchestration(): ) def _compile_one_kernel(kernel): - logger.info(f"Compiling kernel: {kernel['source']} (func_id={kernel['func_id']})") + compiler_type = kernel.get("compiler", "pto") + logger.info(f"Compiling kernel: {kernel['source']} " + f"(func_id={kernel['func_id']}, compiler={compiler_type})") + + if compiler_type == "ascendc": + return _compile_one_ascendc_kernel(kernel) + incore_o = kernel_compiler.compile_incore( kernel["source"], core_type=kernel["core_type"], @@ -808,6 +814,35 @@ def _compile_one_kernel(kernel): kernel_bin = extract_text_section(incore_o) return (kernel["func_id"], kernel_bin) + def _compile_one_ascendc_kernel(kernel): + from ascendc_compiler import AscendCCompiler, extract_kernel_artifacts + + ascendc = AscendCCompiler(platform=self.platform) + tiling_data = None + + kernel_meta_dir = kernel.get("kernel_meta_dir") + if kernel_meta_dir: + kernel_meta_dir = os.path.abspath(kernel_meta_dir) + _, tiling_data = extract_kernel_artifacts(kernel_meta_dir) + + tiling_path = kernel.get("tiling_data_path") + if tiling_path and os.path.isfile(tiling_path): + with open(tiling_path, 'rb') as f: + tiling_data = f.read() + + combined_o = ascendc.compile_ascendc_kernel( + ascendc_kernel_source=kernel["source"], + ascendc_kernel_symbol=kernel.get("ascendc_symbol", "ascendc_kernel"), + tensor_args=kernel.get("tensor_args", []), + tiling_data=tiling_data, + core_type=kernel.get("core_type", "aiv"), + has_workspace=kernel.get("has_workspace", False), + 
extra_include_dirs=runtime_include_dirs, + build_dir=self.build_dir, + ) + kernel_bin = extract_text_section(combined_o) + return (kernel["func_id"], kernel_bin) + # Launch all compilations concurrently max_workers = 2 + len(self.kernels) # runtime + orchestration + kernels with ThreadPoolExecutor(max_workers=max_workers) as executor: diff --git a/python/ascendc_compiler.py b/python/ascendc_compiler.py new file mode 100644 index 000000000..39eae9d49 --- /dev/null +++ b/python/ascendc_compiler.py @@ -0,0 +1,538 @@ +""" +AscendC Kernel Compiler for PTO Runtime Integration + +Compiles AscendC operator sources into PTO-compatible kernel binaries by +generating a merged source file that includes both the user's AscendC kernel +and a wrapper ``kernel_entry`` that bridges PTO's Tensor-pointer convention +to AscendC's raw GM-address convention. + +Workflow: + 1. Generate merged .cpp: kernel_entry wrapper (at offset 0) + #include user source + 2. Compile with AscendC toolchain flags (--cce-aicore-lang) → single .o + 3. Link with ld.lld (-e kernel_entry -Ttext=0) to resolve block-local relocations + 4. extract_text_section(linked_elf) → kernel binary (done by caller) + +The merged source suppresses ``__global__`` when including the user's kernel +source, so the compiler allows ``kernel_entry`` to call the user's kernel +function. kernel_entry is defined first in the source to ensure it sits at +.text offset 0 (PTO dispatches to offset 0). 
+ +Usage: + compiler = AscendCCompiler(platform="a2a3") + kernel_o = compiler.compile_ascendc_kernel( + ascendc_kernel_source="/path/to/add_custom.cpp", + ascendc_kernel_symbol="add_custom", + tensor_args=[ + {"name": "x", "direction": "input"}, + {"name": "y", "direction": "input"}, + {"name": "z", "direction": "output"}, + ], + core_type="aiv", + ) + # caller does: kernel_bin = extract_text_section(kernel_o) +""" + +import json +import logging +import os +import subprocess +import tempfile +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +import env_manager +from toolchain import AscendCToolchain, CCECToolchain + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Wrapper source generation +# --------------------------------------------------------------------------- + +def generate_wrapper_source( + ascendc_kernel_symbol: str, + tensor_args: List[Dict[str, str]], + tiling_data: Optional[bytes] = None, + has_workspace: bool = False, +) -> str: + """Generate a C++ wrapper that bridges PTO kernel_entry to an AscendC kernel. + + The generated source: + - Defines ``kernel_entry(__gm__ int64_t* args)`` (PTO dispatch convention) + - Unpacks each ``args[i]`` as a ``Tensor*``, extracts ``buffer.addr`` + - Forwards the raw GM byte-addresses to the AscendC kernel function + - Embeds static tiling data as a ``constexpr`` array when provided + + Args: + ascendc_kernel_symbol: The ``extern "C"`` symbol name of the AscendC + kernel entry function in the compiled .o (e.g. ``"add_custom"``). + tensor_args: Ordered list of tensor descriptors. Each dict has: + ``name`` -- human-readable label (used in comments) + ``direction`` -- ``"input"`` | ``"output"`` | ``"inout"`` + tiling_data: Raw bytes of the tiling data blob. If *None*, the wrapper + passes a nullptr for the tiling parameter. + has_workspace: Whether the AscendC kernel expects a workspace pointer. 
+ + Returns: + Complete C++ source string ready for ccec compilation. + """ + lines: List[str] = [] + + # --- header / includes --------------------------------------------------- + lines.append('#include "tensor.h"') + lines.append('') + + # --- embedded tiling data ------------------------------------------------ + if tiling_data is not None and len(tiling_data) > 0: + lines.append(f'static const uint8_t TILING_DATA[{len(tiling_data)}] = {{') + chunk = 16 + for i in range(0, len(tiling_data), chunk): + segment = tiling_data[i:i + chunk] + hex_segment = ', '.join(f'0x{b:02x}' for b in segment) + trailing = ',' if i + chunk < len(tiling_data) else '' + lines.append(f' {hex_segment}{trailing}') + lines.append('};') + lines.append('') + + # --- kernel_entry -------------------------------------------------------- + lines.append('extern "C" __aicore__ void kernel_entry(__gm__ int64_t* args) {') + + # Unpack Tensor* from args[] + for idx, ta in enumerate(tensor_args): + lines.append(f' __gm__ Tensor* t_{ta["name"]} = ' + f'reinterpret_cast<__gm__ Tensor*>(args[{idx}]);') + + lines.append('') + + # Extract raw GM byte-addresses + for ta in tensor_args: + lines.append( + f' __gm__ uint8_t* gm_{ta["name"]} = ' + f'reinterpret_cast<__gm__ uint8_t*>(t_{ta["name"]}->buffer.addr);' + ) + + lines.append('') + + # Build call arguments + call_args = [f'gm_{ta["name"]}' for ta in tensor_args] + if has_workspace: + call_args.append('nullptr') + if tiling_data is not None and len(tiling_data) > 0: + call_args.append('(__gm__ uint8_t*)TILING_DATA') + else: + call_args.append('nullptr') + call_str = ', '.join(call_args) + lines.append(f' {ascendc_kernel_symbol}({call_str});') + + lines.append('}') + lines.append('') + + return '\n'.join(lines) + + +def generate_merged_source( + ascendc_kernel_source: str, + ascendc_kernel_symbol: str, + tensor_args: List[Dict[str, str]], + tiling_data: Optional[bytes] = None, + has_workspace: bool = False, +) -> str: + """Generate a single-TU source: 
kernel_entry wrapper + #include user kernel. + + kernel_entry is defined FIRST so it sits at offset 0 in .text (PTO + dispatches to offset 0). The user's AscendC kernel source is included + AFTER, with ``__global__`` suppressed so the compiler treats the user's + kernel as a regular callable function rather than a top-level entry point. + + Compiled as one translation unit with AscendC flags, so there are no + cross-TU relocations to resolve. + """ + lines: List[str] = [] + + # Include kernel_operator.h first for AscendC types and intrinsics + lines.append('#include "kernel_operator.h"') + lines.append('#include "tensor.h"') + lines.append('') + + # Forward-declare user's kernel function (without __global__) + param_types = ['__gm__ uint8_t*'] * len(tensor_args) + if has_workspace: + param_types.append('__gm__ uint8_t*') + param_types.append('__gm__ uint8_t*') # tiling + fwd_params = ', '.join(param_types) + lines.append(f'extern "C" __aicore__ void {ascendc_kernel_symbol}({fwd_params});') + lines.append('') + + # Embedded tiling data + if tiling_data is not None and len(tiling_data) > 0: + lines.append(f'static const uint8_t TILING_DATA[{len(tiling_data)}] = {{') + chunk = 16 + for i in range(0, len(tiling_data), chunk): + segment = tiling_data[i:i + chunk] + hex_segment = ', '.join(f'0x{b:02x}' for b in segment) + trailing = ',' if i + chunk < len(tiling_data) else '' + lines.append(f' {hex_segment}{trailing}') + lines.append('};') + lines.append('') + + # kernel_entry wrapper — defined FIRST so it's at .text offset 0 + # NOT __global__ — the PTO executor calls kernel functions via function + # pointers using the standard calling convention, not the __global__ entry + # convention. 
+ lines.append('extern "C" __aicore__ void kernel_entry(__gm__ int64_t* args) {') + + for idx, ta in enumerate(tensor_args): + lines.append(f' __gm__ Tensor* t_{ta["name"]} = ' + f'reinterpret_cast<__gm__ Tensor*>(args[{idx}]);') + lines.append('') + + for ta in tensor_args: + lines.append( + f' __gm__ uint8_t* gm_{ta["name"]} = ' + f'reinterpret_cast<__gm__ uint8_t*>(t_{ta["name"]}->buffer.addr);' + ) + lines.append('') + + call_args = [f'gm_{ta["name"]}' for ta in tensor_args] + if has_workspace: + call_args.append('nullptr') + if tiling_data is not None and len(tiling_data) > 0: + call_args.append('(__gm__ uint8_t*)TILING_DATA') + else: + call_args.append('nullptr') + call_str = ', '.join(call_args) + lines.append(f' {ascendc_kernel_symbol}({call_str});') + + lines.append('}') + lines.append('') + + # Include user's AscendC kernel source AFTER kernel_entry. + # Suppress __global__ so user's kernel becomes a regular __aicore__ function + # (the compiler forbids calling __global__ functions from other functions). + lines.append('#define __global__') + lines.append(f'#include "{ascendc_kernel_source}"') + lines.append('#undef __global__') + lines.append('') + + return '\n'.join(lines) + + +# --------------------------------------------------------------------------- +# AscendC compilation helpers +# --------------------------------------------------------------------------- + +def extract_kernel_artifacts( + kernel_meta_dir: str, +) -> Tuple[Optional[bytes], Optional[bytes]]: + """Extract compiled kernel .o and tiling data from an AscendC kernel_meta dir. + + The kernel_meta directory is produced by ``npu_op_kernel_options`` with + ``--save-temp-files``. 
It typically contains: + - ``*.o`` -- the compiled AICore kernel binary + - ``*.json`` -- metadata including tiling information + + Args: + kernel_meta_dir: Absolute path to a kernel_meta directory + + Returns: + (kernel_o_bytes, tiling_bytes) -- either may be None if not found + """ + meta_dir = Path(kernel_meta_dir) + if not meta_dir.is_dir(): + raise FileNotFoundError(f"kernel_meta directory not found: {kernel_meta_dir}") + + kernel_o_bytes: Optional[bytes] = None + tiling_bytes: Optional[bytes] = None + + # Find the .o file (there should be exactly one) + o_files = sorted(meta_dir.glob("*.o")) + if o_files: + with open(o_files[0], 'rb') as f: + kernel_o_bytes = f.read() + logger.info(f"[AscendC] Loaded kernel .o: {o_files[0]} ({len(kernel_o_bytes)} bytes)") + + # Look for tiling data -- could be a .bin or embedded in a .json + tiling_bins = sorted(meta_dir.glob("*tiling*.bin")) + if tiling_bins: + with open(tiling_bins[0], 'rb') as f: + tiling_bytes = f.read() + logger.info(f"[AscendC] Loaded tiling data: {tiling_bins[0]} ({len(tiling_bytes)} bytes)") + else: + json_files = sorted(meta_dir.glob("*.json")) + for jf in json_files: + try: + with open(jf) as f: + meta = json.load(f) + if "tiling_data" in meta: + td = meta["tiling_data"] + if isinstance(td, str): + tiling_bytes = bytes.fromhex(td) + elif isinstance(td, list): + tiling_bytes = bytes(td) + logger.info(f"[AscendC] Extracted tiling from {jf.name}") + break + except (json.JSONDecodeError, KeyError, ValueError): + continue + + return kernel_o_bytes, tiling_bytes + + +# --------------------------------------------------------------------------- +# Main compiler class +# --------------------------------------------------------------------------- + +class AscendCCompiler: + """Compile AscendC operators into PTO-compatible kernel binaries. 
+ + Typical usage:: + + compiler = AscendCCompiler(platform="a2a3") + kernel_o = compiler.compile_ascendc_kernel( + ascendc_kernel_source="path/to/add_custom.cpp", + ascendc_kernel_symbol="add_custom", + tensor_args=[ + {"name": "x", "direction": "input"}, + {"name": "y", "direction": "input"}, + {"name": "z", "direction": "output"}, + ], + core_type="aiv", + ) + kernel_bin = extract_text_section(kernel_o) + """ + + def __init__(self, platform: str = "a2a3"): + self.platform = platform + self.project_root = Path(__file__).parent.parent + + if platform in ("a2a3", "a2a3sim"): + self.platform_dir = self.project_root / "src" / "a2a3" / "platform" + elif platform in ("a5", "a5sim"): + self.platform_dir = self.project_root / "src" / "a5" / "platform" + else: + raise ValueError(f"Unknown platform: {platform}") + + if platform in ("a2a3", "a5"): + env_manager.ensure("ASCEND_HOME_PATH") + self.ascendc_toolchain = AscendCToolchain(platform) + self.pto_toolchain = CCECToolchain(platform) + else: + self.ascendc_toolchain = None + self.pto_toolchain = None + + def _get_runtime_include_dir(self, runtime_name: str = "tensormap_and_ringbuffer") -> str: + arch = "a2a3" if self.platform in ("a2a3", "a2a3sim") else "a5" + return str(self.project_root / "src" / arch / "runtime" / runtime_name / "runtime") + + def compile_ascendc_kernel( + self, + ascendc_kernel_source: Optional[str] = None, + ascendc_kernel_o: Optional[bytes] = None, + ascendc_kernel_symbol: str = "ascendc_kernel", + tensor_args: Optional[List[Dict[str, str]]] = None, + tiling_data: Optional[bytes] = None, + core_type: str = "aiv", + has_workspace: bool = False, + extra_include_dirs: Optional[List[str]] = None, + build_dir: Optional[str] = None, + ) -> bytes: + """Compile AscendC kernel into a PTO-compatible .o (single-TU). + + Generates a merged source file that ``#include``s the user's AscendC + kernel and appends a ``kernel_entry`` wrapper, then compiles everything + as one translation unit with AscendC flags. 
+ + The caller should run ``extract_text_section(result)`` on the output. + + Args: + ascendc_kernel_source: Path to AscendC kernel ``.cpp`` source. + ascendc_kernel_o: Pre-compiled AscendC kernel ``.o`` bytes. + If provided, a two-step compile+link is used instead. + ascendc_kernel_symbol: ``extern "C"`` symbol of the AscendC kernel. + tensor_args: Ordered tensor descriptor list for wrapper generation. + tiling_data: Static tiling data blob (None = no tiling). + core_type: ``"aiv"`` (vector) or ``"aic"`` (cube). + has_workspace: Whether AscendC kernel expects a workspace pointer. + extra_include_dirs: Extra -I paths for compilation. + build_dir: Working directory for intermediates. + + Returns: + Compiled .o bytes. + """ + if self.ascendc_toolchain is None: + raise RuntimeError( + "AscendC kernel compilation requires a hardware platform (a2a3/a5). " + "Simulation platforms are not supported." + ) + + if ascendc_kernel_source is None and ascendc_kernel_o is None: + raise ValueError("Provide either ascendc_kernel_source or ascendc_kernel_o") + if ascendc_kernel_source is not None and ascendc_kernel_o is not None: + raise ValueError("Provide only one of ascendc_kernel_source or ascendc_kernel_o") + + if tensor_args is None: + tensor_args = [] + + work_dir = build_dir or tempfile.mkdtemp(prefix="ascendc_build_") + os.makedirs(work_dir, exist_ok=True) + + if ascendc_kernel_o is not None: + return self._compile_with_prebuilt_o( + ascendc_kernel_o, ascendc_kernel_symbol, tensor_args, + tiling_data, core_type, has_workspace, extra_include_dirs, work_dir, + ) + + # --- Single-TU approach: merge kernel source + wrapper --- + ascendc_kernel_source = os.path.abspath(ascendc_kernel_source) + + merged_src = generate_merged_source( + ascendc_kernel_source=ascendc_kernel_source, + ascendc_kernel_symbol=ascendc_kernel_symbol, + tensor_args=tensor_args, + tiling_data=tiling_data, + has_workspace=has_workspace, + ) + + merged_cpp_path = os.path.join(work_dir, "ascendc_merged.cpp") + 
with open(merged_cpp_path, 'w') as f: + f.write(merged_src) + logger.info(f"[AscendC] Generated merged source: {merged_cpp_path}") + + output_o_path = os.path.join(work_dir, "ascendc_kernel.o") + + # Build include dirs: AscendC SDK + runtime (for tensor.h) + source dir + ascendc_includes = self.ascendc_toolchain.get_ascendc_include_dirs() + ascendc_includes.append(self._get_runtime_include_dir()) + ascendc_includes.append(os.path.dirname(ascendc_kernel_source)) + if extra_include_dirs: + ascendc_includes.extend( + os.path.abspath(d) for d in extra_include_dirs + ) + + compile_cmd = [self.ascendc_toolchain.cxx_path] + compile_cmd += self.ascendc_toolchain.get_compile_flags(core_type=core_type) + for inc in ascendc_includes: + compile_cmd.append(f"-I{inc}") + compile_cmd.extend(["-o", output_o_path, merged_cpp_path]) + + logger.info(f"[AscendC] Compiling AscendC kernel: {ascendc_kernel_source}") + logger.debug(f" Command: {' '.join(compile_cmd)}") + self._run(compile_cmd, "AscendC-Compile") + + # Link to resolve block-local relocations (R_AICORE_BLOCK_LOCAL_OFFSET12 + # for g_vecTPipePtr etc.). extract_text_section extracts raw bytes without + # applying relocations, so we must link first. 
+ linked_path = os.path.join(work_dir, "ascendc_linked.elf") + link_cmd = [ + self.pto_toolchain.linker_path, + "-e", "kernel_entry", "-Ttext=0", + "-o", linked_path, output_o_path, + ] + logger.info("[AscendC] Linking to resolve block-local relocations") + self._run(link_cmd, "AscendC-Link") + + with open(linked_path, 'rb') as f: + result_bytes = f.read() + + logger.info(f"[AscendC] Linked binary: {len(result_bytes)} bytes") + return result_bytes + + def _compile_with_prebuilt_o( + self, + ascendc_kernel_o: bytes, + ascendc_kernel_symbol: str, + tensor_args: List[Dict[str, str]], + tiling_data: Optional[bytes], + core_type: str, + has_workspace: bool, + extra_include_dirs: Optional[List[str]], + work_dir: str, + ) -> bytes: + """Fallback: pre-compiled .o needs wrapper + link.""" + kernel_o_path = os.path.join(work_dir, "ascendc_kernel.o") + with open(kernel_o_path, 'wb') as f: + f.write(ascendc_kernel_o) + logger.info(f"[AscendC] Using pre-compiled kernel .o ({len(ascendc_kernel_o)} bytes)") + + # Generate + compile PTO wrapper with AscendC flags + wrapper_src = generate_wrapper_source( + ascendc_kernel_symbol=ascendc_kernel_symbol, + tensor_args=tensor_args, + tiling_data=tiling_data, + has_workspace=has_workspace, + ) + + wrapper_cpp_path = os.path.join(work_dir, "ascendc_wrapper.cpp") + with open(wrapper_cpp_path, 'w') as f: + f.write(wrapper_src) + + wrapper_o_path = os.path.join(work_dir, "ascendc_wrapper.o") + wrapper_includes = [self._get_runtime_include_dir()] + if extra_include_dirs: + wrapper_includes.extend(extra_include_dirs) + + wrapper_cmd = [self.ascendc_toolchain.cxx_path] + wrapper_cmd += self.ascendc_toolchain.get_compile_flags(core_type=core_type) + for inc in wrapper_includes: + wrapper_cmd.append(f"-I{os.path.abspath(inc)}") + wrapper_cmd.extend(["-o", wrapper_o_path, wrapper_cpp_path]) + self._run(wrapper_cmd, "AscendC-Wrapper") + + # Full link to resolve cross-TU calls + combined_o_path = os.path.join(work_dir, "ascendc_combined.o") + 
link_cmd = [ + self.pto_toolchain.linker_path, + "-e", "kernel_entry", "-Ttext=0", + "-o", combined_o_path, + wrapper_o_path, kernel_o_path, + ] + self._run(link_cmd, "AscendC-Link") + + with open(combined_o_path, 'rb') as f: + result_bytes = f.read() + + logger.info(f"[AscendC] Combined binary: {len(result_bytes)} bytes") + return result_bytes + + def compile_from_kernel_meta( + self, + kernel_meta_dir: str, + ascendc_kernel_source: str, + ascendc_kernel_symbol: str, + tensor_args: List[Dict[str, str]], + core_type: str = "aiv", + has_workspace: bool = False, + extra_include_dirs: Optional[List[str]] = None, + build_dir: Optional[str] = None, + ) -> bytes: + """Convenience: extract tiling from kernel_meta, then compile source.""" + _, tiling_data = extract_kernel_artifacts(kernel_meta_dir) + + return self.compile_ascendc_kernel( + ascendc_kernel_source=ascendc_kernel_source, + ascendc_kernel_symbol=ascendc_kernel_symbol, + tensor_args=tensor_args, + tiling_data=tiling_data, + core_type=core_type, + has_workspace=has_workspace, + extra_include_dirs=extra_include_dirs, + build_dir=build_dir, + ) + + @staticmethod + def _run(cmd: List[str], label: str) -> subprocess.CompletedProcess: + """Run a subprocess with logging and error handling.""" + logger.debug(f"[{label}] {' '.join(cmd)}") + try: + result = subprocess.run(cmd, capture_output=True, text=True) + if result.stdout: + logger.debug(f"[{label}] stdout:\n{result.stdout}") + if result.stderr: + logger.debug(f"[{label}] stderr:\n{result.stderr}") + if result.returncode != 0: + raise RuntimeError( + f"{label} failed (exit {result.returncode}):\n{result.stderr}" + ) + return result + except FileNotFoundError as exc: + raise RuntimeError(f"{label}: tool not found -- {exc}") from exc diff --git a/python/toolchain.py b/python/toolchain.py index d6bc795be..185eb6093 100644 --- a/python/toolchain.py +++ b/python/toolchain.py @@ -102,6 +102,71 @@ def get_cmake_args(self) -> List[str]: ] +class 
AscendCToolchain(CCECToolchain): + """ccec compiler configured for AscendC kernel compilation. + + AscendC uses a different dialect (--cce-aicore-lang) than PTO-ISA (-x cce), + plus automatic synchronization (--cce-auto-sync) and the full AscendC SDK + include tree. + """ + + def get_compile_flags(self, core_type: str = "aiv", **kwargs) -> List[str]: + if self.platform in ("a5", "a5sim"): + arch = "dav-c310-vec" if core_type == "aiv" else "dav-c310-cube" + elif self.platform in ("a2a3", "a2a3sim"): + arch = "dav-c220-vec" if core_type == "aiv" else "dav-c220-cube" + else: + raise ValueError(f"Unknown platform: {self.platform}") + + flags = [ + "-c", "-O3", "-std=c++17", + "--cce-aicore-lang", + "-DTILING_KEY_VAR=0", + "--cce-aicore-only", + f"--cce-aicore-arch={arch}", + "--cce-auto-sync", + "-mllvm", "-cce-aicore-stack-size=0x8000", + "-mllvm", "-cce-aicore-function-stack-size=0x8000", + "-mllvm", "-cce-aicore-record-overflow=false", + ] + + # Force-include SDK version header if present + version_header = os.path.join( + self.ascend_home_path, "include", "ascendc", "asc_devkit_version.h" + ) + if os.path.isfile(version_header): + flags.extend(["-include", version_header]) + + return flags + + def get_ascendc_include_dirs(self) -> List[str]: + """Return AscendC SDK include directories (from bisheng_intf.cmake).""" + dk = os.path.join(self.ascend_home_path, "aarch64-linux") + asc = os.path.join(dk, "asc") + tikcfw = os.path.join(dk, "tikcpp", "tikcfw") + + dirs = [ + os.path.join(asc, "impl", "adv_api"), + os.path.join(asc, "impl", "basic_api"), + os.path.join(asc, "impl", "c_api"), + os.path.join(asc, "impl", "micro_api"), + os.path.join(asc, "impl", "simt_api"), + os.path.join(asc, "impl", "utils"), + os.path.join(asc, "include"), + os.path.join(asc, "include", "adv_api"), + os.path.join(asc, "include", "basic_api"), + os.path.join(asc, "include", "aicpu_api"), + os.path.join(asc, "include", "c_api"), + os.path.join(asc, "include", "micro_api"), + 
os.path.join(asc, "include", "simt_api"), + os.path.join(asc, "include", "utils"), + tikcfw, + os.path.join(tikcfw, "interface"), + os.path.join(tikcfw, "impl"), + ] + return [d for d in dirs if os.path.isdir(d)] + + class Gxx15Toolchain(Toolchain): """g++-15 compiler for simulation kernels.""" diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/README.md b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/README.md new file mode 100644 index 000000000..66d0c1d94 --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/README.md @@ -0,0 +1,96 @@ +# AscendC Vector Example + +Demonstrates integrating an AscendC operator into the PTO tensormap_and_ringbuffer runtime. + +## Overview + +This device test runs two tasks on a single blockdim (2 AIV cores): + +1. **t0**: `z = add_custom(x, y)` -- AscendC AddCustom operator (func_id=0) +2. **t1**: `w = mul(z, z)` -- PTO-native kernel_mul (func_id=1) + +The AscendC kernel source is compiled directly by simpler's `AscendCCompiler` using the +CANN SDK's ccec compiler with `--cce-aicore-lang` flags, then linked with a generated PTO +wrapper to match PTO's `kernel_entry(int64_t* args)` convention. 
+ +## Prerequisites + +- Ascend CANN SDK with ccec compiler (hardware platform only, no sim support) + +## How to Run + +```bash +python examples/scripts/run_example.py \ + -k tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels \ + -g tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/golden.py \ + -p a2a3 -d +``` + +Or via CI (device tests only): +```bash +./ci.sh -p a2a3 -d +``` + +## Architecture + +``` + AscendC source (.cpp) PTO wrapper (.cpp) + uses kernel_operator.h uses tensor.h + │ │ + ▼ ▼ + ccec --cce-aicore-lang ccec -x cce (PTO-ISA) + (AscendC toolchain flags) (PTO toolchain flags) + │ │ + ▼ ▼ + kernel.o wrapper.o + │ │ + └──────────┬─────────────────────────┘ + ▼ + ld.lld -r wrapper.o kernel.o + │ + ▼ + combined.o + │ + ▼ + extract_text_section + │ + ▼ + kernel binary + │ + ┌──────────────────▼───────────────────────────┐ + │ PTO Runtime │ + │ kernel_binaries = [(0, ascendc_bin), │ + │ (1, mul_bin)] │ + │ │ + │ Orchestration: │ + │ pto2_rt_submit_aiv_task(rt, 0, params) │ + │ pto2_rt_submit_aiv_task(rt, 1, params) │ + └───────────────────────────────────────────────┘ +``` + +Key: wrapper.o is listed first in the link command so `kernel_entry` lands +at `.text` offset 0 (PTO dispatch jumps to offset 0). + +## Configuration + +- `block_dim = 1`: Single blockdim (1 AIC + 2 AIV). Only AIV used. +- `aicpu_thread_num = 3`: 2 schedulers + 1 orchestrator. +- `compiler = "ascendc"`: Tells CodeRunner to use AscendCCompiler for this kernel. 
+ +## kernel_config.py Schema for AscendC Kernels + +```python +{ + "func_id": 0, + "source": "path/to/ascendc_kernel.cpp", + "core_type": "aiv", + "compiler": "ascendc", + "ascendc_symbol": "add_custom", # extern "C" symbol name + "tensor_args": [ # ordered tensor descriptors + {"name": "x", "direction": "input"}, + {"name": "y", "direction": "input"}, + {"name": "z", "direction": "output"}, + ], + "has_workspace": False, # workspace pointer (optional) +} +``` diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/golden.py b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/golden.py new file mode 100644 index 000000000..fe1aa700a --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/golden.py @@ -0,0 +1,52 @@ +""" +Golden script for AscendC vector example. + +Computation: + z = x + y (AscendC AddCustom) + w = z * z (PTO kernel_mul) + +With x=2.0, y=3.0: + z = 5.0 + w = 25.0 + +Args layout: [ptr_x, ptr_y, ptr_z, ptr_w, size_x, size_y, size_z, size_w, SIZE] +""" + +import ctypes +import torch + +__outputs__ = ["z", "w"] + +RTOL = 1e-5 +ATOL = 1e-5 + + +def generate_inputs(params: dict) -> list: + ROWS = 128 + COLS = 128 + SIZE = ROWS * COLS + + x = torch.full((SIZE,), 2.0, dtype=torch.float32) + y = torch.full((SIZE,), 3.0, dtype=torch.float32) + z = torch.zeros(SIZE, dtype=torch.float32) + w = torch.zeros(SIZE, dtype=torch.float32) + + return [ + ("x", x), + ("y", y), + ("z", z), + ("w", w), + ("size_x", ctypes.c_int64(x.nbytes)), + ("size_y", ctypes.c_int64(y.nbytes)), + ("size_z", ctypes.c_int64(z.nbytes)), + ("size_w", ctypes.c_int64(w.nbytes)), + ("SIZE", ctypes.c_int64(SIZE)), + ] + + +def compute_golden(tensors: dict, params: dict) -> None: + x = torch.as_tensor(tensors["x"]) + y = torch.as_tensor(tensors["y"]) + z_val = x + y + tensors["z"][:] = z_val + tensors["w"][:] = z_val * z_val diff --git 
a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/aiv/kernel_mul.cpp b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/aiv/kernel_mul.cpp new file mode 100644 index 000000000..51d81036e --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/aiv/kernel_mul.cpp @@ -0,0 +1,79 @@ +/** + * Element-wise Tensor Multiplication Kernel + * + * Implements: out[i] = src0[i] * src1[i] + * + * This kernel performs element-wise multiplication of two tensors. It's + * compiled separately as a standalone kernel and linked with the dispatcher + * using function pointers, demonstrating the separation pattern used in + * production systems where kernel binaries are loaded dynamically. + */ + +#include +#include + +#include "tensor.h" + +using namespace pto; + +#ifndef __gm__ +#define __gm__ +#endif + +#ifndef __aicore__ +#define __aicore__ [aicore] +#endif + +/** + * Element-wise multiplication kernel implementation + * + * Unified signature: all arguments passed via int64_t array + * @param args Argument array: + * args[0] = src0 pointer (first input tensor) + * args[1] = src1 pointer (second input tensor) + * args[2] = out pointer (output tensor) + * args[3] = size (number of elements) + */ +extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t* args) { + // Unpack arguments (Tensor* pointers from runtime) + __gm__ Tensor* src0_tensor = reinterpret_cast<__gm__ Tensor*>(args[0]); + __gm__ Tensor* src1_tensor = reinterpret_cast<__gm__ Tensor*>(args[1]); + __gm__ Tensor* out_tensor = reinterpret_cast<__gm__ Tensor*>(args[2]); + __gm__ float* src0 = reinterpret_cast<__gm__ float*>(src0_tensor->buffer.addr) + src0_tensor->start_offset; + __gm__ float* src1 = reinterpret_cast<__gm__ float*>(src1_tensor->buffer.addr) + src1_tensor->start_offset; + __gm__ float* out = reinterpret_cast<__gm__ float*>(out_tensor->buffer.addr) + out_tensor->start_offset; 
+ + // Configuration: float, 128, 128, 128, 128 + constexpr int kTRows_ = 128; + constexpr int kTCols_ = 128; + constexpr int vRows = 128; + constexpr int vCols = 128; + + using DynShapeDim5 = Shape<1, 1, 1, vRows, vCols>; + using DynStridDim5 = Stride<1, 1, 1, kTCols_, 1>; + using GlobalData = GlobalTensor; + using TileData = Tile; + + TileData src0Tile(vRows, vCols); + TileData src1Tile(vRows, vCols); + TileData dstTile(vRows, vCols); + TASSIGN(src0Tile, 0x0); + TASSIGN(src1Tile, 0x10000); + TASSIGN(dstTile, 0x20000); + + GlobalData src0Global(src0); + GlobalData src1Global(src1); + GlobalData dstGlobal(out); + + TLOAD(src0Tile, src0Global); + TLOAD(src1Tile, src1Global); + set_flag(PIPE_MTE2, PIPE_V, EVENT_ID0); + wait_flag(PIPE_MTE2, PIPE_V, EVENT_ID0); + TMUL(dstTile, src0Tile, src1Tile); + set_flag(PIPE_V, PIPE_MTE3, EVENT_ID0); + wait_flag(PIPE_V, PIPE_MTE3, EVENT_ID0); + TSTORE(dstGlobal, dstTile); + + set_flag(PIPE_MTE3, PIPE_S, EVENT_ID7); + wait_flag(PIPE_MTE3, PIPE_S, EVENT_ID7); +} diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp new file mode 100644 index 000000000..1a086deeb --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp @@ -0,0 +1,90 @@ +/** + * AscendC AddCustom Operator + * + * Element-wise addition: z[i] = x[i] + y[i] + * + * Uses AscendC double-buffered pipeline with CopyIn/Compute/CopyOut stages. + * Static tiling: totalLength=16384 elements, tileNum=8 tiles. 
+ * + * Calling convention (extern "C"): + * void add_custom(__gm__ uint8_t* x, __gm__ uint8_t* y, __gm__ uint8_t* z, + * __gm__ uint8_t* workspace, __gm__ uint8_t* tiling); + */ + +#include "kernel_operator.h" + +using namespace AscendC; + +constexpr int32_t BUFFER_NUM = 2; + +class KernelAdd { +public: + __aicore__ inline KernelAdd() {} + + __aicore__ inline void Init(__gm__ uint8_t* x, __gm__ uint8_t* y, + __gm__ uint8_t* z, uint32_t totalLength, + uint32_t tileNum) { + this->blockLength = totalLength; + this->tileNum = tileNum; + this->tileLength = totalLength / tileNum; + + xGm.SetGlobalBuffer((__gm__ float*)x, this->blockLength); + yGm.SetGlobalBuffer((__gm__ float*)y, this->blockLength); + zGm.SetGlobalBuffer((__gm__ float*)z, this->blockLength); + + pipe.InitBuffer(inQueueX, BUFFER_NUM, this->tileLength * sizeof(float)); + pipe.InitBuffer(inQueueY, BUFFER_NUM, this->tileLength * sizeof(float)); + pipe.InitBuffer(outQueueZ, BUFFER_NUM, this->tileLength * sizeof(float)); + } + + __aicore__ inline void Process() { + for (int32_t i = 0; i < this->tileNum; i++) { + CopyIn(i); + Compute(i); + CopyOut(i); + } + } + +private: + __aicore__ inline void CopyIn(int32_t progress) { + LocalTensor xLocal = inQueueX.AllocTensor(); + LocalTensor yLocal = inQueueY.AllocTensor(); + DataCopy(xLocal, xGm[progress * this->tileLength], this->tileLength); + DataCopy(yLocal, yGm[progress * this->tileLength], this->tileLength); + inQueueX.EnQue(xLocal); + inQueueY.EnQue(yLocal); + } + + __aicore__ inline void Compute(int32_t progress) { + LocalTensor xLocal = inQueueX.DeQue(); + LocalTensor yLocal = inQueueY.DeQue(); + LocalTensor zLocal = outQueueZ.AllocTensor(); + Add(zLocal, xLocal, yLocal, this->tileLength); + outQueueZ.EnQue(zLocal); + inQueueX.FreeTensor(xLocal); + inQueueY.FreeTensor(yLocal); + } + + __aicore__ inline void CopyOut(int32_t progress) { + LocalTensor zLocal = outQueueZ.DeQue(); + DataCopy(zGm[progress * this->tileLength], zLocal, this->tileLength); + 
outQueueZ.FreeTensor(zLocal); + } + + TPipe pipe; + TQue inQueueX, inQueueY; + TQue outQueueZ; + GlobalTensor xGm, yGm, zGm; + uint32_t blockLength; + uint32_t tileNum; + uint32_t tileLength; +}; + +extern "C" __global__ __aicore__ void add_custom( + __gm__ uint8_t* x, __gm__ uint8_t* y, __gm__ uint8_t* z, + __gm__ uint8_t* workspace, __gm__ uint8_t* tiling) +{ + KernelAdd op; + op.Init(x, y, z, 16384, 8); + op.Process(); +} diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py new file mode 100644 index 000000000..b7c176df3 --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py @@ -0,0 +1,52 @@ +""" +AscendC Vector Example — kernel_config.py + +Demonstrates integrating an AscendC operator (add_custom) into the PTO +tensormap_and_ringbuffer runtime via single-TU compile + link. + +The AscendC kernel source is merged with a kernel_entry wrapper into a single +translation unit, compiled with AscendC flags, then linked to resolve +block-local relocations. + +Computation: + z = x + y (AscendC add_custom, func_id=0) + w = z * z (PTO kernel_mul, func_id=1) + +The orchestration submits two AIV tasks in sequence. 
+""" + +from pathlib import Path + +_KERNELS_ROOT = Path(__file__).parent + +ORCHESTRATION = { + "source": str(_KERNELS_ROOT / "orchestration" / "ascendc_orch.cpp"), + "function_name": "aicpu_orchestration_entry", +} + +KERNELS = [ + { + "func_id": 0, + "source": str(_KERNELS_ROOT / "ascendc" / "add_custom.cpp"), + "core_type": "aiv", + "compiler": "ascendc", + "ascendc_symbol": "add_custom", + "tensor_args": [ + {"name": "x", "direction": "input"}, + {"name": "y", "direction": "input"}, + {"name": "z", "direction": "output"}, + ], + "has_workspace": True, + }, + { + "func_id": 1, + "source": str(_KERNELS_ROOT / "aiv" / "kernel_mul.cpp"), + "core_type": "aiv", + }, +] + +RUNTIME_CONFIG = { + "runtime": "tensormap_and_ringbuffer", + "aicpu_thread_num": 3, + "block_dim": 2, +} diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/orchestration/ascendc_orch.cpp b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/orchestration/ascendc_orch.cpp new file mode 100644 index 000000000..fb4941860 --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/orchestration/ascendc_orch.cpp @@ -0,0 +1,89 @@ +/** + * AscendC Vector Example — Device Orchestration + * + * Demonstrates calling an AscendC kernel (AddCustom, func_id=0) and a + * PTO-native kernel (kernel_mul, func_id=1) from the same orchestration. + * + * DAG: + * t0: z = add_custom(x, y) (AscendC, func_id=0) + * t1: w = mul(z, z) (PTO, func_id=1) + * Dependencies: t0 -> t1 + * + * Both kernels run as AIV (vector) tasks on a single blockdim (2V). 
+ * + * Args layout (from golden.py): + * [ptr_x, ptr_y, ptr_z, ptr_w, size_x, size_y, size_z, size_w, SIZE] + */ + +#include +#include + +#include "pto_orchestration_api.h" + +#define FUNC_ADD_CUSTOM 0 +#define FUNC_MUL 1 + +#define ARG_PTR_X 0 +#define ARG_PTR_Y 1 +#define ARG_PTR_Z 2 +#define ARG_PTR_W 3 +#define ARG_SIZE_X 4 +#define ARG_SIZE_Y 5 +#define ARG_SIZE_Z 6 +#define ARG_SIZE_W 7 +#define ARG_SIZE 8 + +extern "C" { + +__attribute__((visibility("default"))) +PTO2OrchestrationConfig aicpu_orchestration_config(uint64_t* args, int arg_count) { + (void)args; + (void)arg_count; + return PTO2OrchestrationConfig{ + .expected_arg_count = 9, + }; +} + +__attribute__((visibility("default"))) +void aicpu_orchestration_entry(PTO2Runtime* rt, uint64_t* args, int arg_count, + int orch_thread_num, int orch_thread_index) { + (void)arg_count; + (void)orch_thread_num; + (void)orch_thread_index; + + void* ptr_x = (void*)(uintptr_t)args[ARG_PTR_X]; + void* ptr_y = (void*)(uintptr_t)args[ARG_PTR_Y]; + void* ptr_z = (void*)(uintptr_t)args[ARG_PTR_Z]; + void* ptr_w = (void*)(uintptr_t)args[ARG_PTR_W]; + int SIZE = (int)(args[ARG_SIZE] & 0x7FFFFFFF); + + LOG_INFO(rt, "[ascendc_orch] SIZE=%d", SIZE); + + uint32_t shapes[1] = {(uint32_t)SIZE}; + Tensor ext_x = make_tensor_external(ptr_x, shapes, 1, DataType::FLOAT32); + Tensor ext_y = make_tensor_external(ptr_y, shapes, 1, DataType::FLOAT32); + Tensor ext_z = make_tensor_external(ptr_z, shapes, 1, DataType::FLOAT32); + Tensor ext_w = make_tensor_external(ptr_w, shapes, 1, DataType::FLOAT32); + + // t0: z = add_custom(x, y) — AscendC kernel via PTO dispatch + { + PTOParam params_t0; + params_t0.add_input(ext_x); + params_t0.add_input(ext_y); + params_t0.add_output(ext_z); + pto2_rt_submit_aiv_task(rt, FUNC_ADD_CUSTOM, params_t0); + } + + // t1: w = mul(z, z) — PTO-native kernel + { + PTOParam params_t1; + params_t1.add_input(ext_z); + params_t1.add_input(ext_z); + params_t1.add_output(ext_w); + pto2_rt_submit_aiv_task(rt, 
FUNC_MUL, params_t1); + } + + LOG_INFO(rt, "[ascendc_orch] Submitted 2 tasks"); +} + +} // extern "C" diff --git a/tests/test_ascendc_compiler.py b/tests/test_ascendc_compiler.py new file mode 100644 index 000000000..0ad02a008 --- /dev/null +++ b/tests/test_ascendc_compiler.py @@ -0,0 +1,371 @@ +"""Tests for AscendC compiler module (python/ascendc_compiler.py).""" + +import sys +from pathlib import Path + +import pytest + +PROJECT_ROOT = Path(__file__).parent.parent +sys.path.insert(0, str(PROJECT_ROOT / "python")) + + +class TestGenerateWrapperSource: + """Test wrapper C++ source generation.""" + + def _gen(self, **kwargs): + from ascendc_compiler import generate_wrapper_source + return generate_wrapper_source(**kwargs) + + def test_basic_three_tensors(self): + """Generate wrapper for a simple 3-tensor AscendC kernel (x, y, z).""" + src = self._gen( + ascendc_kernel_symbol="add_custom", + tensor_args=[ + {"name": "x", "direction": "input"}, + {"name": "y", "direction": "input"}, + {"name": "z", "direction": "output"}, + ], + ) + assert "add_custom(" in src + assert "kernel_entry" in src + assert "t_x" in src and "t_y" in src and "t_z" in src + assert "gm_x" in src and "gm_y" in src and "gm_z" in src + assert "nullptr" in src + + def test_with_static_tiling_data(self): + """Tiling data bytes are embedded as a const array.""" + tiling = bytes([0x10, 0x27, 0x00, 0x00]) + src = self._gen( + ascendc_kernel_symbol="my_op", + tensor_args=[ + {"name": "a", "direction": "input"}, + {"name": "b", "direction": "output"}, + ], + tiling_data=tiling, + ) + assert "TILING_DATA[4]" in src + assert "0x10" in src and "0x27" in src + assert "(__gm__ uint8_t*)TILING_DATA" in src + call_lines = [l for l in src.splitlines() + if "my_op(" in l and "extern" not in l] + assert len(call_lines) == 1 + assert "TILING_DATA" in call_lines[0] + + def test_with_workspace(self): + """Workspace parameter adds an extra nullptr in the call.""" + src = self._gen( + ascendc_kernel_symbol="op", + 
tensor_args=[{"name": "a", "direction": "input"}], + has_workspace=True, + ) + # Call should have: gm_a, nullptr (workspace), nullptr (tiling) + call_line = [l for l in src.splitlines() if "op(" in l and "extern" not in l][0] + assert call_line.count("nullptr") == 2 + + def test_empty_tiling_treated_as_none(self): + """Empty tiling bytes => no TILING_DATA array, nullptr passed.""" + src = self._gen( + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + tiling_data=b"", + ) + assert "TILING_DATA" not in src + assert "nullptr" in src + + def test_includes_tensor_header(self): + """Wrapper source includes tensor.h for Tensor struct access.""" + src = self._gen( + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + ) + assert '#include "tensor.h"' in src + + def test_correct_arg_indexing(self): + """Each tensor is unpacked from the correct args index.""" + src = self._gen( + ascendc_kernel_symbol="op", + tensor_args=[ + {"name": "a", "direction": "input"}, + {"name": "b", "direction": "input"}, + {"name": "c", "direction": "input"}, + {"name": "d", "direction": "output"}, + ], + ) + assert "args[0]" in src + assert "args[1]" in src + assert "args[2]" in src + assert "args[3]" in src + + def test_large_tiling_data_formatting(self): + """Large tiling data is formatted correctly across multiple lines.""" + tiling = bytes(range(48)) + src = self._gen( + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + tiling_data=tiling, + ) + assert "TILING_DATA[48]" in src + assert "0x00" in src + assert "0x2f" in src # last byte = 47 = 0x2f + + +class TestExtractKernelArtifacts: + """Test extraction of .o and tiling data from kernel_meta directory.""" + + def test_extracts_o_file(self, tmp_path): + """Finds and reads a .o file from kernel_meta.""" + from ascendc_compiler import extract_kernel_artifacts + + meta_dir = tmp_path / "kernel_meta" + meta_dir.mkdir() + (meta_dir / 
"kernel.o").write_bytes(b"\x7fELFfake") + + kernel_o, tiling = extract_kernel_artifacts(str(meta_dir)) + assert kernel_o == b"\x7fELFfake" + + def test_extracts_tiling_bin(self, tmp_path): + """Finds and reads a tiling .bin file.""" + from ascendc_compiler import extract_kernel_artifacts + + meta_dir = tmp_path / "kernel_meta" + meta_dir.mkdir() + (meta_dir / "kernel.o").write_bytes(b"obj") + (meta_dir / "tiling_data.bin").write_bytes(b"\x01\x02\x03") + + kernel_o, tiling = extract_kernel_artifacts(str(meta_dir)) + assert tiling == b"\x01\x02\x03" + + def test_extracts_tiling_from_json(self, tmp_path): + """Falls back to extracting tiling data from JSON metadata.""" + import json + from ascendc_compiler import extract_kernel_artifacts + + meta_dir = tmp_path / "kernel_meta" + meta_dir.mkdir() + (meta_dir / "kernel.o").write_bytes(b"obj") + meta = {"tiling_data": [0x10, 0x20, 0x30]} + (meta_dir / "metadata.json").write_text(json.dumps(meta)) + + kernel_o, tiling = extract_kernel_artifacts(str(meta_dir)) + assert tiling == bytes([0x10, 0x20, 0x30]) + + def test_extracts_tiling_hex_from_json(self, tmp_path): + """Tiling data as hex string in JSON.""" + import json + from ascendc_compiler import extract_kernel_artifacts + + meta_dir = tmp_path / "kernel_meta" + meta_dir.mkdir() + (meta_dir / "kernel.o").write_bytes(b"obj") + meta = {"tiling_data": "aabb"} + (meta_dir / "metadata.json").write_text(json.dumps(meta)) + + kernel_o, tiling = extract_kernel_artifacts(str(meta_dir)) + assert tiling == bytes([0xaa, 0xbb]) + + def test_missing_dir_raises(self): + """Raises FileNotFoundError for non-existent directory.""" + from ascendc_compiler import extract_kernel_artifacts + + with pytest.raises(FileNotFoundError): + extract_kernel_artifacts("/nonexistent/path") + + def test_no_o_file_returns_none(self, tmp_path): + """Returns (None, ...) 
when no .o file exists.""" + from ascendc_compiler import extract_kernel_artifacts + + meta_dir = tmp_path / "kernel_meta" + meta_dir.mkdir() + (meta_dir / "readme.txt").write_text("no object file") + + kernel_o, tiling = extract_kernel_artifacts(str(meta_dir)) + assert kernel_o is None + + +class TestAscendCCompilerInit: + """Test AscendCCompiler initialization.""" + + def test_sim_platform_raises_on_compile(self): + """Simulation platforms raise RuntimeError when trying to compile.""" + from ascendc_compiler import AscendCCompiler + + compiler = AscendCCompiler(platform="a2a3sim") + with pytest.raises(RuntimeError, match="hardware platform"): + compiler.compile_ascendc_kernel( + ascendc_kernel_source="/fake.cpp", + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + ) + + def test_unknown_platform_raises(self): + """Unknown platform raises ValueError.""" + from ascendc_compiler import AscendCCompiler + + with pytest.raises(ValueError, match="Unknown platform"): + AscendCCompiler(platform="unknown_platform") + + def test_must_provide_source_or_o(self): + """Raises ValueError when neither source nor .o is provided.""" + from ascendc_compiler import AscendCCompiler + + compiler = AscendCCompiler(platform="a2a3sim") + with pytest.raises(RuntimeError, match="hardware platform"): + compiler.compile_ascendc_kernel( + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + ) + + +class TestAscendCToolchain: + """Test AscendCToolchain compiler flags and include dirs.""" + + @pytest.fixture(autouse=True) + def _ensure_ascend(self): + import env_manager + env_manager.ensure("ASCEND_HOME_PATH") + + def test_flags_use_aicore_lang(self): + """AscendC flags must use --cce-aicore-lang, not -x cce.""" + from toolchain import AscendCToolchain + + tc = AscendCToolchain(platform="a2a3") + flags = tc.get_compile_flags(core_type="aiv") + assert "--cce-aicore-lang" in flags + assert "-x" not in flags + assert "cce" not in flags 
+ assert "--cce-auto-sync" in flags + + def test_flags_contain_arch(self): + """Flags include correct architecture for platform and core_type.""" + from toolchain import AscendCToolchain + + tc = AscendCToolchain(platform="a2a3") + aiv_flags = tc.get_compile_flags(core_type="aiv") + assert "--cce-aicore-arch=dav-c220-vec" in aiv_flags + + aic_flags = tc.get_compile_flags(core_type="aic") + assert "--cce-aicore-arch=dav-c220-cube" in aic_flags + + def test_include_dirs_are_specific(self): + """Include dirs are specific paths, not os.walk results.""" + from toolchain import AscendCToolchain + + tc = AscendCToolchain(platform="a2a3") + dirs = tc.get_ascendc_include_dirs() + assert len(dirs) > 0 + # All returned dirs must actually exist + for d in dirs: + assert Path(d).is_dir(), f"Include dir does not exist: {d}" + # Must include key AscendC directories + dir_basenames = [Path(d).name for d in dirs] + assert "include" in dir_basenames # asc/include + + +class TestGenerateMergedSource: + """Test merged single-TU source generation.""" + + def _gen(self, **kwargs): + from ascendc_compiler import generate_merged_source + return generate_merged_source(**kwargs) + + def test_basic_structure(self): + """Merged source has kernel_entry before user source include.""" + src = self._gen( + ascendc_kernel_source="/path/to/add_custom.cpp", + ascendc_kernel_symbol="add_custom", + tensor_args=[ + {"name": "x", "direction": "input"}, + {"name": "y", "direction": "input"}, + {"name": "z", "direction": "output"}, + ], + ) + lines = src.splitlines() + # kernel_entry must appear before the user source include + ke_line = next(i for i, l in enumerate(lines) if "kernel_entry" in l and "extern" in l) + inc_line = next(i for i, l in enumerate(lines) if "#include" in l and "add_custom.cpp" in l) + assert ke_line < inc_line, "kernel_entry must be defined before user source" + + def test_global_suppressed(self): + """__global__ is suppressed around user source include.""" + src = self._gen( + 
ascendc_kernel_source="/path/to/kernel.cpp", + ascendc_kernel_symbol="my_kernel", + tensor_args=[{"name": "a", "direction": "input"}], + ) + assert '#define __global__' in src + assert '#undef __global__' in src + + def test_forward_declaration(self): + """Forward declares user kernel without __global__.""" + src = self._gen( + ascendc_kernel_source="/path/to/kernel.cpp", + ascendc_kernel_symbol="my_kernel", + tensor_args=[ + {"name": "a", "direction": "input"}, + {"name": "b", "direction": "output"}, + ], + has_workspace=True, + ) + # Forward decl has 4 params: a, b, workspace, tiling + fwd_lines = [l for l in src.splitlines() + if "extern" in l and "my_kernel" in l and "kernel_entry" not in l] + assert len(fwd_lines) == 1 + assert "__global__" not in fwd_lines[0] + assert fwd_lines[0].count("uint8_t*") == 4 + + def test_kernel_entry_not_global(self): + """kernel_entry is NOT __global__ (uses standard calling convention).""" + src = self._gen( + ascendc_kernel_source="/path/to/kernel.cpp", + ascendc_kernel_symbol="my_kernel", + tensor_args=[{"name": "a", "direction": "input"}], + ) + ke_line = [l for l in src.splitlines() if "kernel_entry" in l and "extern" in l][0] + assert "__global__" not in ke_line + assert "__aicore__" in ke_line + + def test_includes_kernel_operator(self): + """Merged source includes kernel_operator.h for AscendC types.""" + src = self._gen( + ascendc_kernel_source="/path/to/kernel.cpp", + ascendc_kernel_symbol="my_kernel", + tensor_args=[{"name": "a", "direction": "input"}], + ) + assert '#include "kernel_operator.h"' in src + assert '#include "tensor.h"' in src + + def test_with_tiling_and_workspace(self): + """Tiling data and workspace are correctly embedded and passed.""" + tiling = bytes([0xAA, 0xBB]) + src = self._gen( + ascendc_kernel_source="/path/to/kernel.cpp", + ascendc_kernel_symbol="my_kernel", + tensor_args=[{"name": "a", "direction": "input"}], + tiling_data=tiling, + has_workspace=True, + ) + assert "TILING_DATA[2]" in src + 
assert "0xaa" in src + call_line = [l for l in src.splitlines() + if "my_kernel(" in l and "extern" not in l][0] + assert "nullptr" in call_line # workspace + assert "TILING_DATA" in call_line # tiling + + +class TestCodeRunnerAscendCDispatch: + """Test that CodeRunner dispatches to AscendC compiler for compiler='ascendc' kernels.""" + + def test_default_compiler_is_pto(self): + """Kernels without 'compiler' field default to 'pto'.""" + kernel = {"func_id": 0, "source": "/fake.cpp", "core_type": "aiv"} + assert kernel.get("compiler", "pto") == "pto" + + def test_ascendc_compiler_detected(self): + """Kernels with compiler='ascendc' are detected.""" + kernel = { + "func_id": 0, + "source": "/fake.o", + "core_type": "aiv", + "compiler": "ascendc", + } + assert kernel.get("compiler", "pto") == "ascendc" From eaad160f1b23b443611661b41663d6b068a95aaf Mon Sep 17 00:00:00 2001 From: Chao Wang <26245345+ChaoWao@users.noreply.github.com> Date: Sun, 22 Mar 2026 08:59:20 +0800 Subject: [PATCH 2/4] Refactor: simplify AscendC compiler to consume pre-compiled .o only - Remove source compilation path from ascendc_compiler.py (no more _compile_kernel_source, use_ascendc_compiler, ascendc_kernel_source) - compile_ascendc_kernel() now requires pre-compiled .o bytes directly - Update code_runner.py to reject non-.o sources for ascendc kernels - kernel_config.py now references add_custom.o (externally compiled) - add_custom.cpp is reference-only documentation (wrapped in #if 0) - Update tests to match simplified API --- examples/scripts/code_runner.py | 27 +- python/ascendc_compiler.py | 315 +++++------------- .../kernels/ascendc/add_custom.cpp | 47 ++- .../kernels/kernel_config.py | 14 +- tests/test_ascendc_compiler.py | 140 +++----- 5 files changed, 202 insertions(+), 341 deletions(-) diff --git a/examples/scripts/code_runner.py b/examples/scripts/code_runner.py index 8c35debb6..ae5068e98 100644 --- a/examples/scripts/code_runner.py +++ b/examples/scripts/code_runner.py @@ -819,19 
+819,38 @@ def _compile_one_ascendc_kernel(kernel): ascendc = AscendCCompiler(platform=self.platform) tiling_data = None + ascendc_kernel_o = None + # Load pre-compiled .o from source path or kernel_meta + source_path = kernel["source"] + if source_path.endswith(".o"): + with open(source_path, 'rb') as f: + ascendc_kernel_o = f.read() + + # Extract from kernel_meta if provided kernel_meta_dir = kernel.get("kernel_meta_dir") if kernel_meta_dir: kernel_meta_dir = os.path.abspath(kernel_meta_dir) - _, tiling_data = extract_kernel_artifacts(kernel_meta_dir) + meta_o, tiling_data = extract_kernel_artifacts(kernel_meta_dir) + if ascendc_kernel_o is None and meta_o is not None: + ascendc_kernel_o = meta_o + + if ascendc_kernel_o is None: + raise ValueError( + f"AscendC kernel requires a pre-compiled .o file. " + f"Got source='{source_path}'. " + f"Compile the AscendC kernel externally (e.g. via tikcpp_smoke) " + f"and provide the .o path or kernel_meta_dir." + ) + # Override tiling from explicit path tiling_path = kernel.get("tiling_data_path") if tiling_path and os.path.isfile(tiling_path): with open(tiling_path, 'rb') as f: tiling_data = f.read() - combined_o = ascendc.compile_ascendc_kernel( - ascendc_kernel_source=kernel["source"], + combined_elf = ascendc.compile_ascendc_kernel( + ascendc_kernel_o=ascendc_kernel_o, ascendc_kernel_symbol=kernel.get("ascendc_symbol", "ascendc_kernel"), tensor_args=kernel.get("tensor_args", []), tiling_data=tiling_data, @@ -840,7 +859,7 @@ def _compile_one_ascendc_kernel(kernel): extra_include_dirs=runtime_include_dirs, build_dir=self.build_dir, ) - kernel_bin = extract_text_section(combined_o) + kernel_bin = extract_text_section(combined_elf) return (kernel["func_id"], kernel_bin) # Launch all compilations concurrently diff --git a/python/ascendc_compiler.py b/python/ascendc_compiler.py index 39eae9d49..da227a519 100644 --- a/python/ascendc_compiler.py +++ b/python/ascendc_compiler.py @@ -1,35 +1,40 @@ """ AscendC Kernel Compiler for 
PTO Runtime Integration -Compiles AscendC operator sources into PTO-compatible kernel binaries by -generating a merged source file that includes both the user's AscendC kernel -and a wrapper ``kernel_entry`` that bridges PTO's Tensor-pointer convention -to AscendC's raw GM-address convention. - -Workflow: - 1. Generate merged .cpp: kernel_entry wrapper (at offset 0) + #include user source - 2. Compile with AscendC toolchain flags (--cce-aicore-lang) → single .o - 3. Link with ld.lld (-e kernel_entry -Ttext=0) to resolve block-local relocations - 4. extract_text_section(linked_elf) → kernel binary (done by caller) - -The merged source suppresses ``__global__`` when including the user's kernel -source, so the compiler allows ``kernel_entry`` to call the user's kernel -function. kernel_entry is defined first in the source to ensure it sits at -.text offset 0 (PTO dispatches to offset 0). +Wraps pre-compiled AscendC operator binaries (.o) into PTO-compatible kernel +binaries. The key bridge is a generated "wrapper kernel" that adapts PTO's +unified kernel entry (int64_t* args with Tensor pointers) to AscendC's +per-tensor GM_ADDR calling convention. + +The AscendC kernel .o is compiled externally (e.g. via tikcpp_smoke + +npu_op_kernel_options --save-temp-files). This module only consumes the +pre-compiled .o — it does NOT compile AscendC source code. + +Workflow (compile wrapper + link): + 1. Read pre-compiled kernel .o (from kernel_meta/ or provided directly) + 2. Generate wrapper .cpp (kernel_entry + embedded tiling data) + 3. Compile wrapper with PTO compiler (ccec -x cce) -> wrapper.o + 4. Link: ld.lld -e kernel_entry -Ttext=0 wrapper.o kernel.o -> combined.elf + 5. extract_text_section(combined.elf) -> kernel binary (done by caller) + +The wrapper is always compiled with PTO flags (-x cce) because it only needs +tensor.h and contains no AscendC code. 
Usage: compiler = AscendCCompiler(platform="a2a3") - kernel_o = compiler.compile_ascendc_kernel( - ascendc_kernel_source="/path/to/add_custom.cpp", + kernel_o, tiling = extract_kernel_artifacts("/path/to/kernel_meta") + combined = compiler.compile_ascendc_kernel( + ascendc_kernel_o=kernel_o, ascendc_kernel_symbol="add_custom", tensor_args=[ {"name": "x", "direction": "input"}, {"name": "y", "direction": "input"}, {"name": "z", "direction": "output"}, ], + tiling_data=tiling, core_type="aiv", ) - # caller does: kernel_bin = extract_text_section(kernel_o) + # caller does: kernel_bin = extract_text_section(combined) """ import json @@ -41,7 +46,7 @@ from typing import Dict, List, Optional, Tuple import env_manager -from toolchain import AscendCToolchain, CCECToolchain +from toolchain import CCECToolchain logger = logging.getLogger(__name__) @@ -64,6 +69,8 @@ def generate_wrapper_source( - Forwards the raw GM byte-addresses to the AscendC kernel function - Embeds static tiling data as a ``constexpr`` array when provided + Compiled with PTO flags (ccec -x cce). Only needs tensor.h, no AscendC SDK. + Args: ascendc_kernel_symbol: The ``extern "C"`` symbol name of the AscendC kernel entry function in the compiled .o (e.g. ``"add_custom"``). 
@@ -82,6 +89,23 @@ def generate_wrapper_source( # --- header / includes --------------------------------------------------- lines.append('#include "tensor.h"') lines.append('') + # Ensure __gm__ and __aicore__ are defined (PTO ccec -x cce uses [aicore]) + lines.append('#ifndef __gm__') + lines.append('#define __gm__') + lines.append('#endif') + lines.append('#ifndef __aicore__') + lines.append('#define __aicore__ [aicore]') + lines.append('#endif') + lines.append('') + + # --- AscendC kernel forward declaration ---------------------------------- + param_strs = ['__gm__ uint8_t*'] * len(tensor_args) + if has_workspace: + param_strs.append('__gm__ uint8_t*') # workspace + param_strs.append('__gm__ uint8_t*') # tiling + decl_params = ', '.join(param_strs) + lines.append(f'extern "C" __aicore__ void {ascendc_kernel_symbol}({decl_params});') + lines.append('') # --- embedded tiling data ------------------------------------------------ if tiling_data is not None and len(tiling_data) > 0: @@ -131,95 +155,8 @@ def generate_wrapper_source( return '\n'.join(lines) -def generate_merged_source( - ascendc_kernel_source: str, - ascendc_kernel_symbol: str, - tensor_args: List[Dict[str, str]], - tiling_data: Optional[bytes] = None, - has_workspace: bool = False, -) -> str: - """Generate a single-TU source: kernel_entry wrapper + #include user kernel. - - kernel_entry is defined FIRST so it sits at offset 0 in .text (PTO - dispatches to offset 0). The user's AscendC kernel source is included - AFTER, with ``__global__`` suppressed so the compiler treats the user's - kernel as a regular callable function rather than a top-level entry point. - - Compiled as one translation unit with AscendC flags, so there are no - cross-TU relocations to resolve. 
- """ - lines: List[str] = [] - - # Include kernel_operator.h first for AscendC types and intrinsics - lines.append('#include "kernel_operator.h"') - lines.append('#include "tensor.h"') - lines.append('') - - # Forward-declare user's kernel function (without __global__) - param_types = ['__gm__ uint8_t*'] * len(tensor_args) - if has_workspace: - param_types.append('__gm__ uint8_t*') - param_types.append('__gm__ uint8_t*') # tiling - fwd_params = ', '.join(param_types) - lines.append(f'extern "C" __aicore__ void {ascendc_kernel_symbol}({fwd_params});') - lines.append('') - - # Embedded tiling data - if tiling_data is not None and len(tiling_data) > 0: - lines.append(f'static const uint8_t TILING_DATA[{len(tiling_data)}] = {{') - chunk = 16 - for i in range(0, len(tiling_data), chunk): - segment = tiling_data[i:i + chunk] - hex_segment = ', '.join(f'0x{b:02x}' for b in segment) - trailing = ',' if i + chunk < len(tiling_data) else '' - lines.append(f' {hex_segment}{trailing}') - lines.append('};') - lines.append('') - - # kernel_entry wrapper — defined FIRST so it's at .text offset 0 - # NOT __global__ — the PTO executor calls kernel functions via function - # pointers using the standard calling convention, not the __global__ entry - # convention. 
- lines.append('extern "C" __aicore__ void kernel_entry(__gm__ int64_t* args) {') - - for idx, ta in enumerate(tensor_args): - lines.append(f' __gm__ Tensor* t_{ta["name"]} = ' - f'reinterpret_cast<__gm__ Tensor*>(args[{idx}]);') - lines.append('') - - for ta in tensor_args: - lines.append( - f' __gm__ uint8_t* gm_{ta["name"]} = ' - f'reinterpret_cast<__gm__ uint8_t*>(t_{ta["name"]}->buffer.addr);' - ) - lines.append('') - - call_args = [f'gm_{ta["name"]}' for ta in tensor_args] - if has_workspace: - call_args.append('nullptr') - if tiling_data is not None and len(tiling_data) > 0: - call_args.append('(__gm__ uint8_t*)TILING_DATA') - else: - call_args.append('nullptr') - call_str = ', '.join(call_args) - lines.append(f' {ascendc_kernel_symbol}({call_str});') - - lines.append('}') - lines.append('') - - # Include user's AscendC kernel source AFTER kernel_entry. - # Suppress __global__ so user's kernel becomes a regular __aicore__ function - # (the compiler forbids calling __global__ functions from other functions). - lines.append('#define __global__') - lines.append(f'#include "{ascendc_kernel_source}"') - lines.append('#undef __global__') - lines.append('') - - return '\n'.join(lines) - - # --------------------------------------------------------------------------- -# AscendC compilation helpers +# AscendC artifact extraction # --------------------------------------------------------------------------- def extract_kernel_artifacts( @@ -283,22 +220,26 @@ def extract_kernel_artifacts( # --------------------------------------------------------------------------- class AscendCCompiler: - """Compile AscendC operators into PTO-compatible kernel binaries. + """Wrap pre-compiled AscendC kernel binaries into PTO-compatible kernel entries. + + Compile wrapper + link: + 1. Write pre-compiled kernel .o to build dir + 2. Generate + compile wrapper.o with PTO compiler (ccec -x cce) + 3. 
Link wrapper.o + kernel.o via ld.lld (wrapper first -> offset 0) + + The AscendC kernel .o must be compiled externally (e.g. via tikcpp_smoke + + npu_op_kernel_options --save-temp-files). Typical usage:: compiler = AscendCCompiler(platform="a2a3") - kernel_o = compiler.compile_ascendc_kernel( - ascendc_kernel_source="path/to/add_custom.cpp", + combined = compiler.compile_ascendc_kernel( + ascendc_kernel_o=kernel_o_bytes, ascendc_kernel_symbol="add_custom", - tensor_args=[ - {"name": "x", "direction": "input"}, - {"name": "y", "direction": "input"}, - {"name": "z", "direction": "output"}, - ], - core_type="aiv", + tensor_args=[...], + tiling_data=tiling_bytes, ) - kernel_bin = extract_text_section(kernel_o) + kernel_bin = extract_text_section(combined) """ def __init__(self, platform: str = "a2a3"): @@ -314,10 +255,8 @@ def __init__(self, platform: str = "a2a3"): if platform in ("a2a3", "a5"): env_manager.ensure("ASCEND_HOME_PATH") - self.ascendc_toolchain = AscendCToolchain(platform) self.pto_toolchain = CCECToolchain(platform) else: - self.ascendc_toolchain = None self.pto_toolchain = None def _get_runtime_include_dir(self, runtime_name: str = "tensormap_and_ringbuffer") -> str: @@ -326,8 +265,7 @@ def _get_runtime_include_dir(self, runtime_name: str = "tensormap_and_ringbuffer def compile_ascendc_kernel( self, - ascendc_kernel_source: Optional[str] = None, - ascendc_kernel_o: Optional[bytes] = None, + ascendc_kernel_o: bytes, ascendc_kernel_symbol: str = "ascendc_kernel", tensor_args: Optional[List[Dict[str, str]]] = None, tiling_data: Optional[bytes] = None, @@ -336,125 +274,47 @@ def compile_ascendc_kernel( extra_include_dirs: Optional[List[str]] = None, build_dir: Optional[str] = None, ) -> bytes: - """Compile AscendC kernel into a PTO-compatible .o (single-TU). + """Wrap a pre-compiled AscendC kernel .o into a PTO-compatible linked ELF. 
- Generates a merged source file that ``#include``s the user's AscendC - kernel and appends a ``kernel_entry`` wrapper, then compiles everything - as one translation unit with AscendC flags. + Generates a PTO wrapper, compiles it with PTO flags, then links it with + the kernel .o so that ``kernel_entry`` sits at .text offset 0. The caller should run ``extract_text_section(result)`` on the output. Args: - ascendc_kernel_source: Path to AscendC kernel ``.cpp`` source. - ascendc_kernel_o: Pre-compiled AscendC kernel ``.o`` bytes. - If provided, a two-step compile+link is used instead. + ascendc_kernel_o: Pre-compiled kernel ``.o`` bytes (from external + AscendC compilation, e.g. tikcpp_smoke kernel_meta/). ascendc_kernel_symbol: ``extern "C"`` symbol of the AscendC kernel. tensor_args: Ordered tensor descriptor list for wrapper generation. tiling_data: Static tiling data blob (None = no tiling). core_type: ``"aiv"`` (vector) or ``"aic"`` (cube). has_workspace: Whether AscendC kernel expects a workspace pointer. - extra_include_dirs: Extra -I paths for compilation. + extra_include_dirs: Extra -I paths for wrapper compilation. build_dir: Working directory for intermediates. Returns: - Compiled .o bytes. + Linked ELF bytes (pass to ``extract_text_section``). """ - if self.ascendc_toolchain is None: + if self.pto_toolchain is None: raise RuntimeError( - "AscendC kernel compilation requires a hardware platform (a2a3/a5). " + "AscendC kernel wrapping requires a hardware platform (a2a3/a5). " "Simulation platforms are not supported." 
) - if ascendc_kernel_source is None and ascendc_kernel_o is None: - raise ValueError("Provide either ascendc_kernel_source or ascendc_kernel_o") - if ascendc_kernel_source is not None and ascendc_kernel_o is not None: - raise ValueError("Provide only one of ascendc_kernel_source or ascendc_kernel_o") - if tensor_args is None: tensor_args = [] work_dir = build_dir or tempfile.mkdtemp(prefix="ascendc_build_") os.makedirs(work_dir, exist_ok=True) - if ascendc_kernel_o is not None: - return self._compile_with_prebuilt_o( - ascendc_kernel_o, ascendc_kernel_symbol, tensor_args, - tiling_data, core_type, has_workspace, extra_include_dirs, work_dir, - ) - - # --- Single-TU approach: merge kernel source + wrapper --- - ascendc_kernel_source = os.path.abspath(ascendc_kernel_source) - - merged_src = generate_merged_source( - ascendc_kernel_source=ascendc_kernel_source, - ascendc_kernel_symbol=ascendc_kernel_symbol, - tensor_args=tensor_args, - tiling_data=tiling_data, - has_workspace=has_workspace, - ) - - merged_cpp_path = os.path.join(work_dir, "ascendc_merged.cpp") - with open(merged_cpp_path, 'w') as f: - f.write(merged_src) - logger.info(f"[AscendC] Generated merged source: {merged_cpp_path}") - - output_o_path = os.path.join(work_dir, "ascendc_kernel.o") - - # Build include dirs: AscendC SDK + runtime (for tensor.h) + source dir - ascendc_includes = self.ascendc_toolchain.get_ascendc_include_dirs() - ascendc_includes.append(self._get_runtime_include_dir()) - ascendc_includes.append(os.path.dirname(ascendc_kernel_source)) - if extra_include_dirs: - ascendc_includes.extend( - os.path.abspath(d) for d in extra_include_dirs - ) - - compile_cmd = [self.ascendc_toolchain.cxx_path] - compile_cmd += self.ascendc_toolchain.get_compile_flags(core_type=core_type) - for inc in ascendc_includes: - compile_cmd.append(f"-I{inc}") - compile_cmd.extend(["-o", output_o_path, merged_cpp_path]) - - logger.info(f"[AscendC] Compiling AscendC kernel: {ascendc_kernel_source}") - 
logger.debug(f" Command: {' '.join(compile_cmd)}") - self._run(compile_cmd, "AscendC-Compile") - - # Link to resolve block-local relocations (R_AICORE_BLOCK_LOCAL_OFFSET12 - # for g_vecTPipePtr etc.). extract_text_section extracts raw bytes without - # applying relocations, so we must link first. - linked_path = os.path.join(work_dir, "ascendc_linked.elf") - link_cmd = [ - self.pto_toolchain.linker_path, - "-e", "kernel_entry", "-Ttext=0", - "-o", linked_path, output_o_path, - ] - logger.info("[AscendC] Linking to resolve block-local relocations") - self._run(link_cmd, "AscendC-Link") - - with open(linked_path, 'rb') as f: - result_bytes = f.read() - - logger.info(f"[AscendC] Linked binary: {len(result_bytes)} bytes") - return result_bytes - - def _compile_with_prebuilt_o( - self, - ascendc_kernel_o: bytes, - ascendc_kernel_symbol: str, - tensor_args: List[Dict[str, str]], - tiling_data: Optional[bytes], - core_type: str, - has_workspace: bool, - extra_include_dirs: Optional[List[str]], - work_dir: str, - ) -> bytes: - """Fallback: pre-compiled .o needs wrapper + link.""" + # --- Step 1: Write pre-compiled kernel .o ----------------------------- kernel_o_path = os.path.join(work_dir, "ascendc_kernel.o") with open(kernel_o_path, 'wb') as f: f.write(ascendc_kernel_o) - logger.info(f"[AscendC] Using pre-compiled kernel .o ({len(ascendc_kernel_o)} bytes)") + logger.info(f"[AscendC] Using pre-compiled kernel .o " + f"({len(ascendc_kernel_o)} bytes)") - # Generate + compile PTO wrapper with AscendC flags + # --- Step 2: Generate + compile wrapper with PTO flags --------------- wrapper_src = generate_wrapper_source( ascendc_kernel_symbol=ascendc_kernel_symbol, tensor_args=tensor_args, @@ -465,30 +325,36 @@ def _compile_with_prebuilt_o( wrapper_cpp_path = os.path.join(work_dir, "ascendc_wrapper.cpp") with open(wrapper_cpp_path, 'w') as f: f.write(wrapper_src) + logger.info(f"[AscendC] Generated wrapper: {wrapper_cpp_path}") wrapper_o_path = os.path.join(work_dir, 
"ascendc_wrapper.o") wrapper_includes = [self._get_runtime_include_dir()] if extra_include_dirs: wrapper_includes.extend(extra_include_dirs) - wrapper_cmd = [self.ascendc_toolchain.cxx_path] - wrapper_cmd += self.ascendc_toolchain.get_compile_flags(core_type=core_type) + wrapper_cmd = [self.pto_toolchain.cxx_path] + wrapper_cmd += self.pto_toolchain.get_compile_flags(core_type=core_type) for inc in wrapper_includes: wrapper_cmd.append(f"-I{os.path.abspath(inc)}") wrapper_cmd.extend(["-o", wrapper_o_path, wrapper_cpp_path]) - self._run(wrapper_cmd, "AscendC-Wrapper") - # Full link to resolve cross-TU calls - combined_o_path = os.path.join(work_dir, "ascendc_combined.o") + logger.info("[AscendC] Compiling wrapper with PTO flags") + self._run(wrapper_cmd, "PTO-Wrapper") + + # --- Step 3: Link wrapper.o + kernel.o ------------------------------- + # wrapper.o listed FIRST so kernel_entry lands at .text offset 0 + combined_path = os.path.join(work_dir, "ascendc_combined.elf") link_cmd = [ self.pto_toolchain.linker_path, "-e", "kernel_entry", "-Ttext=0", - "-o", combined_o_path, + "-o", combined_path, wrapper_o_path, kernel_o_path, ] + + logger.info("[AscendC] Linking wrapper.o + kernel.o") self._run(link_cmd, "AscendC-Link") - with open(combined_o_path, 'rb') as f: + with open(combined_path, 'rb') as f: result_bytes = f.read() logger.info(f"[AscendC] Combined binary: {len(result_bytes)} bytes") @@ -497,7 +363,6 @@ def _compile_with_prebuilt_o( def compile_from_kernel_meta( self, kernel_meta_dir: str, - ascendc_kernel_source: str, ascendc_kernel_symbol: str, tensor_args: List[Dict[str, str]], core_type: str = "aiv", @@ -505,11 +370,15 @@ def compile_from_kernel_meta( extra_include_dirs: Optional[List[str]] = None, build_dir: Optional[str] = None, ) -> bytes: - """Convenience: extract tiling from kernel_meta, then compile source.""" - _, tiling_data = extract_kernel_artifacts(kernel_meta_dir) + """Convenience: extract .o and tiling from kernel_meta, then wrap+link.""" + 
kernel_o, tiling_data = extract_kernel_artifacts(kernel_meta_dir) + if kernel_o is None: + raise FileNotFoundError( + f"No .o file found in kernel_meta: {kernel_meta_dir}" + ) return self.compile_ascendc_kernel( - ascendc_kernel_source=ascendc_kernel_source, + ascendc_kernel_o=kernel_o, ascendc_kernel_symbol=ascendc_kernel_symbol, tensor_args=tensor_args, tiling_data=tiling_data, diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp index 1a086deeb..7cedb79d3 100644 --- a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp @@ -1,16 +1,47 @@ /** - * AscendC AddCustom Operator + * AscendC AddCustom Operator — Reference Source * - * Element-wise addition: z[i] = x[i] + y[i] + * This file shows the AscendC operator source that would be compiled + * externally using the AscendC toolchain (CMake + npu_op_kernel_options). + * It is provided for documentation purposes — the actual compilation is + * done outside of simpler using the AscendC build system. * - * Uses AscendC double-buffered pipeline with CopyIn/Compute/CopyOut stages. - * Static tiling: totalLength=16384 elements, tileNum=8 tiles. + * AscendC compilation produces: + * 1. A compiled .o file (the AICore kernel binary) + * 2. A tiling data blob (for static tiling, baked into the kernel) * - * Calling convention (extern "C"): + * These artifacts are then consumed by simpler's AscendCCompiler, which + * wraps them with a PTO-compatible kernel_entry and registers the + * combined binary with the PTO runtime. + * + * To build this operator: + * 1. Place it in a tikcpp_smoke-style project structure + * 2. Configure CMakePresets.json for target SoC (e.g. Ascend910B) + * 3. 
Add to CMakeLists.txt: + * npu_op_kernel_options(ascendc_kernels ALL OPTIONS --save-temp-files) + * 4. Build with: bash run.sh --is-dynamic=0 + * 5. Find artifacts in: build_out/op_kernel/AddCustom_/kernel_/kernel_meta/ + * 6. Copy .o to this directory as add_custom.o + * + * The kernel entry symbol exported by AscendC compilation is typically + * the operator class name in snake_case — here "add_custom". + * + * Calling convention (after static tiling degeneration): * void add_custom(__gm__ uint8_t* x, __gm__ uint8_t* y, __gm__ uint8_t* z, * __gm__ uint8_t* workspace, __gm__ uint8_t* tiling); + * + * With static tiling, the tiling parameter points to compile-time-fixed data + * that is embedded in the wrapper kernel by AscendCCompiler. */ +// ============================================================================ +// The following is a simplified pseudo-code representation of what AscendC +// generates. It is NOT directly compilable with PTO's ccec — it requires the +// full AscendC header set (kernel_operator.h, etc.) from the CANN SDK. 
+// ============================================================================ + +#if 0 // Not compiled — reference only + #include "kernel_operator.h" using namespace AscendC; @@ -80,11 +111,15 @@ class KernelAdd { uint32_t tileLength; }; +// Generated entry point (after static tiling degeneration) extern "C" __global__ __aicore__ void add_custom( __gm__ uint8_t* x, __gm__ uint8_t* y, __gm__ uint8_t* z, __gm__ uint8_t* workspace, __gm__ uint8_t* tiling) { + // Static tiling: totalLength and tileNum are fixed at compile time KernelAdd op; - op.Init(x, y, z, 16384, 8); + op.Init(x, y, z, /*totalLength=*/16384, /*tileNum=*/8); op.Process(); } + +#endif diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py index b7c176df3..6cd9f6eef 100644 --- a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py @@ -1,12 +1,13 @@ """ AscendC Vector Example — kernel_config.py -Demonstrates integrating an AscendC operator (add_custom) into the PTO -tensormap_and_ringbuffer runtime via single-TU compile + link. +Demonstrates integrating a pre-compiled AscendC operator (add_custom) into the +PTO tensormap_and_ringbuffer runtime via wrapper generation + link. -The AscendC kernel source is merged with a kernel_entry wrapper into a single -translation unit, compiled with AscendC flags, then linked to resolve -block-local relocations. +The AscendC kernel .o is compiled externally (e.g. via tikcpp_smoke + +npu_op_kernel_options --save-temp-files). Simpler only generates a PTO wrapper +(kernel_entry), compiles it with PTO flags (-x cce), and links it with the +kernel .o so kernel_entry sits at .text offset 0. 
Computation: z = x + y (AscendC add_custom, func_id=0) @@ -27,7 +28,8 @@ KERNELS = [ { "func_id": 0, - "source": str(_KERNELS_ROOT / "ascendc" / "add_custom.cpp"), + # Pre-compiled AscendC kernel .o (produced by external AscendC build) + "source": str(_KERNELS_ROOT / "ascendc" / "add_custom.o"), "core_type": "aiv", "compiler": "ascendc", "ascendc_symbol": "add_custom", diff --git a/tests/test_ascendc_compiler.py b/tests/test_ascendc_compiler.py index 0ad02a008..3e654d57c 100644 --- a/tests/test_ascendc_compiler.py +++ b/tests/test_ascendc_compiler.py @@ -32,6 +32,21 @@ def test_basic_three_tensors(self): assert "gm_x" in src and "gm_y" in src and "gm_z" in src assert "nullptr" in src + def test_forward_declaration(self): + """Forward declares AscendC kernel symbol.""" + src = self._gen( + ascendc_kernel_symbol="add_custom", + tensor_args=[ + {"name": "x", "direction": "input"}, + {"name": "y", "direction": "input"}, + {"name": "z", "direction": "output"}, + ], + ) + fwd_lines = [l for l in src.splitlines() + if "extern" in l and "add_custom" in l] + assert len(fwd_lines) == 1 + assert "uint8_t*" in fwd_lines[0] + def test_with_static_tiling_data(self): """Tiling data bytes are embedded as a const array.""" tiling = bytes([0x10, 0x27, 0x00, 0x00]) @@ -108,6 +123,14 @@ def test_large_tiling_data_formatting(self): assert "0x00" in src assert "0x2f" in src # last byte = 47 = 0x2f + def test_no_ascendc_headers(self): + """Wrapper does NOT include kernel_operator.h (PTO code only).""" + src = self._gen( + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + ) + assert "kernel_operator.h" not in src + class TestExtractKernelArtifacts: """Test extraction of .o and tiling data from kernel_meta directory.""" @@ -186,13 +209,13 @@ class TestAscendCCompilerInit: """Test AscendCCompiler initialization.""" def test_sim_platform_raises_on_compile(self): - """Simulation platforms raise RuntimeError when trying to compile.""" + """Simulation platforms 
raise RuntimeError when trying to wrap+link.""" from ascendc_compiler import AscendCCompiler compiler = AscendCCompiler(platform="a2a3sim") with pytest.raises(RuntimeError, match="hardware platform"): compiler.compile_ascendc_kernel( - ascendc_kernel_source="/fake.cpp", + ascendc_kernel_o=b"fake_kernel_o", ascendc_kernel_symbol="op", tensor_args=[{"name": "a", "direction": "input"}], ) @@ -204,17 +227,6 @@ def test_unknown_platform_raises(self): with pytest.raises(ValueError, match="Unknown platform"): AscendCCompiler(platform="unknown_platform") - def test_must_provide_source_or_o(self): - """Raises ValueError when neither source nor .o is provided.""" - from ascendc_compiler import AscendCCompiler - - compiler = AscendCCompiler(platform="a2a3sim") - with pytest.raises(RuntimeError, match="hardware platform"): - compiler.compile_ascendc_kernel( - ascendc_kernel_symbol="op", - tensor_args=[{"name": "a", "direction": "input"}], - ) - class TestAscendCToolchain: """Test AscendCToolchain compiler flags and include dirs.""" @@ -260,96 +272,15 @@ def test_include_dirs_are_specific(self): dir_basenames = [Path(d).name for d in dirs] assert "include" in dir_basenames # asc/include + def test_pto_flags_use_x_cce(self): + """PTO flags use -x cce, not --cce-aicore-lang.""" + from toolchain import CCECToolchain -class TestGenerateMergedSource: - """Test merged single-TU source generation.""" - - def _gen(self, **kwargs): - from ascendc_compiler import generate_merged_source - return generate_merged_source(**kwargs) - - def test_basic_structure(self): - """Merged source has kernel_entry before user source include.""" - src = self._gen( - ascendc_kernel_source="/path/to/add_custom.cpp", - ascendc_kernel_symbol="add_custom", - tensor_args=[ - {"name": "x", "direction": "input"}, - {"name": "y", "direction": "input"}, - {"name": "z", "direction": "output"}, - ], - ) - lines = src.splitlines() - # kernel_entry must appear before the user source include - ke_line = next(i for i, l 
in enumerate(lines) if "kernel_entry" in l and "extern" in l) - inc_line = next(i for i, l in enumerate(lines) if "#include" in l and "add_custom.cpp" in l) - assert ke_line < inc_line, "kernel_entry must be defined before user source" - - def test_global_suppressed(self): - """__global__ is suppressed around user source include.""" - src = self._gen( - ascendc_kernel_source="/path/to/kernel.cpp", - ascendc_kernel_symbol="my_kernel", - tensor_args=[{"name": "a", "direction": "input"}], - ) - assert '#define __global__' in src - assert '#undef __global__' in src - - def test_forward_declaration(self): - """Forward declares user kernel without __global__.""" - src = self._gen( - ascendc_kernel_source="/path/to/kernel.cpp", - ascendc_kernel_symbol="my_kernel", - tensor_args=[ - {"name": "a", "direction": "input"}, - {"name": "b", "direction": "output"}, - ], - has_workspace=True, - ) - # Forward decl has 4 params: a, b, workspace, tiling - fwd_lines = [l for l in src.splitlines() - if "extern" in l and "my_kernel" in l and "kernel_entry" not in l] - assert len(fwd_lines) == 1 - assert "__global__" not in fwd_lines[0] - assert fwd_lines[0].count("uint8_t*") == 4 - - def test_kernel_entry_not_global(self): - """kernel_entry is NOT __global__ (uses standard calling convention).""" - src = self._gen( - ascendc_kernel_source="/path/to/kernel.cpp", - ascendc_kernel_symbol="my_kernel", - tensor_args=[{"name": "a", "direction": "input"}], - ) - ke_line = [l for l in src.splitlines() if "kernel_entry" in l and "extern" in l][0] - assert "__global__" not in ke_line - assert "__aicore__" in ke_line - - def test_includes_kernel_operator(self): - """Merged source includes kernel_operator.h for AscendC types.""" - src = self._gen( - ascendc_kernel_source="/path/to/kernel.cpp", - ascendc_kernel_symbol="my_kernel", - tensor_args=[{"name": "a", "direction": "input"}], - ) - assert '#include "kernel_operator.h"' in src - assert '#include "tensor.h"' in src - - def 
test_with_tiling_and_workspace(self): - """Tiling data and workspace are correctly embedded and passed.""" - tiling = bytes([0xAA, 0xBB]) - src = self._gen( - ascendc_kernel_source="/path/to/kernel.cpp", - ascendc_kernel_symbol="my_kernel", - tensor_args=[{"name": "a", "direction": "input"}], - tiling_data=tiling, - has_workspace=True, - ) - assert "TILING_DATA[2]" in src - assert "0xaa" in src - call_line = [l for l in src.splitlines() - if "my_kernel(" in l and "extern" not in l][0] - assert "nullptr" in call_line # workspace - assert "TILING_DATA" in call_line # tiling + tc = CCECToolchain(platform="a2a3") + flags = tc.get_compile_flags(core_type="aiv") + assert "-x" in flags + assert "cce" in flags + assert "--cce-aicore-lang" not in flags class TestCodeRunnerAscendCDispatch: @@ -369,3 +300,8 @@ def test_ascendc_compiler_detected(self): "compiler": "ascendc", } assert kernel.get("compiler", "pto") == "ascendc" + + def test_source_must_be_o_file(self): + """AscendC kernels require .o files, not .cpp sources.""" + kernel_o = {"source": "ascendc/add_custom.o", "compiler": "ascendc"} + assert kernel_o["source"].endswith(".o") From e63096597188a24b6a2ba3018dec427e4360dab7 Mon Sep 17 00:00:00 2001 From: Chao Wang <26245345+ChaoWao@users.noreply.github.com> Date: Sun, 22 Mar 2026 11:23:15 +0800 Subject: [PATCH 3/4] Add: working AscendC kernel integration with PTO runtime - Add pre-compiled add_custom.o (AscendC AddCustom, float32, dav-c220-vec) with two critical adaptations for PTO dispatch: 1. No __global__ attribute (causes hang under PTO subroutine dispatch) 2. 
No GetBlockNum()/GetBlockIdx() (PTO dispatches to single cores) - Add build_add_custom.sh to reproduce the .o from source - Fix __gm__ address space qualifier in wrapper forward declaration: workspace and tiling params use plain uint8_t* (not __gm__) since PTO's ccec treats __gm__ as a real address-space qualifier - Update reference source and kernel_config docs - Device test passes: z (add) and w (mul) match golden on hardware --- python/ascendc_compiler.py | 11 +- .../kernels/ascendc/add_custom.cpp | 124 ++++++------- .../kernels/ascendc/add_custom.o | Bin 0 -> 3936 bytes .../kernels/ascendc/build_add_custom.sh | 169 ++++++++++++++++++ .../kernels/kernel_config.py | 15 +- tests/test_ascendc_compiler.py | 2 +- 6 files changed, 246 insertions(+), 75 deletions(-) create mode 100644 tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.o create mode 100755 tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/build_add_custom.sh diff --git a/python/ascendc_compiler.py b/python/ascendc_compiler.py index da227a519..fe450fd03 100644 --- a/python/ascendc_compiler.py +++ b/python/ascendc_compiler.py @@ -99,10 +99,15 @@ def generate_wrapper_source( lines.append('') # --- AscendC kernel forward declaration ---------------------------------- + # Tensor pointers are in GM address space; workspace and tiling use plain + # uint8_t* because the wrapper may embed tiling in const data (not GM) and + # PTO's ccec treats __gm__ as a real address-space qualifier. The linker + # resolves symbols by name only, so parameter types don't need to match + # the AscendC side exactly. 
param_strs = ['__gm__ uint8_t*'] * len(tensor_args) if has_workspace: - param_strs.append('__gm__ uint8_t*') # workspace - param_strs.append('__gm__ uint8_t*') # tiling + param_strs.append('uint8_t*') # workspace (nullptr from wrapper) + param_strs.append('uint8_t*') # tiling (may be const data, not GM) decl_params = ', '.join(param_strs) lines.append(f'extern "C" __aicore__ void {ascendc_kernel_symbol}({decl_params});') lines.append('') @@ -143,7 +148,7 @@ def generate_wrapper_source( if has_workspace: call_args.append('nullptr') if tiling_data is not None and len(tiling_data) > 0: - call_args.append('(__gm__ uint8_t*)TILING_DATA') + call_args.append('(uint8_t*)TILING_DATA') else: call_args.append('nullptr') call_str = ', '.join(call_args) diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp index 7cedb79d3..11026905f 100644 --- a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp @@ -1,43 +1,34 @@ /** * AscendC AddCustom Operator — Reference Source * - * This file shows the AscendC operator source that would be compiled - * externally using the AscendC toolchain (CMake + npu_op_kernel_options). - * It is provided for documentation purposes — the actual compilation is - * done outside of simpler using the AscendC build system. + * This file documents the AscendC kernel source used for PTO integration. + * The actual .o is built by build_add_custom.sh. * - * AscendC compilation produces: - * 1. A compiled .o file (the AICore kernel binary) - * 2. 
A tiling data blob (for static tiling, baked into the kernel) + * PTO integration requires two adaptations from standard AscendC kernels: * - * These artifacts are then consumed by simpler's AscendCCompiler, which - * wraps them with a PTO-compatible kernel_entry and registers the - * combined binary with the PTO runtime. + * 1. NO __global__ attribute — PTO's dispatch loop calls kernels as + * subroutines; __global__ generates a prologue that resets AICore + * state and never returns, causing a hang. * - * To build this operator: - * 1. Place it in a tikcpp_smoke-style project structure - * 2. Configure CMakePresets.json for target SoC (e.g. Ascend910B) - * 3. Add to CMakeLists.txt: - * npu_op_kernel_options(ascendc_kernels ALL OPTIONS --save-temp-files) - * 4. Build with: bash run.sh --is-dynamic=0 - * 5. Find artifacts in: build_out/op_kernel/AddCustom_/kernel_/kernel_meta/ - * 6. Copy .o to this directory as add_custom.o + * 2. NO GetBlockNum()/GetBlockIdx() partitioning — PTO dispatches each + * task to a single core, so the kernel must process all data. + * (Framework-launched AscendC launches across all blocks simultaneously.) * - * The kernel entry symbol exported by AscendC compilation is typically - * the operator class name in snake_case — here "add_custom". + * Calling convention: + * extern "C" __aicore__ void add_custom( + * GM_ADDR x, GM_ADDR y, GM_ADDR z, + * GM_ADDR workspace, GM_ADDR tiling); * - * Calling convention (after static tiling degeneration): - * void add_custom(__gm__ uint8_t* x, __gm__ uint8_t* y, __gm__ uint8_t* z, - * __gm__ uint8_t* workspace, __gm__ uint8_t* tiling); + * The PTO wrapper (generated by AscendCCompiler) provides kernel_entry() + * which unpacks Tensor* args and forwards raw GM addresses to add_custom(). * - * With static tiling, the tiling parameter points to compile-time-fixed data - * that is embedded in the wrapper kernel by AscendCCompiler. 
+ * Static tiling: totalLength=16384 and tileNum=8 are hardcoded as constexpr. + * No runtime tiling pointer is needed (workspace and tiling are unused). */ // ============================================================================ -// The following is a simplified pseudo-code representation of what AscendC -// generates. It is NOT directly compilable with PTO's ccec — it requires the -// full AscendC header set (kernel_operator.h, etc.) from the CANN SDK. +// The actual source compiled by build_add_custom.sh (reproduced here for +// reference — not compiled by PTO). // ============================================================================ #if 0 // Not compiled — reference only @@ -45,80 +36,81 @@ #include "kernel_operator.h" using namespace AscendC; - constexpr int32_t BUFFER_NUM = 2; class KernelAdd { public: __aicore__ inline KernelAdd() {} - - __aicore__ inline void Init(__gm__ uint8_t* x, __gm__ uint8_t* y, - __gm__ uint8_t* z, uint32_t totalLength, - uint32_t tileNum) { + __aicore__ inline void Init(GM_ADDR x, GM_ADDR y, GM_ADDR z, + uint32_t totalLength, uint32_t tileNum) + { + // Process ALL data — no GetBlockNum() division (PTO single-core dispatch) this->blockLength = totalLength; this->tileNum = tileNum; - this->tileLength = totalLength / tileNum; - - xGm.SetGlobalBuffer((__gm__ float*)x, this->blockLength); - yGm.SetGlobalBuffer((__gm__ float*)y, this->blockLength); - zGm.SetGlobalBuffer((__gm__ float*)z, this->blockLength); - - pipe.InitBuffer(inQueueX, BUFFER_NUM, this->tileLength * sizeof(float)); - pipe.InitBuffer(inQueueY, BUFFER_NUM, this->tileLength * sizeof(float)); - pipe.InitBuffer(outQueueZ, BUFFER_NUM, this->tileLength * sizeof(float)); + this->tileLength = this->blockLength / tileNum / BUFFER_NUM; + xGm.SetGlobalBuffer((__gm__ DTYPE_X*)x, this->blockLength); + yGm.SetGlobalBuffer((__gm__ DTYPE_Y*)y, this->blockLength); + zGm.SetGlobalBuffer((__gm__ DTYPE_Z*)z, this->blockLength); + pipe.InitBuffer(inQueueX, BUFFER_NUM, 
this->tileLength * sizeof(DTYPE_X)); + pipe.InitBuffer(inQueueY, BUFFER_NUM, this->tileLength * sizeof(DTYPE_Y)); + pipe.InitBuffer(outQueueZ, BUFFER_NUM, this->tileLength * sizeof(DTYPE_Z)); } - - __aicore__ inline void Process() { - for (int32_t i = 0; i < this->tileNum; i++) { + __aicore__ inline void Process() + { + int32_t loopCount = this->tileNum * BUFFER_NUM; + for (int32_t i = 0; i < loopCount; i++) { CopyIn(i); Compute(i); CopyOut(i); } } - private: - __aicore__ inline void CopyIn(int32_t progress) { - LocalTensor xLocal = inQueueX.AllocTensor(); - LocalTensor yLocal = inQueueY.AllocTensor(); + __aicore__ inline void CopyIn(int32_t progress) + { + LocalTensor xLocal = inQueueX.AllocTensor(); + LocalTensor yLocal = inQueueY.AllocTensor(); DataCopy(xLocal, xGm[progress * this->tileLength], this->tileLength); DataCopy(yLocal, yGm[progress * this->tileLength], this->tileLength); inQueueX.EnQue(xLocal); inQueueY.EnQue(yLocal); } - - __aicore__ inline void Compute(int32_t progress) { - LocalTensor xLocal = inQueueX.DeQue(); - LocalTensor yLocal = inQueueY.DeQue(); - LocalTensor zLocal = outQueueZ.AllocTensor(); + __aicore__ inline void Compute(int32_t progress) + { + LocalTensor xLocal = inQueueX.DeQue(); + LocalTensor yLocal = inQueueY.DeQue(); + LocalTensor zLocal = outQueueZ.AllocTensor(); Add(zLocal, xLocal, yLocal, this->tileLength); - outQueueZ.EnQue(zLocal); + outQueueZ.EnQue(zLocal); inQueueX.FreeTensor(xLocal); inQueueY.FreeTensor(yLocal); } - - __aicore__ inline void CopyOut(int32_t progress) { - LocalTensor zLocal = outQueueZ.DeQue(); + __aicore__ inline void CopyOut(int32_t progress) + { + LocalTensor zLocal = outQueueZ.DeQue(); DataCopy(zGm[progress * this->tileLength], zLocal, this->tileLength); outQueueZ.FreeTensor(zLocal); } - +private: TPipe pipe; TQue inQueueX, inQueueY; TQue outQueueZ; - GlobalTensor xGm, yGm, zGm; + GlobalTensor xGm; + GlobalTensor yGm; + GlobalTensor zGm; uint32_t blockLength; uint32_t tileNum; uint32_t tileLength; }; -// 
Generated entry point (after static tiling degeneration) -extern "C" __global__ __aicore__ void add_custom( - __gm__ uint8_t* x, __gm__ uint8_t* y, __gm__ uint8_t* z, - __gm__ uint8_t* workspace, __gm__ uint8_t* tiling) +// No __global__ — called as subroutine from PTO wrapper's kernel_entry +extern "C" __aicore__ void add_custom( + GM_ADDR x, GM_ADDR y, GM_ADDR z, + GM_ADDR workspace, GM_ADDR tiling) { - // Static tiling: totalLength and tileNum are fixed at compile time + constexpr uint32_t totalLength = 16384; + constexpr uint32_t tileNum = 8; KernelAdd op; - op.Init(x, y, z, /*totalLength=*/16384, /*tileNum=*/8); + op.Init(x, y, z, totalLength, tileNum); op.Process(); } diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.o b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.o new file mode 100644 index 0000000000000000000000000000000000000000..218e54e65d98cf55db6e092300b6279580ad75c2 GIT binary patch literal 3936 zcmbtXZEO_B8Gd(WxZS$|cTfjxW9&5;*+=l{Y%l>URoCIeHlgSoK~iX`zi^EcV;f^A zO{Hcp6x9S0Oc4S#Ddj_IMfrmwP`@a>UK&KB{J4lJ1!SzlHpbwtdsS13AcgBQ>s@Rd zP=9o!-S?e$p7&#Bp4ppwbJg0_Oi{>f3Izk>J=aR4Pj&UfWZ0@f(@3En&pdVO@NH2y zd`En1_^yZykBF*aOT0DwbuyQE7?GyN*FrA$FxgL~wE*p}wEIQYJ`uRHUy~T~cA204 zaoSJTfM0p}ir0j_?#??J{qvUHuWz{GRq&7zP$L)%V@#t%s>HfuEP27tZe8%d1xX@9 zy)yc#NQ@p4Tl)`bWMrr(kV6^$jb9o0onIOLgTKf6vrzWNy!WjS1@%WYni5czDRFPz z=ttt%t*FL`q5l^z-HB?y?cb|~fs5)fF9n2}PVdF@5uO^Arp!_mCE=C+>GZ3%Ej4sR zqrg!$IfS@6>OA?GcdN{2<1&ERpeBY;SNMq9gZU`Yzj-@k9Ud8@MvV>aLCuHMZrGF= zhz*+tNDYQb> zv_qUOdS+^Vl5u9K2gooNSy^H#nWNFY@3Kq(HYhRr3+}D#rMl4p-f3+Xd;XI{(^wx9u@ju`z0clG^$}yeOnE7n zDYNGiJvh#K)Sq_*61|*$e}XCJW@+X?A2UAhWuYk*1iRp|TtPAN@3OME`9(HUV#hS- z;(W#_gOP4zDr8hy*Lm2Q%b&OMI6VL+m8;PMolVEA_f__hU^f`Xgn3z4=uJ@_y_`eQ z%QT%Qb9p*NJ|E(kU*s~1B>4-TxFPKt4Im$EpZ7E9q9IBCrHCKBtkYI&fXf`OS_?RA zLeNPbk+~NZGDTmY(ZV2mw^AW*^&oN&G2n!m6-4fp)zPdxie@R(d8&Y?^$MSy!!uY* 
zGZ;kvK^~F$Q$<|UN4QxH+sY9JT{NV~k3)ntALNPAv-}k+$fH^FY0rj2>ai?7joo94 z5!9&sSM2S`2-bNB>%5QwC!#IIM4Qes5jexUU(4Tq-@1i)WKz_dPu;%Y_9a$5*Nb|U z60D)zPe?)fO>Pb(nIUeX#G8V}bg{(R$k$t&x#`WvK4fb2@g{cZszDrk>Ro+}l;&&# zMresXQG@-m`2iesL5XYJ{7~-gH3nUqUM*BB2y6a(GSU>&fpt2p@ zkmq0W1G>lzOMZ-ty!<47 zHb<43Ojn{nfZt{wSz zyE?uTJ6#O)4(ODbL|g*5j0&QVA@5XC+@El-;<#6F+^ab5RUG#!PN{l>OrW3EO2egP z#U-W1rOVfsmX?<-EiYUATzFY|IIK6e*0(k5yP7(8w6wSBC1s&-s0=qaFcXiIxQIQq zm0?8CztWQd8*KxdY^IT?*oSPp_zM1(ZIP8c-g@osS1bFwZTCJ7MsFRSnKWKcmjlsZ z&$nj^13OMdskDaa@~gF9bNT}uT!YCb=S69gu?zcQ>cDNV^iCfkUbXK(eg$U3Ttfd+(IyQ zcGWjP{$QIe$BV1owC?!|+iyilNptP%-)mga+JXtJqJk}2ls7g^QOq*(PH{u+6YXCcS)Peb-;+mm%HyUHL3*GA19VG8EXzVuG?yjFJUvvD&&=cNH z{Sz#<>iW6%H=yvf>2?UywMpsn&$Y=qWt;5(YuF|&y^RYv`SFLzZc@m#aM#1-&wl|Y CK1;^{ literal 0 HcmV?d00001 diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/build_add_custom.sh b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/build_add_custom.sh new file mode 100755 index 000000000..a6b900d58 --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/build_add_custom.sh @@ -0,0 +1,169 @@ +#!/bin/bash +# Compile AddCustom AscendC kernel for PTO runtime integration +# +# Based on: https://gitee.com/ascend/samples/tree/master/operator/ascendc/0_introduction/1_add_frameworklaunch/AddCustom +# +# Prerequisites: +# - CANN toolkit installed (ASCEND_HOME_PATH set) +# - ccec compiler available +# +# PTO integration requirements (compared to framework-launched AscendC): +# 1. NO __global__ attribute — PTO calls kernels as subroutines from its +# dispatch loop; __global__ generates an incompatible prologue/epilogue +# 2. NO GetBlockNum()/GetBlockIdx() partitioning — PTO dispatches each task +# to a single core, so the kernel must process all data +# 3. 
Static tiling — tiling values hardcoded as constexpr (no runtime tiling) +# +# Usage: +# bash build_add_custom.sh [--dtype float|half] [--output add_custom.o] + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +ASCEND_HOME="${ASCEND_HOME_PATH:-/usr/local/Ascend/cann-8.5.0}" +CCEC="${ASCEND_HOME}/bin/ccec" +BUILD_DIR="${SCRIPT_DIR}/_build" + +# Defaults +DTYPE="float" +OUTPUT="${SCRIPT_DIR}/add_custom.o" + +# Parse args +while [[ $# -gt 0 ]]; do + case $1 in + --dtype) DTYPE="$2"; shift 2 ;; + --output) OUTPUT="$2"; shift 2 ;; + *) echo "Unknown arg: $1"; exit 1 ;; + esac +done + +if [ ! -f "$CCEC" ]; then + echo "Error: ccec not found at $CCEC" + echo "Set ASCEND_HOME_PATH to your CANN installation" + exit 1 +fi + +mkdir -p "$BUILD_DIR" + +# Generate PTO-compatible kernel source +# Key differences from the Gitee original: +# - extern "C" __aicore__ (no __global__) +# - blockLength = totalLength (no GetBlockNum() division) +# - no GetBlockIdx() offset on GlobalTensor +cat > "$BUILD_DIR/add_custom.cpp" << 'SRC_EOF' +#include "kernel_operator.h" + +using namespace AscendC; +constexpr int32_t BUFFER_NUM = 2; + +class KernelAdd { +public: + __aicore__ inline KernelAdd() {} + __aicore__ inline void Init(GM_ADDR x, GM_ADDR y, GM_ADDR z, + uint32_t totalLength, uint32_t tileNum) + { + this->blockLength = totalLength; + this->tileNum = tileNum; + this->tileLength = this->blockLength / tileNum / BUFFER_NUM; + xGm.SetGlobalBuffer((__gm__ DTYPE_X*)x, this->blockLength); + yGm.SetGlobalBuffer((__gm__ DTYPE_Y*)y, this->blockLength); + zGm.SetGlobalBuffer((__gm__ DTYPE_Z*)z, this->blockLength); + pipe.InitBuffer(inQueueX, BUFFER_NUM, this->tileLength * sizeof(DTYPE_X)); + pipe.InitBuffer(inQueueY, BUFFER_NUM, this->tileLength * sizeof(DTYPE_Y)); + pipe.InitBuffer(outQueueZ, BUFFER_NUM, this->tileLength * sizeof(DTYPE_Z)); + } + __aicore__ inline void Process() + { + int32_t loopCount = this->tileNum * BUFFER_NUM; + for (int32_t i = 0; i < 
loopCount; i++) { + CopyIn(i); + Compute(i); + CopyOut(i); + } + } +private: + __aicore__ inline void CopyIn(int32_t progress) + { + LocalTensor xLocal = inQueueX.AllocTensor(); + LocalTensor yLocal = inQueueY.AllocTensor(); + DataCopy(xLocal, xGm[progress * this->tileLength], this->tileLength); + DataCopy(yLocal, yGm[progress * this->tileLength], this->tileLength); + inQueueX.EnQue(xLocal); + inQueueY.EnQue(yLocal); + } + __aicore__ inline void Compute(int32_t progress) + { + LocalTensor xLocal = inQueueX.DeQue(); + LocalTensor yLocal = inQueueY.DeQue(); + LocalTensor zLocal = outQueueZ.AllocTensor(); + Add(zLocal, xLocal, yLocal, this->tileLength); + outQueueZ.EnQue(zLocal); + inQueueX.FreeTensor(xLocal); + inQueueY.FreeTensor(yLocal); + } + __aicore__ inline void CopyOut(int32_t progress) + { + LocalTensor zLocal = outQueueZ.DeQue(); + DataCopy(zGm[progress * this->tileLength], zLocal, this->tileLength); + outQueueZ.FreeTensor(zLocal); + } +private: + TPipe pipe; + TQue inQueueX, inQueueY; + TQue outQueueZ; + GlobalTensor xGm; + GlobalTensor yGm; + GlobalTensor zGm; + uint32_t blockLength; + uint32_t tileNum; + uint32_t tileLength; +}; + +// No __global__ — called as subroutine from PTO wrapper's kernel_entry +extern "C" __aicore__ void add_custom( + GM_ADDR x, GM_ADDR y, GM_ADDR z, + GM_ADDR workspace, GM_ADDR tiling) +{ + constexpr uint32_t totalLength = 16384; + constexpr uint32_t tileNum = 8; + KernelAdd op; + op.Init(x, y, z, totalLength, tileNum); + op.Process(); +} +SRC_EOF + +# AscendC include directories +ASC_ROOT="${ASCEND_HOME}/aarch64-linux/asc" +TIKCPP_ROOT="${ASCEND_HOME}/aarch64-linux/tikcpp" + +echo "Compiling add_custom.cpp (dtype=$DTYPE)..." 
+"$CCEC" \ + -c -O3 -std=c++17 \ + --cce-aicore-lang \ + -DTILING_KEY_VAR=0 \ + --cce-aicore-only \ + --cce-aicore-arch=dav-c220-vec \ + --cce-auto-sync \ + -mllvm -cce-aicore-stack-size=0x8000 \ + -mllvm -cce-aicore-function-stack-size=0x8000 \ + -DDTYPE_X=$DTYPE -DDTYPE_Y=$DTYPE -DDTYPE_Z=$DTYPE \ + -I"${ASC_ROOT}/include" \ + -I"${ASC_ROOT}/include/basic_api" \ + -I"${ASC_ROOT}/include/adv_api" \ + -I"${ASC_ROOT}/include/c_api" \ + -I"${ASC_ROOT}/include/utils" \ + -I"${ASC_ROOT}/impl/adv_api" \ + -I"${ASC_ROOT}/impl/basic_api" \ + -I"${ASC_ROOT}/impl/c_api" \ + -I"${ASC_ROOT}/impl/micro_api" \ + -I"${ASC_ROOT}/impl/simt_api" \ + -I"${ASC_ROOT}/impl/utils" \ + -I"${TIKCPP_ROOT}/tikcfw" \ + -I"${TIKCPP_ROOT}/tikcfw/interface" \ + -I"${TIKCPP_ROOT}/tikcfw/impl" \ + -include "${ASCEND_HOME}/include/ascendc/asc_devkit_version.h" \ + -o "$OUTPUT" \ + "$BUILD_DIR/add_custom.cpp" + +echo "Output: $OUTPUT ($(wc -c < "$OUTPUT") bytes)" +echo "Done." diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py index 6cd9f6eef..ef0aafafb 100644 --- a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py @@ -4,10 +4,14 @@ Demonstrates integrating a pre-compiled AscendC operator (add_custom) into the PTO tensormap_and_ringbuffer runtime via wrapper generation + link. -The AscendC kernel .o is compiled externally (e.g. via tikcpp_smoke + -npu_op_kernel_options --save-temp-files). Simpler only generates a PTO wrapper -(kernel_entry), compiles it with PTO flags (-x cce), and links it with the -kernel .o so kernel_entry sits at .text offset 0. 
+The .o is compiled with AscendC toolchain (ccec --cce-aicore-lang) but adapted +for PTO dispatch: + - No __global__ attribute (causes hang under PTO subroutine dispatch) + - No GetBlockNum()/GetBlockIdx() partitioning (PTO dispatches to single cores) + - Static tiling (constexpr values, no runtime tiling pointer) + +Simpler generates a PTO wrapper (kernel_entry), compiles it with PTO flags +(-x cce), and links it with the kernel .o so kernel_entry sits at .text offset 0. Computation: z = x + y (AscendC add_custom, func_id=0) @@ -28,7 +32,8 @@ KERNELS = [ { "func_id": 0, - # Pre-compiled AscendC kernel .o (produced by external AscendC build) + # Pre-compiled AscendC kernel .o with static tiling degeneration + # (tiling values baked in at compile time, no runtime tiling needed) "source": str(_KERNELS_ROOT / "ascendc" / "add_custom.o"), "core_type": "aiv", "compiler": "ascendc", diff --git a/tests/test_ascendc_compiler.py b/tests/test_ascendc_compiler.py index 3e654d57c..434bbcf88 100644 --- a/tests/test_ascendc_compiler.py +++ b/tests/test_ascendc_compiler.py @@ -60,7 +60,7 @@ def test_with_static_tiling_data(self): ) assert "TILING_DATA[4]" in src assert "0x10" in src and "0x27" in src - assert "(__gm__ uint8_t*)TILING_DATA" in src + assert "(uint8_t*)TILING_DATA" in src call_lines = [l for l in src.splitlines() if "my_op(" in l and "extern" not in l] assert len(call_lines) == 1 From 908a39655f561375db3ac4066e3e7b17d03153bc Mon Sep 17 00:00:00 2001 From: Chao Wang <26245345+ChaoWao@users.noreply.github.com> Date: Tue, 24 Mar 2026 20:14:21 +0800 Subject: [PATCH 4/4] Add unified PTO runtime package (L2/L3) Implements the python/pto/ package with a unified Runtime API that routes by level ("chip" for L2, "host" for L3). L3 manages per-chip worker processes, a handle-based task DAG with eager dispatch, and Python orchestration functions. 
Co-Authored-By: Claude Opus 4.6 --- python/pto/__init__.py | 36 +++++++ python/pto/compiler.py | 172 +++++++++++++++++++++++++++++++++ python/pto/dag.py | 123 ++++++++++++++++++++++++ python/pto/l2_runtime.py | 164 ++++++++++++++++++++++++++++++++ python/pto/l3_context.py | 97 +++++++++++++++++++ python/pto/l3_runtime.py | 200 +++++++++++++++++++++++++++++++++++++++ python/pto/l3_worker.py | 132 ++++++++++++++++++++++++++ python/pto/runtime.py | 75 +++++++++++++++ python/pto/types.py | 105 ++++++++++++++++++++ 9 files changed, 1104 insertions(+) create mode 100644 python/pto/__init__.py create mode 100644 python/pto/compiler.py create mode 100644 python/pto/dag.py create mode 100644 python/pto/l2_runtime.py create mode 100644 python/pto/l3_context.py create mode 100644 python/pto/l3_runtime.py create mode 100644 python/pto/l3_worker.py create mode 100644 python/pto/runtime.py create mode 100644 python/pto/types.py diff --git a/python/pto/__init__.py b/python/pto/__init__.py new file mode 100644 index 000000000..031c721f2 --- /dev/null +++ b/python/pto/__init__.py @@ -0,0 +1,36 @@ +"""PTO — Python Tensor Orchestration runtime. + +Provides a unified API for L2 (single chip) through L3+ (multi-chip) +execution on Ascend NPU devices. 
+ +Quick start:: + + import pto + + # Single chip (L2) + rt = pto.Runtime(level="chip", platform="a2a3", device=0) + rt.register("vector_add", orch="orch.cpp", kernels=[...]) + rt.run("vector_add", args=[pto.Arg.input(x), pto.Arg.output(y)]) + rt.close() + + # Multi-chip (L3) + rt = pto.Runtime(level="host", platform="a2a3", devices=[0, 1, 2, 3]) + pkg = pto.compile(platform="a2a3", orch="orch.cpp", kernels=[...]) + rt.register("pipeline", orch=my_orch_func, kernels={"compute": pkg}) + rt.run("pipeline", args={"input": data}) + rt.close() +""" + +from .types import Arg, CompiledPackage, KernelSource, ParamType, TensorHandle +from .runtime import Runtime +from .compiler import compile + +__all__ = [ + "Runtime", + "Arg", + "compile", + "CompiledPackage", + "KernelSource", + "ParamType", + "TensorHandle", +] diff --git a/python/pto/compiler.py b/python/pto/compiler.py new file mode 100644 index 000000000..fc49fddf8 --- /dev/null +++ b/python/pto/compiler.py @@ -0,0 +1,172 @@ +"""Compilation utilities for PTO runtime. + +Wraps the existing RuntimeBuilder + KernelCompiler pipeline and adds +SHA256-based caching. 
+""" + +from __future__ import annotations + +import hashlib +import logging +import os +import pickle +import sys +from pathlib import Path +from typing import Optional + +from .types import CompiledPackage, KernelSource + +logger = logging.getLogger(__name__) + +# Ensure simpler's python/ is importable +_PYTHON_DIR = str(Path(__file__).resolve().parent.parent) +if _PYTHON_DIR not in sys.path: + sys.path.insert(0, _PYTHON_DIR) + +# Also need examples/scripts for elf_parser +_SCRIPTS_DIR = str(Path(__file__).resolve().parent.parent.parent / "examples" / "scripts") +if _SCRIPTS_DIR not in sys.path: + sys.path.insert(0, _SCRIPTS_DIR) + + +def _cache_key(platform: str, runtime_name: str, orch_source: str, + kernel_sources: list[dict], orch_func: str) -> str: + """Compute a SHA256 cache key from source contents + config.""" + h = hashlib.sha256() + h.update(platform.encode()) + h.update(runtime_name.encode()) + h.update(orch_func.encode()) + + # Hash orchestration source content + orch_path = Path(orch_source) + if orch_path.is_file(): + h.update(orch_path.read_bytes()) + else: + h.update(orch_source.encode()) + + # Hash kernel source contents + for ks in sorted(kernel_sources, key=lambda k: k.get("func_id", 0)): + src_path = Path(ks["source"]) + if src_path.is_file(): + h.update(src_path.read_bytes()) + else: + h.update(ks["source"].encode()) + h.update(ks.get("core_type", "aiv").encode()) + h.update(str(ks.get("func_id", 0)).encode()) + + return h.hexdigest() + + +def compile( + platform: str, + runtime_name: str = "tensormap_and_ringbuffer", + orch_source: str = "", + kernel_sources: Optional[list[dict]] = None, + orch_func: str = "aicpu_orchestration_entry", + block_dim: int = 1, + aicpu_thread_num: int = 1, + orch_thread_num: int = 1, + cache_dir: Optional[str] = None, + build_dir: Optional[str] = None, + extra_include_dirs: Optional[list[str]] = None, +) -> CompiledPackage: + """Compile all artifacts for an L2 execution package. 
+ + Wraps RuntimeBuilder.build() + KernelCompiler to produce a + CompiledPackage containing all binaries needed for a single L2 run. + + Args: + platform: "a2a3" | "a2a3sim" | "a5" | "a5sim" + runtime_name: Which L2 runtime variant to compile + orch_source: Path to orchestration .cpp source + kernel_sources: List of dicts with keys: source, core_type, func_id + orch_func: Orchestration entry function name + block_dim: Number of AICore blocks + aicpu_thread_num: AICPU thread count + orch_thread_num: Orchestration thread count + cache_dir: Optional cache directory for compiled artifacts + build_dir: Optional build directory for intermediate files + extra_include_dirs: Additional include directories for kernel compilation + + Returns: + CompiledPackage with all binaries ready for L2 execution + """ + if kernel_sources is None: + kernel_sources = [] + + # Check cache + if cache_dir: + cache_path = Path(cache_dir).expanduser() + cache_path.mkdir(parents=True, exist_ok=True) + key = _cache_key(platform, runtime_name, orch_source, + kernel_sources, orch_func) + cached_file = cache_path / f"{key}.pkg" + if cached_file.exists(): + logger.info(f"Cache hit: {cached_file}") + with open(cached_file, "rb") as f: + return pickle.load(f) + + from runtime_builder import RuntimeBuilder + from elf_parser import extract_text_section + + builder = RuntimeBuilder(platform=platform) + kernel_compiler = builder.get_kernel_compiler() + + # Build runtime (host.so, aicpu.so, aicore.o) + logger.info("Compiling runtime...") + host_binary, aicpu_binary, aicore_binary = builder.build( + runtime_name, build_dir) + + # Compile orchestration + orch_binary = b"" + if orch_source: + logger.info(f"Compiling orchestration: {orch_source}") + orch_binary = kernel_compiler.compile_orchestration( + runtime_name, orch_source, build_dir=build_dir) + + # Compile kernels + compiled_kernels = [] + for ks in kernel_sources: + src = ks["source"] + core_type = ks.get("core_type", "aiv") + func_id = 
ks.get("func_id", 0) + + logger.info(f"Compiling kernel: {src} (core_type={core_type}, func_id={func_id})") + + pto_isa_root = os.environ.get("PTO_ISA_ROOT", "") + incore_o = kernel_compiler.compile_incore( + src, core_type=core_type, + pto_isa_root=pto_isa_root, + extra_include_dirs=extra_include_dirs or [], + build_dir=build_dir, + ) + + # Extract .text section for hardware platforms + if not platform.endswith("sim"): + kernel_bin = extract_text_section(incore_o) + else: + kernel_bin = incore_o + + compiled_kernels.append((func_id, kernel_bin)) + + pkg = CompiledPackage( + platform=platform, + runtime_name=runtime_name, + host_binary=host_binary, + aicpu_binary=aicpu_binary, + aicore_binary=aicore_binary, + orch_binary=orch_binary, + orch_func=orch_func, + kernel_binaries=compiled_kernels, + block_dim=block_dim, + aicpu_thread_num=aicpu_thread_num, + orch_thread_num=orch_thread_num, + ) + + # Save to cache + if cache_dir: + logger.info(f"Caching compiled package: {cached_file}") + with open(cached_file, "wb") as f: + pickle.dump(pkg, f) + + return pkg diff --git a/python/pto/dag.py b/python/pto/dag.py new file mode 100644 index 000000000..56be2273e --- /dev/null +++ b/python/pto/dag.py @@ -0,0 +1,123 @@ +"""Task DAG with handle-based dependency inference and eager dispatch.""" + +from __future__ import annotations + +from collections import defaultdict +from dataclasses import dataclass, field +from typing import Any + +from .types import ParamType + + +@dataclass +class TaskNode: + """A single task in the DAG.""" + + task_id: int + chip: int + kernel: str + args: list + deps: set[int] = field(default_factory=set) + is_group: bool = False + group_chips: list[int] = field(default_factory=list) + + +class TaskDAG: + """Dependency graph for L3 task scheduling. + + Dependencies are inferred automatically from tensor handle usage: + if task B reads a handle that task A produced (OUTPUT/INOUT), + then B depends on A. 
+ """ + + def __init__(self): + self._tasks: dict[int, TaskNode] = {} + self._handle_producer: dict[int, int] = {} # handle_id → producer task_id + self._consumers: dict[int, set[int]] = defaultdict(set) # task_id → dependents + self._completed: set[int] = set() + self._dispatched: set[int] = set() + self._next_id = 0 + + def add_task(self, chip: int, kernel: str, args: list, + is_group: bool = False, + group_chips: list[int] = None) -> TaskNode: + """Add a task and infer dependencies from tensor handles. + + Returns the TaskNode (with deps populated). + """ + task_id = self._next_id + self._next_id += 1 + + deps = set() + for arg in args: + if not hasattr(arg, "type"): + continue + if arg.type in (ParamType.INPUT, ParamType.INOUT): + handle_id = getattr(arg, "_handle_id", None) + if handle_id is not None: + producer = self._handle_producer.get(handle_id) + if producer is not None and producer not in self._completed: + deps.add(producer) + + # Register outputs + for arg in args: + if not hasattr(arg, "type"): + continue + if arg.type in (ParamType.OUTPUT, ParamType.INOUT): + handle_id = getattr(arg, "_handle_id", None) + if handle_id is not None: + self._handle_producer[handle_id] = task_id + + node = TaskNode( + task_id=task_id, + chip=chip, + kernel=kernel, + args=args, + deps=deps, + is_group=is_group, + group_chips=group_chips or [], + ) + self._tasks[task_id] = node + + for dep in deps: + self._consumers[dep].add(task_id) + + return node + + def is_ready(self, task_id: int) -> bool: + """Check if a task has all dependencies satisfied.""" + node = self._tasks.get(task_id) + if node is None: + return False + return len(node.deps) == 0 + + def mark_dispatched(self, task_id: int) -> None: + self._dispatched.add(task_id) + + def complete(self, task_id: int) -> list[TaskNode]: + """Mark task complete and return newly-ready tasks.""" + self._completed.add(task_id) + ready = [] + + for consumer_id in self._consumers.get(task_id, set()): + consumer = 
self._tasks[consumer_id] + consumer.deps.discard(task_id) + if len(consumer.deps) == 0 and consumer_id not in self._dispatched: + ready.append(consumer) + + return ready + + def get_ready_tasks(self) -> list[TaskNode]: + """Return all tasks that are ready but not yet dispatched.""" + ready = [] + for tid, node in self._tasks.items(): + if tid not in self._dispatched and len(node.deps) == 0: + ready.append(node) + return ready + + def all_complete(self) -> bool: + return len(self._completed) == len(self._tasks) + + @property + def task_count(self) -> int: + return len(self._tasks) diff --git a/python/pto/l2_runtime.py b/python/pto/l2_runtime.py new file mode 100644 index 000000000..f216467bf --- /dev/null +++ b/python/pto/l2_runtime.py @@ -0,0 +1,164 @@ +"""L2 (single-chip) runtime implementation. + +Wraps the existing bindings.py ctypes interface behind the unified +init/register/run/close API. +""" + +from __future__ import annotations + +import ctypes +import logging +import sys +from pathlib import Path +from typing import Optional + +import numpy as np + +from .types import Arg, CompiledPackage, ParamType + +logger = logging.getLogger(__name__) + +# Ensure simpler's python/ is importable +_PYTHON_DIR = str(Path(__file__).resolve().parent.parent) +if _PYTHON_DIR not in sys.path: + sys.path.insert(0, _PYTHON_DIR) + + +def _args_to_c_arrays(args: list[Arg]): + """Convert a list of Arg to the parallel arrays that bindings.py expects. + + Returns: + (func_args, arg_types, arg_sizes, host_tensors) + host_tensors is a dict mapping index to numpy array for copy-back tracking. 
+ """ + from bindings import ARG_SCALAR, ARG_INPUT_PTR, ARG_OUTPUT_PTR, ARG_INOUT_PTR + + type_map = { + ParamType.SCALAR: ARG_SCALAR, + ParamType.INPUT: ARG_INPUT_PTR, + ParamType.OUTPUT: ARG_OUTPUT_PTR, + ParamType.INOUT: ARG_INOUT_PTR, + } + + func_args = [] + arg_types = [] + arg_sizes = [] + + for arg in args: + arg_types.append(type_map[arg.type]) + if arg.type == ParamType.SCALAR: + func_args.append(int(arg.data)) + arg_sizes.append(0) + else: + # Tensor argument — pass host pointer + arr = arg.data + if not isinstance(arr, np.ndarray): + raise TypeError(f"Expected numpy array for tensor arg, got {type(arr)}") + if not arr.flags["C_CONTIGUOUS"]: + arr = np.ascontiguousarray(arr) + ptr = arr.ctypes.data + func_args.append(ptr) + arg_sizes.append(arr.nbytes) + + return func_args, arg_types, arg_sizes + + +class L2Runtime: + """Single-chip runtime. Wraps bindings.py.""" + + def __init__(self, platform: str, device: int = 0): + self._platform = platform + self._device = device + self._registry: dict[str, CompiledPackage] = {} + self._lib_loaded = False + self._RuntimeClass = None + + def register(self, name: str, *, pkg: CompiledPackage = None, + orch: str = "", kernels: list[dict] = None, + runtime: str = "tensormap_and_ringbuffer", + orch_func: str = "aicpu_orchestration_entry", + block_dim: int = 1, aicpu_thread_num: int = 1, + orch_thread_num: int = 1, + cache_dir: Optional[str] = None, + build_dir: Optional[str] = None, + extra_include_dirs: Optional[list[str]] = None, + **kwargs) -> None: + """Register a named computation. + + Either pass a pre-compiled CompiledPackage via ``pkg``, or pass + source paths via ``orch`` + ``kernels`` to compile on the fly. 
+ """ + if pkg is not None: + self._registry[name] = pkg + return + + # Compile from source + from .compiler import compile as pto_compile + + pkg = pto_compile( + platform=self._platform, + runtime_name=runtime, + orch_source=orch, + kernel_sources=kernels or [], + orch_func=orch_func, + block_dim=block_dim, + aicpu_thread_num=aicpu_thread_num, + orch_thread_num=orch_thread_num, + cache_dir=cache_dir, + build_dir=build_dir, + extra_include_dirs=extra_include_dirs, + ) + self._registry[name] = pkg + + def run(self, name: str, args: list[Arg]) -> None: + """Execute a registered computation on the device. + + Performs the full L2 lifecycle: init → launch → finalize. + """ + if name not in self._registry: + raise KeyError(f"No registered computation '{name}'. " + f"Available: {list(self._registry.keys())}") + + pkg = self._registry[name] + self._ensure_loaded(pkg) + + func_args, arg_types, arg_sizes = _args_to_c_arrays(args) + + from bindings import launch_runtime + + rt = self._RuntimeClass() + rt.initialize( + pkg.orch_binary, + pkg.orch_func, + func_args, + arg_types=arg_types, + arg_sizes=arg_sizes, + kernel_binaries=pkg.kernel_binaries, + ) + launch_runtime( + rt, + aicpu_thread_num=pkg.aicpu_thread_num, + block_dim=pkg.block_dim, + device_id=self._device, + aicpu_binary=pkg.aicpu_binary, + aicore_binary=pkg.aicore_binary, + orch_thread_num=pkg.orch_thread_num, + ) + rt.finalize() + + def close(self) -> None: + """Release resources.""" + self._registry.clear() + self._RuntimeClass = None + self._lib_loaded = False + + def _ensure_loaded(self, pkg: CompiledPackage) -> None: + """Load the host binary and set device if not already done.""" + if self._lib_loaded: + return + + from bindings import bind_host_binary, set_device + + self._RuntimeClass = bind_host_binary(pkg.host_binary) + set_device(self._device) + self._lib_loaded = True diff --git a/python/pto/l3_context.py b/python/pto/l3_context.py new file mode 100644 index 000000000..3f49c3489 --- /dev/null +++ 
class L3OrchestratorContext:
    """Runtime context for L3 Python orchestration functions.

    The orchestration function receives this as ``ctx`` and uses it to
    discover chips (``peers``), allocate host tensors (``alloc``), and
    submit tasks (``submit`` / ``submit_group``) into the task DAG.
    """

    def __init__(self, device_ids: list[int],
                 kernel_registry: dict[str, CompiledPackage]):
        # Defensive copy: later mutation by the caller must not change
        # which chips are considered valid.
        self._device_ids = list(device_ids)
        self._kernel_registry = kernel_registry
        self._dag = TaskDAG()
        self._next_handle_id = 1
        self._handles: dict[int, TensorHandle] = {}

    @property
    def dag(self) -> TaskDAG:
        """The task DAG populated by submit()/submit_group() calls."""
        return self._dag

    def peers(self) -> list[int]:
        """Return available chip IDs (a copy; callers may mutate it freely)."""
        return list(self._device_ids)

    def alloc(self, shape: tuple, dtype: str = "float32") -> TensorHandle:
        """Allocate a zero-initialized host-memory tensor and return a handle.

        The handle is retained in an internal table so the backing array
        outlives the caller's local scope.
        """
        arr = np.zeros(shape, dtype=dtype)
        handle_id = self._next_handle_id
        self._next_handle_id += 1
        handle = TensorHandle(
            id=handle_id,
            shape=shape,
            dtype=dtype,
            size=arr.nbytes,
            _data=arr,
        )
        self._handles[handle_id] = handle
        return handle

    def submit(self, chip: int, kernel: str, args: list[Arg]) -> TaskNode:
        """Submit a single-chip task to the DAG.

        Args:
            chip: Target chip ID
            kernel: Name of a registered L2 package
            args: List of Arg descriptors

        Raises:
            ValueError: If *chip* is not among the available devices.
            KeyError: If *kernel* was never registered.
        """
        if chip not in self._device_ids:
            raise ValueError(f"Chip {chip} not in available devices {self._device_ids}")
        if kernel not in self._kernel_registry:
            raise KeyError(f"Kernel '{kernel}' not registered. "
                           f"Available: {list(self._kernel_registry.keys())}")
        return self._dag.add_task(chip=chip, kernel=kernel, args=args)

    def submit_group(self, chips: list[int], kernel: str,
                     args: list[Arg]) -> TaskNode:
        """Submit a group task (multiple chips as one logical DAG node).

        All chips execute the same kernel in parallel. Used for collective
        operations (allreduce, etc.) where chips communicate via P2P.

        Raises:
            ValueError: If *chips* is empty or contains an unknown chip.
            KeyError: If *kernel* was never registered.
        """
        if not chips:
            # Without this guard, chips[0] below raised an opaque IndexError.
            raise ValueError("submit_group requires at least one chip")
        for c in chips:
            if c not in self._device_ids:
                raise ValueError(f"Chip {c} not in available devices")
        if kernel not in self._kernel_registry:
            raise KeyError(f"Kernel '{kernel}' not registered")

        return self._dag.add_task(
            chip=chips[0],  # representative chip
            kernel=kernel,
            args=args,
            is_group=True,
            group_chips=chips,
        )

    def scope_begin(self) -> None:
        pass  # placeholder for future scope tracking

    def scope_end(self) -> None:
        pass

    def get_kernel_package(self, name: str) -> CompiledPackage:
        """Look up a registered L2 package by name (KeyError if absent)."""
        return self._kernel_registry[name]
    def __init__(self, platform: str, devices: list[int]):
        """Configure platform and device list; workers start lazily on run()."""
        self._platform = platform
        # Defensive copy: the caller's list may be mutated afterwards.
        self._devices = list(devices)
        self._kernel_registry: dict[str, CompiledPackage] = {}
        self._orch_registry: dict[str, Callable] = {}
        self._workers: dict[int, ChipWorker] = {}  # device_id -> worker process
        self._started = False  # flipped by _ensure_workers()

    def register(self, name: str, *,
                 orch: Callable = None,
                 kernels: dict[str, CompiledPackage] = None,
                 **kwargs) -> None:
        """Register a named multi-chip computation.

        Args:
            name: Computation name
            orch: Python orchestration function ``f(ctx, args)``
            kernels: Dict mapping kernel names to pre-compiled L2 packages

        NOTE(review): kernels are merged into one registry shared by all
        computations, so two registrations using the same kernel name
        overwrite each other — confirm this is intended.
        """
        if orch is not None:
            self._orch_registry[name] = orch
        if kernels:
            self._kernel_registry.update(kernels)

    def run(self, name: str, args: Any = None) -> Any:
        """Execute a registered multi-chip computation.

        Steps:
        1. Ensure workers are started
        2. Create L3OrchestratorContext
        3. Call the Python orch function — this populates the DAG
        4. Dispatch ready tasks to chip workers
        5. Wait for all tasks to complete

        Returns:
            Whatever the orchestration function returned.

        Raises:
            KeyError: If *name* was never registered.
            RuntimeError: If a dispatched task reports failure
                (raised from _execute_dag).
        """
        if name not in self._orch_registry:
            raise KeyError(f"No registered orchestration '{name}'. "
                           f"Available: {list(self._orch_registry.keys())}")

        self._ensure_workers()

        ctx = L3OrchestratorContext(
            device_ids=self._devices,
            kernel_registry=self._kernel_registry,
        )

        orch_func = self._orch_registry[name]
        # Calling the user function populates ctx.dag as a side effect.
        result = orch_func(ctx, args)

        self._execute_dag(ctx)

        return result

    def close(self) -> None:
        """Shut down all worker processes."""
        # Signal every worker first, then join, so shutdowns overlap.
        for worker in self._workers.values():
            worker.stop()
        for worker in self._workers.values():
            worker.join(timeout=10.0)
        self._workers.clear()
        self._started = False
        self._kernel_registry.clear()
        self._orch_registry.clear()

    def _ensure_workers(self) -> None:
        """Start worker processes if not already running (one per device)."""
        if self._started:
            return

        for device_id in self._devices:
            worker = ChipWorker(device_id=device_id)
            worker.start()
            self._workers[device_id] = worker

        self._started = True
        logger.info(f"Started {len(self._workers)} chip workers: {self._devices}")

    def _execute_dag(self, ctx: L3OrchestratorContext) -> None:
        """Dispatch all tasks in the DAG to workers and wait for completion.

        Dispatches the initial ready set, then polls every worker in a loop
        with a short timeout; each completed task may unlock new ready tasks.

        NOTE(review): on a task failure this raises immediately without
        draining in-flight tasks on other workers — confirm that is the
        desired failure semantics.
        """
        dag = ctx.dag

        if dag.task_count == 0:
            return

        in_flight: dict[int, int] = {}  # task_id → chip

        ready = dag.get_ready_tasks()
        for node in ready:
            self._dispatch_task(node, ctx, in_flight)

        while not dag.all_complete():
            # Poll every worker briefly rather than blocking on one pipe.
            for device_id, worker in self._workers.items():
                result = worker.poll_result(timeout=0.01)
                if result is None:
                    continue

                task_id, success, error = result
                if not success:
                    raise RuntimeError(
                        f"Task {task_id} failed on chip {in_flight.get(task_id, '?')}: {error}")

                in_flight.pop(task_id, None)

                # Completing a task may make its DAG successors runnable.
                newly_ready = dag.complete(task_id)
                for node in newly_ready:
                    self._dispatch_task(node, ctx, in_flight)

    def _dispatch_task(self, node: TaskNode, ctx: L3OrchestratorContext,
                       in_flight: dict[int, int]) -> None:
        """Send a task to the appropriate chip worker.

        Group tasks are fanned out to every chip in the group; in_flight
        records only the representative chip (node.chip) in both cases.
        """
        pkg = ctx.get_kernel_package(node.kernel)

        func_args, arg_types, arg_sizes = _args_to_raw(node.args)

        # Everything a worker needs to run one L2 invocation, as a plain
        # picklable dict (sent across the process pipe).
        task_spec = {
            "host_binary": pkg.host_binary,
            "orch_binary": pkg.orch_binary,
            "orch_func": pkg.orch_func,
            "func_args": func_args,
            "arg_types": arg_types,
            "arg_sizes": arg_sizes,
            "kernel_binaries": pkg.kernel_binaries,
            "aicpu_thread_num": pkg.aicpu_thread_num,
            "block_dim": pkg.block_dim,
            "aicpu_binary": pkg.aicpu_binary,
            "aicore_binary": pkg.aicore_binary,
            "orch_thread_num": pkg.orch_thread_num,
        }

        if node.is_group:
            for chip in node.group_chips:
                worker = self._workers.get(chip)
                if worker is None:
                    raise RuntimeError(f"No worker for chip {chip}")
                worker.send_task(node.task_id, task_spec)
        else:
            worker = self._workers.get(node.chip)
            if worker is None:
                raise RuntimeError(f"No worker for chip {node.chip}")
            worker.send_task(node.task_id, task_spec)

        ctx.dag.mark_dispatched(node.task_id)
        in_flight[node.task_id] = node.chip
        logger.debug(f"Dispatched task {node.task_id} (kernel={node.kernel}) to chip {node.chip}")


def _args_to_raw(args: list[Arg]) -> tuple[list, list[int], list[int]]:
    """Convert Arg list to parallel arrays for the worker task spec.

    Returns (func_args, arg_types, arg_sizes) where tensor entries of
    func_args are raw host addresses (ints) and scalar entries are the
    scalar values themselves.
    """
    import numpy as np

    func_args = []
    arg_types = []
    arg_sizes = []

    for arg in args:
        arg_types.append(int(arg.type))
        if arg.type == ParamType.SCALAR:
            func_args.append(int(arg.data))
            arg_sizes.append(0)
        else:
            arr = arg.data
            if isinstance(arr, np.ndarray):
                if not arr.flags["C_CONTIGUOUS"]:
                    # NOTE(review): this contiguous copy is a local temporary;
                    # only its address (an int) is sent to the worker, so the
                    # buffer may be freed before the worker reads it — confirm
                    # the copy's lifetime is covered elsewhere.
                    arr = np.ascontiguousarray(arr)
                # Raw host pointer; the worker dereferences it in-process.
                func_args.append(arr.ctypes.data)
                arg_sizes.append(arr.nbytes)
            else:
                # No backing array: pass a null address and the declared size.
                func_args.append(0)
                arg_sizes.append(arg.size)

    return func_args, arg_types, arg_sizes
from __future__ import annotations

import logging
import multiprocessing
import pickle
import sys
import traceback
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# Ensure simpler's python/ is importable
_PYTHON_DIR = str(Path(__file__).resolve().parent.parent)


def _worker_main(device_id: int, artifacts_dir: str,
                 cmd_pipe: multiprocessing.connection.Connection,
                 result_pipe: multiprocessing.connection.Connection):
    """Entry point for a worker process. Runs in a child process.

    Binds this process to *device_id* once, then loops: receive
    (task_id, task_spec) from cmd_pipe, execute the L2 lifecycle
    (initialize → launch → finalize), and report
    (task_id, success, error_traceback) on result_pipe. Exits on a
    ``None`` message or when the parent closes the pipe (EOFError).
    """
    if _PYTHON_DIR not in sys.path:
        sys.path.insert(0, _PYTHON_DIR)

    from bindings import bind_host_binary, set_device, launch_runtime

    # Load host binary and bind to device (once)
    host_binary_path = Path(artifacts_dir) / "host_runtime.so"
    if host_binary_path.exists():
        RuntimeClass = bind_host_binary(str(host_binary_path))
    else:
        # Binary passed inline in first task
        RuntimeClass = None

    set_device(device_id)

    while True:
        try:
            msg = cmd_pipe.recv()
        except EOFError:
            # Parent end closed → no more tasks will arrive.
            break

        if msg is None:  # shutdown
            break

        task_id, task_spec = msg
        try:
            # Ensure host binary is loaded
            if RuntimeClass is None and "host_binary" in task_spec:
                RuntimeClass = bind_host_binary(task_spec["host_binary"])

            rt = RuntimeClass()
            rt.initialize(
                task_spec["orch_binary"],
                task_spec["orch_func"],
                task_spec["func_args"],
                arg_types=task_spec["arg_types"],
                arg_sizes=task_spec["arg_sizes"],
                kernel_binaries=task_spec["kernel_binaries"],
            )
            launch_runtime(
                rt,
                aicpu_thread_num=task_spec.get("aicpu_thread_num", 1),
                block_dim=task_spec.get("block_dim", 1),
                device_id=device_id,
                aicpu_binary=task_spec["aicpu_binary"],
                aicore_binary=task_spec["aicore_binary"],
                orch_thread_num=task_spec.get("orch_thread_num", 1),
            )
            rt.finalize()
            result_pipe.send((task_id, True, None))
        except Exception as e:
            # Report the failure to the parent instead of dying; the worker
            # stays alive for subsequent tasks.
            result_pipe.send((task_id, False, traceback.format_exc()))


class ChipWorker:
    """Manages a child process that owns one NPU device.

    The parent holds one end of a command pipe (tasks in) and a result
    pipe (completions out); the child runs ``_worker_main``.
    """

    def __init__(self, device_id: int, artifacts_dir: str = ""):
        self.device_id = device_id
        self.artifacts_dir = artifacts_dir
        self._process: Optional[multiprocessing.Process] = None
        self._cmd_pipe: Optional[multiprocessing.connection.Connection] = None
        self._result_pipe: Optional[multiprocessing.connection.Connection] = None

    def start(self) -> None:
        """Spawn the child process and wire up both pipes.

        daemon=True so a crashed parent does not leave orphan workers.
        """
        parent_cmd, child_cmd = multiprocessing.Pipe()
        child_result, parent_result = multiprocessing.Pipe()

        self._cmd_pipe = parent_cmd
        self._result_pipe = parent_result

        self._process = multiprocessing.Process(
            target=_worker_main,
            args=(self.device_id, self.artifacts_dir, child_cmd, child_result),
            daemon=True,
        )
        self._process.start()

    def send_task(self, task_id: int, task_spec: dict) -> None:
        """Send a task to the worker. Non-blocking."""
        self._cmd_pipe.send((task_id, task_spec))

    def poll_result(self, timeout: float = 0.01):
        """Check for a completed result. Returns (task_id, success, error) or None."""
        if self._result_pipe.poll(timeout):
            return self._result_pipe.recv()
        return None

    def stop(self) -> None:
        """Send shutdown signal."""
        if self._cmd_pipe:
            try:
                self._cmd_pipe.send(None)
            except BrokenPipeError:
                # Child already exited; nothing to signal.
                pass

    def join(self, timeout: float = 10.0) -> None:
        """Wait for the child to exit; force-terminate if it outlives timeout."""
        if self._process:
            self._process.join(timeout=timeout)
            if self._process.is_alive():
                self._process.terminate()

    @property
    def is_alive(self) -> bool:
        # True only once start() ran and the child has not exited.
        return self._process is not None and self._process.is_alive()
+ +Routes by ``level`` parameter to the appropriate runtime implementation: + - ``"chip"`` → L2Runtime (single chip) + - ``"host"`` → L3Runtime (single host, multi chip) +""" + +from __future__ import annotations + +from typing import Any, Callable, Optional, Union + +from .types import Arg, CompiledPackage + + +class Runtime: + """Unified runtime interface. + + Usage:: + + # L2 — single chip + rt = pto.Runtime(level="chip", platform="a2a3", device=0) + rt.register("vector_add", orch="orch.cpp", kernels=[...]) + rt.run("vector_add", args=[Arg.input(x), Arg.output(y), Arg.scalar(n)]) + rt.close() + + # L3 — single host, multi chip + rt = pto.Runtime(level="host", platform="a2a3", devices=[0, 1, 2, 3]) + pkg = pto.compile(platform="a2a3", orch="orch.cpp", kernels=[...]) + rt.register("pipeline", orch=my_orch_func, kernels={"compute": pkg}) + rt.run("pipeline", args={"input": data}) + rt.close() + """ + + def __init__(self, level: str = "chip", **kwargs): + self._level = level.lower() + self._impl = _create_impl(self._level, **kwargs) + + @property + def level(self) -> str: + return self._level + + def register(self, name: str, **kwargs) -> None: + """Register a named computation.""" + self._impl.register(name, **kwargs) + + def run(self, name: str, args: Any = None) -> Any: + """Execute a registered computation.""" + return self._impl.run(name, args=args) + + def close(self) -> None: + """Release resources.""" + self._impl.close() + + def __enter__(self): + return self + + def __exit__(self, *exc): + self.close() + + +def _create_impl(level: str, **kwargs): + """Instantiate the right runtime backend for the given level.""" + if level == "chip": + from .l2_runtime import L2Runtime + platform = kwargs.get("platform", "a2a3") + device = kwargs.get("device", 0) + return L2Runtime(platform=platform, device=device) + + if level == "host": + from .l3_runtime import L3Runtime + platform = kwargs.get("platform", "a2a3") + devices = kwargs.get("devices", [0]) + return 
L3Runtime(platform=platform, devices=devices) + + raise ValueError(f"Unknown level '{level}'. Supported: 'chip', 'host'") diff --git a/python/pto/types.py b/python/pto/types.py new file mode 100644 index 000000000..967edfe15 --- /dev/null +++ b/python/pto/types.py @@ -0,0 +1,105 @@ +"""Core types for the unified PTO runtime.""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from enum import IntEnum +from typing import Any, Union + +import numpy as np + + +class ParamType(IntEnum): + SCALAR = 0 + INPUT = 1 + OUTPUT = 2 + INOUT = 3 + + +class Arg: + """Unified argument descriptor for runtime submit/run calls. + + Replaces simpler's split arrays (func_args, arg_types, arg_sizes). + """ + + __slots__ = ("type", "data", "size") + + def __init__(self, type: ParamType, data: Any, size: int = 0): + self.type = type + self.data = data + self.size = size + + # ---- Convenience constructors ---- + + @classmethod + def input(cls, data: np.ndarray) -> Arg: + return cls(ParamType.INPUT, data, data.nbytes) + + @classmethod + def output(cls, data: np.ndarray) -> Arg: + return cls(ParamType.OUTPUT, data, data.nbytes) + + @classmethod + def inout(cls, data: np.ndarray) -> Arg: + return cls(ParamType.INOUT, data, data.nbytes) + + @classmethod + def scalar(cls, value: Union[int, float]) -> Arg: + return cls(ParamType.SCALAR, value, 0) + + def __repr__(self) -> str: + if self.type == ParamType.SCALAR: + return f"Arg.scalar({self.data})" + type_name = self.type.name.lower() + shape = getattr(self.data, "shape", "?") + return f"Arg.{type_name}(shape={shape}, size={self.size})" + + +@dataclass +class TensorHandle: + """Opaque handle for a host-memory tensor managed by the runtime.""" + + id: int + shape: tuple + dtype: str + size: int # bytes + _data: np.ndarray = field(repr=False, default=None) + + @property + def data(self) -> np.ndarray: + return self._data + + def numpy(self) -> np.ndarray: + if self._data is None: + raise ValueError("TensorHandle 
has no backing data") + return self._data + + +@dataclass +class KernelSource: + """Describes a single kernel source file to compile.""" + + source: str + core_type: str = "aiv" + func_id: int = 0 + + +@dataclass +class CompiledPackage: + """A fully compiled L2 execution package. + + Contains all binaries needed to run a single L2 invocation: + runtime binaries + orchestration .so + kernel binaries + config. + """ + + platform: str + runtime_name: str + host_binary: bytes + aicpu_binary: bytes + aicore_binary: bytes + orch_binary: bytes + orch_func: str + kernel_binaries: list = field(default_factory=list) # [(func_id, bytes)] + block_dim: int = 1 + aicpu_thread_num: int = 1 + orch_thread_num: int = 1