def _compile_one_ascendc_kernel(kernel):
    """Wrap a pre-compiled AscendC .o into a PTO kernel binary.

    Resolves the kernel object code from an explicit ``.o`` source path
    and/or a ``kernel_meta_dir``, applies an optional tiling override,
    then delegates wrapping + linking to AscendCCompiler.

    Returns:
        (func_id, kernel_bin) — the .text payload of the linked ELF.
    """
    from ascendc_compiler import AscendCCompiler, extract_kernel_artifacts

    ascendc = AscendCCompiler(platform=self.platform)

    obj_bytes = None
    tiling_blob = None

    # An explicit pre-compiled .o given as the kernel source wins.
    src = kernel["source"]
    if src.endswith(".o"):
        with open(src, 'rb') as fh:
            obj_bytes = fh.read()

    # Otherwise fall back to artifacts extracted from kernel_meta.
    meta_dir = kernel.get("kernel_meta_dir")
    if meta_dir:
        meta_o, tiling_blob = extract_kernel_artifacts(os.path.abspath(meta_dir))
        if obj_bytes is None and meta_o is not None:
            obj_bytes = meta_o

    if obj_bytes is None:
        raise ValueError(
            f"AscendC kernel requires a pre-compiled .o file. "
            f"Got source='{src}'. "
            f"Compile the AscendC kernel externally (e.g. via tikcpp_smoke) "
            f"and provide the .o path or kernel_meta_dir."
        )

    # An explicit tiling file overrides anything found in kernel_meta.
    tiling_path = kernel.get("tiling_data_path")
    if tiling_path and os.path.isfile(tiling_path):
        with open(tiling_path, 'rb') as fh:
            tiling_blob = fh.read()

    linked_elf = ascendc.compile_ascendc_kernel(
        ascendc_kernel_o=obj_bytes,
        ascendc_kernel_symbol=kernel.get("ascendc_symbol", "ascendc_kernel"),
        tensor_args=kernel.get("tensor_args", []),
        tiling_data=tiling_blob,
        core_type=kernel.get("core_type", "aiv"),
        has_workspace=kernel.get("has_workspace", False),
        extra_include_dirs=runtime_include_dirs,
        build_dir=self.build_dir,
    )
    return (kernel["func_id"], extract_text_section(linked_elf))
def generate_wrapper_source(
    ascendc_kernel_symbol: str,
    tensor_args: List[Dict[str, str]],
    tiling_data: Optional[bytes] = None,
    has_workspace: bool = False,
) -> str:
    """Generate a C++ wrapper bridging PTO's kernel_entry to an AscendC kernel.

    The emitted translation unit:
      - defines ``kernel_entry(__gm__ int64_t* args)`` (PTO dispatch convention)
      - unpacks each ``args[i]`` as a ``Tensor*`` and extracts ``buffer.addr``
      - forwards the raw GM byte-addresses to the AscendC kernel symbol
      - embeds static tiling data as a const array when provided

    Compiled with PTO flags (ccec -x cce); only needs tensor.h, no AscendC SDK.

    Args:
        ascendc_kernel_symbol: ``extern "C"`` symbol of the AscendC kernel
            entry function in the pre-compiled .o (e.g. ``"add_custom"``).
        tensor_args: Ordered tensor descriptors. Each dict has ``name``
            (label used in generated identifiers) and ``direction``
            (``"input"`` | ``"output"`` | ``"inout"``).
        tiling_data: Raw tiling blob; None or empty emits a nullptr tiling arg.
        has_workspace: Whether the kernel expects a workspace pointer
            (the wrapper always passes nullptr for it).

    Returns:
        Complete C++ source string ready for ccec compilation.
    """
    # Single source of truth for "embed tiling?" — previously the
    # `is not None and len > 0` test was duplicated in two places.
    has_tiling = bool(tiling_data)

    out: List[str] = []
    emit = out.append

    # Header / address-space shims (PTO ccec -x cce uses [aicore]).
    emit('#include "tensor.h"')
    emit('')
    emit('#ifndef __gm__')
    emit('#define __gm__')
    emit('#endif')
    emit('#ifndef __aicore__')
    emit('#define __aicore__ [aicore]')
    emit('#endif')
    emit('')

    # Forward-declare the AscendC kernel. Tensor pointers are GM-qualified;
    # workspace/tiling use plain uint8_t* because tiling may live in const
    # data (not GM). The linker matches by symbol name only, so parameter
    # types need not match the AscendC side exactly.
    params = ['__gm__ uint8_t*'] * len(tensor_args)
    if has_workspace:
        params.append('uint8_t*')  # workspace (wrapper passes nullptr)
    params.append('uint8_t*')      # tiling (const data or nullptr)
    emit(f'extern "C" __aicore__ void {ascendc_kernel_symbol}({", ".join(params)});')
    emit('')

    # Embed tiling bytes as a static const array, 16 bytes per row.
    if has_tiling:
        emit(f'static const uint8_t TILING_DATA[{len(tiling_data)}] = {{')
        for off in range(0, len(tiling_data), 16):
            row = tiling_data[off:off + 16]
            body = ', '.join(f'0x{b:02x}' for b in row)
            comma = ',' if off + 16 < len(tiling_data) else ''
            emit(f'    {body}{comma}')
        emit('};')
        emit('')

    # kernel_entry: unpack Tensor* from args[], pull out the raw GM
    # byte-addresses, then call the AscendC kernel.
    emit('extern "C" __aicore__ void kernel_entry(__gm__ int64_t* args) {')
    for idx, ta in enumerate(tensor_args):
        emit(f'    __gm__ Tensor* t_{ta["name"]} = '
             f'reinterpret_cast<__gm__ Tensor*>(args[{idx}]);')
    emit('')
    for ta in tensor_args:
        emit(f'    __gm__ uint8_t* gm_{ta["name"]} = '
             f'reinterpret_cast<__gm__ uint8_t*>(t_{ta["name"]}->buffer.addr);')
    emit('')

    call_args = [f'gm_{ta["name"]}' for ta in tensor_args]
    if has_workspace:
        call_args.append('nullptr')
    call_args.append('(uint8_t*)TILING_DATA' if has_tiling else 'nullptr')
    emit(f'    {ascendc_kernel_symbol}({", ".join(call_args)});')
    emit('}')
    emit('')

    return '\n'.join(out)
def extract_kernel_artifacts(
    kernel_meta_dir: str,
) -> Tuple[Optional[bytes], Optional[bytes]]:
    """Pull the compiled kernel .o and tiling blob out of a kernel_meta dir.

    A kernel_meta directory (produced by ``npu_op_kernel_options`` with
    ``--save-temp-files``) typically holds the compiled AICore ``*.o`` plus
    ``*.json`` metadata; tiling may live in a ``*tiling*.bin`` file or in a
    JSON ``tiling_data`` field (hex string or byte list).

    Args:
        kernel_meta_dir: Absolute path to a kernel_meta directory.

    Returns:
        ``(kernel_o_bytes, tiling_bytes)`` — either may be None if absent.

    Raises:
        FileNotFoundError: If *kernel_meta_dir* is not a directory.
    """
    log = logging.getLogger(__name__)

    meta_dir = Path(kernel_meta_dir)
    if not meta_dir.is_dir():
        raise FileNotFoundError(f"kernel_meta directory not found: {kernel_meta_dir}")

    obj_bytes: Optional[bytes] = None
    tiling: Optional[bytes] = None

    # Take the first .o in sorted order (there should be exactly one).
    objects = sorted(meta_dir.glob("*.o"))
    if objects:
        obj_bytes = objects[0].read_bytes()
        log.info(f"[AscendC] Loaded kernel .o: {objects[0]} ({len(obj_bytes)} bytes)")

    # Tiling: a dedicated .bin wins; otherwise scan the JSON metadata.
    bins = sorted(meta_dir.glob("*tiling*.bin"))
    if bins:
        tiling = bins[0].read_bytes()
        log.info(f"[AscendC] Loaded tiling data: {bins[0]} ({len(tiling)} bytes)")
    else:
        for jf in sorted(meta_dir.glob("*.json")):
            try:
                meta = json.loads(jf.read_text())
                if "tiling_data" in meta:
                    td = meta["tiling_data"]
                    if isinstance(td, str):
                        tiling = bytes.fromhex(td)
                    elif isinstance(td, list):
                        tiling = bytes(td)
                    log.info(f"[AscendC] Extracted tiling from {jf.name}")
                    break
            except (json.JSONDecodeError, KeyError, ValueError):
                # Malformed metadata — keep looking in the next JSON file.
                continue

    return obj_bytes, tiling
class AscendCCompiler:
    """Wrap pre-compiled AscendC kernel .o files into PTO-compatible entries.

    Pipeline (wrapper + link):
      1. write the externally-compiled kernel .o into the build dir
      2. generate + compile a wrapper .o with the PTO compiler (ccec -x cce)
      3. link wrapper.o + kernel.o with ld.lld (wrapper listed first so
         ``kernel_entry`` lands at .text offset 0)

    The AscendC kernel .o itself is never compiled here — it must come from
    an external AscendC build (e.g. tikcpp_smoke + npu_op_kernel_options
    --save-temp-files).

    Typical usage::

        compiler = AscendCCompiler(platform="a2a3")
        combined = compiler.compile_ascendc_kernel(
            ascendc_kernel_o=kernel_o_bytes,
            ascendc_kernel_symbol="add_custom",
            tensor_args=[...],
            tiling_data=tiling_bytes,
        )
        kernel_bin = extract_text_section(combined)
    """

    def __init__(self, platform: str = "a2a3"):
        """Configure toolchain/paths for *platform* (a2a3/a2a3sim/a5/a5sim)."""
        self.platform = platform
        self.project_root = Path(__file__).parent.parent

        if platform in ("a2a3", "a2a3sim"):
            self.platform_dir = self.project_root / "src" / "a2a3" / "platform"
        elif platform in ("a5", "a5sim"):
            self.platform_dir = self.project_root / "src" / "a5" / "platform"
        else:
            raise ValueError(f"Unknown platform: {platform}")

        # Hardware platforms need the Ascend SDK and a real ccec toolchain;
        # simulators get none (kernel wrapping is rejected later for them).
        if platform in ("a2a3", "a5"):
            env_manager.ensure("ASCEND_HOME_PATH")
            self.pto_toolchain = CCECToolchain(platform)
        else:
            self.pto_toolchain = None

    def _get_runtime_include_dir(self, runtime_name: str = "tensormap_and_ringbuffer") -> str:
        """Include dir of the L2 runtime variant for the current platform."""
        arch = "a2a3" if self.platform in ("a2a3", "a2a3sim") else "a5"
        include_dir = self.project_root / "src" / arch / "runtime" / runtime_name / "runtime"
        return str(include_dir)

    def compile_ascendc_kernel(
        self,
        ascendc_kernel_o: bytes,
        ascendc_kernel_symbol: str = "ascendc_kernel",
        tensor_args: Optional[List[Dict[str, str]]] = None,
        tiling_data: Optional[bytes] = None,
        core_type: str = "aiv",
        has_workspace: bool = False,
        extra_include_dirs: Optional[List[str]] = None,
        build_dir: Optional[str] = None,
    ) -> bytes:
        """Wrap a pre-compiled AscendC kernel .o into a linked PTO ELF.

        Generates a PTO wrapper, compiles it with PTO flags, then links it
        with the kernel .o so ``kernel_entry`` sits at .text offset 0. The
        caller should run ``extract_text_section`` on the returned bytes.

        Args:
            ascendc_kernel_o: Pre-compiled kernel .o bytes (from an external
                AscendC compilation, e.g. tikcpp_smoke kernel_meta/).
            ascendc_kernel_symbol: ``extern "C"`` symbol of the AscendC kernel.
            tensor_args: Ordered tensor descriptors for wrapper generation.
            tiling_data: Static tiling blob (None = no tiling).
            core_type: ``"aiv"`` (vector) or ``"aic"`` (cube).
            has_workspace: Whether the kernel expects a workspace pointer.
            extra_include_dirs: Extra -I paths for wrapper compilation.
            build_dir: Working directory for intermediates (temp dir if None).

        Returns:
            Linked ELF bytes (pass to ``extract_text_section``).

        Raises:
            RuntimeError: On simulation platforms or failed tool invocations.
        """
        log = logging.getLogger(__name__)

        if self.pto_toolchain is None:
            raise RuntimeError(
                "AscendC kernel wrapping requires a hardware platform (a2a3/a5). "
                "Simulation platforms are not supported."
            )

        tensors = tensor_args if tensor_args is not None else []

        work_dir = build_dir or tempfile.mkdtemp(prefix="ascendc_build_")
        os.makedirs(work_dir, exist_ok=True)

        # Step 1: materialize the pre-compiled kernel .o in the build dir.
        kernel_o_path = os.path.join(work_dir, "ascendc_kernel.o")
        with open(kernel_o_path, 'wb') as fh:
            fh.write(ascendc_kernel_o)
        log.info(f"[AscendC] Using pre-compiled kernel .o ({len(ascendc_kernel_o)} bytes)")

        # Step 2: generate the wrapper source and compile it with PTO flags.
        wrapper_cpp_path = os.path.join(work_dir, "ascendc_wrapper.cpp")
        with open(wrapper_cpp_path, 'w') as fh:
            fh.write(generate_wrapper_source(
                ascendc_kernel_symbol=ascendc_kernel_symbol,
                tensor_args=tensors,
                tiling_data=tiling_data,
                has_workspace=has_workspace,
            ))
        log.info(f"[AscendC] Generated wrapper: {wrapper_cpp_path}")

        includes = [self._get_runtime_include_dir()] + list(extra_include_dirs or [])
        wrapper_o_path = os.path.join(work_dir, "ascendc_wrapper.o")
        cmd = [self.pto_toolchain.cxx_path]
        cmd += self.pto_toolchain.get_compile_flags(core_type=core_type)
        cmd += [f"-I{os.path.abspath(p)}" for p in includes]
        cmd += ["-o", wrapper_o_path, wrapper_cpp_path]
        log.info("[AscendC] Compiling wrapper with PTO flags")
        self._run(cmd, "PTO-Wrapper")

        # Step 3: link — wrapper.o FIRST so kernel_entry lands at offset 0.
        combined_path = os.path.join(work_dir, "ascendc_combined.elf")
        log.info("[AscendC] Linking wrapper.o + kernel.o")
        self._run(
            [self.pto_toolchain.linker_path,
             "-e", "kernel_entry", "-Ttext=0",
             "-o", combined_path,
             wrapper_o_path, kernel_o_path],
            "AscendC-Link",
        )

        with open(combined_path, 'rb') as fh:
            linked = fh.read()
        log.info(f"[AscendC] Combined binary: {len(linked)} bytes")
        return linked

    def compile_from_kernel_meta(
        self,
        kernel_meta_dir: str,
        ascendc_kernel_symbol: str,
        tensor_args: List[Dict[str, str]],
        core_type: str = "aiv",
        has_workspace: bool = False,
        extra_include_dirs: Optional[List[str]] = None,
        build_dir: Optional[str] = None,
    ) -> bytes:
        """Convenience: pull .o + tiling from *kernel_meta_dir*, then wrap+link."""
        kernel_o, tiling_data = extract_kernel_artifacts(kernel_meta_dir)
        if kernel_o is None:
            raise FileNotFoundError(
                f"No .o file found in kernel_meta: {kernel_meta_dir}"
            )
        return self.compile_ascendc_kernel(
            ascendc_kernel_o=kernel_o,
            ascendc_kernel_symbol=ascendc_kernel_symbol,
            tensor_args=tensor_args,
            tiling_data=tiling_data,
            core_type=core_type,
            has_workspace=has_workspace,
            extra_include_dirs=extra_include_dirs,
            build_dir=build_dir,
        )

    @staticmethod
    def _run(cmd: List[str], label: str) -> subprocess.CompletedProcess:
        """Run *cmd*, log its output, and raise RuntimeError on failure."""
        log = logging.getLogger(__name__)
        log.debug(f"[{label}] {' '.join(cmd)}")
        try:
            proc = subprocess.run(cmd, capture_output=True, text=True)
        except FileNotFoundError as exc:
            raise RuntimeError(f"{label}: tool not found -- {exc}") from exc
        if proc.stdout:
            log.debug(f"[{label}] stdout:\n{proc.stdout}")
        if proc.stderr:
            log.debug(f"[{label}] stderr:\n{proc.stderr}")
        if proc.returncode != 0:
            raise RuntimeError(
                f"{label} failed (exit {proc.returncode}):\n{proc.stderr}"
            )
        return proc
+ +Quick start:: + + import pto + + # Single chip (L2) + rt = pto.Runtime(level="chip", platform="a2a3", device=0) + rt.register("vector_add", orch="orch.cpp", kernels=[...]) + rt.run("vector_add", args=[pto.Arg.input(x), pto.Arg.output(y)]) + rt.close() + + # Multi-chip (L3) + rt = pto.Runtime(level="host", platform="a2a3", devices=[0, 1, 2, 3]) + pkg = pto.compile(platform="a2a3", orch="orch.cpp", kernels=[...]) + rt.register("pipeline", orch=my_orch_func, kernels={"compute": pkg}) + rt.run("pipeline", args={"input": data}) + rt.close() +""" + +from .types import Arg, CompiledPackage, KernelSource, ParamType, TensorHandle +from .runtime import Runtime +from .compiler import compile + +__all__ = [ + "Runtime", + "Arg", + "compile", + "CompiledPackage", + "KernelSource", + "ParamType", + "TensorHandle", +] diff --git a/python/pto/compiler.py b/python/pto/compiler.py new file mode 100644 index 000000000..fc49fddf8 --- /dev/null +++ b/python/pto/compiler.py @@ -0,0 +1,172 @@ +"""Compilation utilities for PTO runtime. + +Wraps the existing RuntimeBuilder + KernelCompiler pipeline and adds +SHA256-based caching. 
def _cache_key(platform: str, runtime_name: str, orch_source: str,
               kernel_sources: list, orch_func: str,
               block_dim: int = 1, aicpu_thread_num: int = 1,
               orch_thread_num: int = 1) -> str:
    """Compute a SHA256 cache key from source contents + build/launch config.

    Launch parameters (block_dim, thread counts) are part of the key because
    they are baked into the cached CompiledPackage — omitting them (as the
    previous version did) returned a stale package when only those
    parameters changed. New parameters are defaulted for compatibility.
    """
    h = hashlib.sha256()
    for token in (platform, runtime_name, orch_func,
                  str(block_dim), str(aicpu_thread_num), str(orch_thread_num)):
        h.update(token.encode())

    # Hash orchestration source CONTENT (fall back to the literal string
    # when it is not a file path).
    orch_path = Path(orch_source)
    if orch_path.is_file():
        h.update(orch_path.read_bytes())
    else:
        h.update(orch_source.encode())

    # Hash kernel source contents in a deterministic (func_id) order.
    for ks in sorted(kernel_sources, key=lambda k: k.get("func_id", 0)):
        src_path = Path(ks["source"])
        if src_path.is_file():
            h.update(src_path.read_bytes())
        else:
            h.update(ks["source"].encode())
        h.update(ks.get("core_type", "aiv").encode())
        h.update(str(ks.get("func_id", 0)).encode())

    return h.hexdigest()


def compile(
    platform: str,
    runtime_name: str = "tensormap_and_ringbuffer",
    orch_source: str = "",
    kernel_sources: Optional[list] = None,
    orch_func: str = "aicpu_orchestration_entry",
    block_dim: int = 1,
    aicpu_thread_num: int = 1,
    orch_thread_num: int = 1,
    cache_dir: Optional[str] = None,
    build_dir: Optional[str] = None,
    extra_include_dirs: Optional[list] = None,
) -> "CompiledPackage":
    """Compile all artifacts for an L2 execution package.

    Wraps RuntimeBuilder.build() + KernelCompiler to produce a
    CompiledPackage containing all binaries needed for a single L2 run.

    Args:
        platform: "a2a3" | "a2a3sim" | "a5" | "a5sim"
        runtime_name: Which L2 runtime variant to compile
        orch_source: Path to orchestration .cpp source
        kernel_sources: List of dicts with keys: source, core_type, func_id
        orch_func: Orchestration entry function name
        block_dim: Number of AICore blocks
        aicpu_thread_num: AICPU thread count
        orch_thread_num: Orchestration thread count
        cache_dir: Optional cache directory for compiled artifacts
        build_dir: Optional build directory for intermediate files
        extra_include_dirs: Additional include directories for kernel compilation

    Returns:
        CompiledPackage with all binaries ready for L2 execution
    """
    if kernel_sources is None:
        kernel_sources = []

    # Cache lookup. `cached_file` doubles as the "cache enabled" flag for
    # the save path at the end.
    cached_file = None
    if cache_dir:
        cache_path = Path(cache_dir).expanduser()
        cache_path.mkdir(parents=True, exist_ok=True)
        key = _cache_key(platform, runtime_name, orch_source,
                         kernel_sources, orch_func,
                         block_dim=block_dim,
                         aicpu_thread_num=aicpu_thread_num,
                         orch_thread_num=orch_thread_num)
        cached_file = cache_path / f"{key}.pkg"
        if cached_file.exists():
            logger.info(f"Cache hit: {cached_file}")
            # NOTE(review): pickle is only safe here because cache files are
            # local build artifacts we wrote ourselves; never point cache_dir
            # at untrusted data.
            with open(cached_file, "rb") as f:
                return pickle.load(f)

    from runtime_builder import RuntimeBuilder
    from elf_parser import extract_text_section

    builder = RuntimeBuilder(platform=platform)
    kernel_compiler = builder.get_kernel_compiler()

    # Build runtime (host.so, aicpu.so, aicore.o)
    logger.info("Compiling runtime...")
    host_binary, aicpu_binary, aicore_binary = builder.build(
        runtime_name, build_dir)

    # Compile orchestration (optional — empty source means no orch binary).
    orch_binary = b""
    if orch_source:
        logger.info(f"Compiling orchestration: {orch_source}")
        orch_binary = kernel_compiler.compile_orchestration(
            runtime_name, orch_source, build_dir=build_dir)

    # Compile each kernel.
    compiled_kernels = []
    for ks in kernel_sources:
        src = ks["source"]
        core_type = ks.get("core_type", "aiv")
        func_id = ks.get("func_id", 0)

        logger.info(f"Compiling kernel: {src} (core_type={core_type}, func_id={func_id})")

        pto_isa_root = os.environ.get("PTO_ISA_ROOT", "")
        incore_o = kernel_compiler.compile_incore(
            src, core_type=core_type,
            pto_isa_root=pto_isa_root,
            extra_include_dirs=extra_include_dirs or [],
            build_dir=build_dir,
        )

        # Hardware platforms need the raw .text payload; simulators load the
        # full object file as-is.
        if not platform.endswith("sim"):
            kernel_bin = extract_text_section(incore_o)
        else:
            kernel_bin = incore_o

        compiled_kernels.append((func_id, kernel_bin))

    pkg = CompiledPackage(
        platform=platform,
        runtime_name=runtime_name,
        host_binary=host_binary,
        aicpu_binary=aicpu_binary,
        aicore_binary=aicore_binary,
        orch_binary=orch_binary,
        orch_func=orch_func,
        kernel_binaries=compiled_kernels,
        block_dim=block_dim,
        aicpu_thread_num=aicpu_thread_num,
        orch_thread_num=orch_thread_num,
    )

    # Save to cache for the next identical request.
    if cached_file is not None:
        logger.info(f"Caching compiled package: {cached_file}")
        with open(cached_file, "wb") as f:
            pickle.dump(pkg, f)

    return pkg
@dataclass
class TaskNode:
    """A single task in the DAG."""

    task_id: int            # unique id, monotonically assigned by TaskDAG
    chip: int               # target chip (representative chip for group tasks)
    kernel: str             # name of a registered L2 package
    args: list              # Arg descriptors passed through to execution
    deps: set = field(default_factory=set)        # unresolved dependency task_ids
    is_group: bool = False  # True for multi-chip collective tasks
    group_chips: list = field(default_factory=list)  # all chips for group tasks


class TaskDAG:
    """Dependency graph for L3 task scheduling.

    Dependencies are inferred automatically from tensor-handle usage:
    if task B reads a handle that task A produced (OUTPUT/INOUT),
    then B depends on A.
    """

    def __init__(self):
        self._tasks: dict = {}
        self._handle_producer: dict = {}              # handle_id -> producer task_id
        self._consumers: dict = defaultdict(set)      # task_id -> dependent task_ids
        self._completed: set = set()
        self._dispatched: set = set()
        self._next_id = 0

    def add_task(self, chip: int, kernel: str, args: list,
                 is_group: bool = False,
                 group_chips: "list | None" = None) -> TaskNode:
        """Add a task and infer dependencies from tensor handles.

        Args without a ``type`` attribute are ignored. Reads (INPUT/INOUT)
        create an edge from the handle's current, not-yet-completed
        producer; writes (OUTPUT/INOUT) register this task as the new
        producer. (The previous annotation ``group_chips: list[int] = None``
        was wrong — None is not a list; it is now Optional.)

        Returns:
            The created TaskNode with ``deps`` populated.
        """
        task_id = self._next_id
        self._next_id += 1

        typed_args = [a for a in args if hasattr(a, "type")]

        # Reads: depend on the producer of each consumed handle.
        deps = set()
        for arg in typed_args:
            if arg.type in (ParamType.INPUT, ParamType.INOUT):
                handle_id = getattr(arg, "_handle_id", None)
                if handle_id is None:
                    continue
                producer = self._handle_producer.get(handle_id)
                if producer is not None and producer not in self._completed:
                    deps.add(producer)

        # Writes: this task becomes the new producer of the handle.
        for arg in typed_args:
            if arg.type in (ParamType.OUTPUT, ParamType.INOUT):
                handle_id = getattr(arg, "_handle_id", None)
                if handle_id is not None:
                    self._handle_producer[handle_id] = task_id

        node = TaskNode(
            task_id=task_id,
            chip=chip,
            kernel=kernel,
            args=args,
            deps=deps,
            is_group=is_group,
            # Copy so later caller-side mutation can't corrupt the node.
            group_chips=list(group_chips) if group_chips else [],
        )
        self._tasks[task_id] = node

        for dep in deps:
            self._consumers[dep].add(task_id)

        return node

    def is_ready(self, task_id: int) -> bool:
        """True if *task_id* exists and has no unresolved dependencies."""
        node = self._tasks.get(task_id)
        return node is not None and not node.deps

    def mark_dispatched(self, task_id: int) -> None:
        """Record that *task_id* has been handed to a worker."""
        self._dispatched.add(task_id)

    def complete(self, task_id: int) -> list:
        """Mark *task_id* complete; return consumers that just became ready."""
        self._completed.add(task_id)
        newly_ready = []

        for consumer_id in self._consumers.get(task_id, set()):
            consumer = self._tasks[consumer_id]
            consumer.deps.discard(task_id)
            if not consumer.deps and consumer_id not in self._dispatched:
                newly_ready.append(consumer)

        return newly_ready

    def get_ready_tasks(self) -> list:
        """All tasks with no unresolved deps, not yet dispatched or completed.

        (Previously a task completed without ever being marked dispatched
        could be returned again; completed tasks are now excluded.)
        """
        return [node for tid, node in self._tasks.items()
                if tid not in self._dispatched
                and tid not in self._completed
                and not node.deps]

    def all_complete(self) -> bool:
        """True once every added task has been completed."""
        return len(self._completed) == len(self._tasks)

    @property
    def task_count(self) -> int:
        return len(self._tasks)
+ """ + from bindings import ARG_SCALAR, ARG_INPUT_PTR, ARG_OUTPUT_PTR, ARG_INOUT_PTR + + type_map = { + ParamType.SCALAR: ARG_SCALAR, + ParamType.INPUT: ARG_INPUT_PTR, + ParamType.OUTPUT: ARG_OUTPUT_PTR, + ParamType.INOUT: ARG_INOUT_PTR, + } + + func_args = [] + arg_types = [] + arg_sizes = [] + + for arg in args: + arg_types.append(type_map[arg.type]) + if arg.type == ParamType.SCALAR: + func_args.append(int(arg.data)) + arg_sizes.append(0) + else: + # Tensor argument — pass host pointer + arr = arg.data + if not isinstance(arr, np.ndarray): + raise TypeError(f"Expected numpy array for tensor arg, got {type(arr)}") + if not arr.flags["C_CONTIGUOUS"]: + arr = np.ascontiguousarray(arr) + ptr = arr.ctypes.data + func_args.append(ptr) + arg_sizes.append(arr.nbytes) + + return func_args, arg_types, arg_sizes + + +class L2Runtime: + """Single-chip runtime. Wraps bindings.py.""" + + def __init__(self, platform: str, device: int = 0): + self._platform = platform + self._device = device + self._registry: dict[str, CompiledPackage] = {} + self._lib_loaded = False + self._RuntimeClass = None + + def register(self, name: str, *, pkg: CompiledPackage = None, + orch: str = "", kernels: list[dict] = None, + runtime: str = "tensormap_and_ringbuffer", + orch_func: str = "aicpu_orchestration_entry", + block_dim: int = 1, aicpu_thread_num: int = 1, + orch_thread_num: int = 1, + cache_dir: Optional[str] = None, + build_dir: Optional[str] = None, + extra_include_dirs: Optional[list[str]] = None, + **kwargs) -> None: + """Register a named computation. + + Either pass a pre-compiled CompiledPackage via ``pkg``, or pass + source paths via ``orch`` + ``kernels`` to compile on the fly. 
+ """ + if pkg is not None: + self._registry[name] = pkg + return + + # Compile from source + from .compiler import compile as pto_compile + + pkg = pto_compile( + platform=self._platform, + runtime_name=runtime, + orch_source=orch, + kernel_sources=kernels or [], + orch_func=orch_func, + block_dim=block_dim, + aicpu_thread_num=aicpu_thread_num, + orch_thread_num=orch_thread_num, + cache_dir=cache_dir, + build_dir=build_dir, + extra_include_dirs=extra_include_dirs, + ) + self._registry[name] = pkg + + def run(self, name: str, args: list[Arg]) -> None: + """Execute a registered computation on the device. + + Performs the full L2 lifecycle: init → launch → finalize. + """ + if name not in self._registry: + raise KeyError(f"No registered computation '{name}'. " + f"Available: {list(self._registry.keys())}") + + pkg = self._registry[name] + self._ensure_loaded(pkg) + + func_args, arg_types, arg_sizes = _args_to_c_arrays(args) + + from bindings import launch_runtime + + rt = self._RuntimeClass() + rt.initialize( + pkg.orch_binary, + pkg.orch_func, + func_args, + arg_types=arg_types, + arg_sizes=arg_sizes, + kernel_binaries=pkg.kernel_binaries, + ) + launch_runtime( + rt, + aicpu_thread_num=pkg.aicpu_thread_num, + block_dim=pkg.block_dim, + device_id=self._device, + aicpu_binary=pkg.aicpu_binary, + aicore_binary=pkg.aicore_binary, + orch_thread_num=pkg.orch_thread_num, + ) + rt.finalize() + + def close(self) -> None: + """Release resources.""" + self._registry.clear() + self._RuntimeClass = None + self._lib_loaded = False + + def _ensure_loaded(self, pkg: CompiledPackage) -> None: + """Load the host binary and set device if not already done.""" + if self._lib_loaded: + return + + from bindings import bind_host_binary, set_device + + self._RuntimeClass = bind_host_binary(pkg.host_binary) + set_device(self._device) + self._lib_loaded = True diff --git a/python/pto/l3_context.py b/python/pto/l3_context.py new file mode 100644 index 000000000..3f49c3489 --- /dev/null +++ 
# --- python/pto/l3_context.py ---
"""L3OrchestratorContext — the ``ctx`` object passed to Python orchestration functions.

Provides the API that L3 orchestration code calls: peers(), submit(),
submit_group(), alloc(), scope_begin/end.
"""

from __future__ import annotations

import numpy as np
from typing import Optional

# NOTE(review): project-local imports (.types, .dag) are deferred to their use
# sites, matching l2_runtime's lazy-import convention, so this module can be
# imported standalone.


class L3OrchestratorContext:
    """Runtime context for L3 Python orchestration functions.

    The orchestration function receives this as ``ctx`` and uses it to
    discover chips and submit tasks.
    """

    def __init__(self, device_ids: list[int],
                 kernel_registry: dict[str, CompiledPackage]):
        from .dag import TaskDAG  # deferred — see module note

        self._device_ids = list(device_ids)
        self._kernel_registry = kernel_registry
        self._dag = TaskDAG()
        self._next_handle_id = 1          # monotonically increasing handle ids
        self._handles: dict[int, TensorHandle] = {}

    @property
    def dag(self) -> TaskDAG:
        """The task DAG populated by submit()/submit_group()."""
        return self._dag

    def peers(self) -> list[int]:
        """Return a copy of the available chip IDs."""
        return list(self._device_ids)

    def alloc(self, shape: tuple, dtype: str = "float32") -> TensorHandle:
        """Allocate a zero-filled host-memory tensor and return a handle."""
        from .types import TensorHandle  # deferred — see module note

        arr = np.zeros(shape, dtype=dtype)
        handle_id = self._next_handle_id
        self._next_handle_id += 1
        handle = TensorHandle(
            id=handle_id,
            shape=shape,
            dtype=dtype,
            size=arr.nbytes,
            _data=arr,
        )
        self._handles[handle_id] = handle
        return handle

    def submit(self, chip: int, kernel: str, args: list[Arg]) -> TaskNode:
        """Submit a single-chip task to the DAG.

        Args:
            chip: Target chip ID
            kernel: Name of a registered L2 package
            args: List of Arg descriptors

        Raises:
            ValueError: if ``chip`` is not an available device.
            KeyError: if ``kernel`` is not registered.
        """
        if chip not in self._device_ids:
            raise ValueError(f"Chip {chip} not in available devices {self._device_ids}")
        if kernel not in self._kernel_registry:
            raise KeyError(f"Kernel '{kernel}' not registered. "
                           f"Available: {list(self._kernel_registry.keys())}")
        return self._dag.add_task(chip=chip, kernel=kernel, args=args)

    def submit_group(self, chips: list[int], kernel: str,
                     args: list[Arg]) -> TaskNode:
        """Submit a group task (multiple chips as one logical DAG node).

        All chips execute the same kernel in parallel. Used for collective
        operations (allreduce, etc.) where chips communicate via P2P.

        Raises:
            ValueError: if ``chips`` is empty or contains an unknown chip.
            KeyError: if ``kernel`` is not registered.
        """
        if not chips:
            # Guard: chips[0] below would otherwise raise an opaque IndexError.
            raise ValueError("submit_group requires at least one chip")
        for c in chips:
            if c not in self._device_ids:
                # Message made consistent with submit() (lists the devices).
                raise ValueError(f"Chip {c} not in available devices {self._device_ids}")
        if kernel not in self._kernel_registry:
            # Message made consistent with submit() (lists the kernels).
            raise KeyError(f"Kernel '{kernel}' not registered. "
                           f"Available: {list(self._kernel_registry.keys())}")

        return self._dag.add_task(
            chip=chips[0],  # representative chip for the whole group
            kernel=kernel,
            args=args,
            is_group=True,
            group_chips=chips,
        )

    def scope_begin(self) -> None:
        """Placeholder for future scope tracking."""
        pass

    def scope_end(self) -> None:
        """Placeholder for future scope tracking."""
        pass

    def get_kernel_package(self, name: str) -> CompiledPackage:
        """Look up a registered L2 package (raises KeyError if absent)."""
        return self._kernel_registry[name]
# --- python/pto/l3_runtime.py ---
"""L3 (single-host, multi-chip) runtime implementation.

Manages per-chip worker processes, builds/executes a task DAG from a
Python orchestration function, and routes tasks to ChipWorkers.
"""

from __future__ import annotations

import logging
import time  # NOTE(review): unused in the visible code — confirm before removing
from typing import Any, Callable, Optional

logger = logging.getLogger(__name__)

# NOTE(review): project-local imports (.types, .l3_context, .l3_worker) are
# deferred to their use sites — matches l2_runtime's lazy-import convention
# and lets this module import standalone.


class L3Runtime:
    """Multi-chip runtime for a single host.

    Lifecycle:
        1. ``__init__``   — configure platform and device list
        2. ``register()`` — register named computations (Python orch + L2 packages)
        3. ``run()``      — execute: call orch function → build DAG → dispatch to workers
        4. ``close()``    — shut down workers
    """

    def __init__(self, platform: str, devices: list[int]):
        self._platform = platform
        self._devices = list(devices)
        self._kernel_registry: dict[str, CompiledPackage] = {}
        self._orch_registry: dict[str, Callable] = {}
        self._workers: dict[int, ChipWorker] = {}
        self._started = False

    def register(self, name: str, *,
                 orch: Callable = None,
                 kernels: dict[str, CompiledPackage] = None,
                 **kwargs) -> None:
        """Register a named multi-chip computation.

        Args:
            name: Computation name
            orch: Python orchestration function ``f(ctx, args)``
            kernels: Dict mapping kernel names to pre-compiled L2 packages
        """
        if orch is not None:
            self._orch_registry[name] = orch
        if kernels:
            self._kernel_registry.update(kernels)

    def run(self, name: str, args: Any = None) -> Any:
        """Execute a registered multi-chip computation.

        Steps: start workers → create context → call the orch function
        (which populates the DAG) → dispatch and wait for all tasks.

        Raises:
            KeyError: if ``name`` was never registered.
            RuntimeError: if a task fails or a worker process dies.
        """
        if name not in self._orch_registry:
            raise KeyError(f"No registered orchestration '{name}'. "
                           f"Available: {list(self._orch_registry.keys())}")

        self._ensure_workers()

        from .l3_context import L3OrchestratorContext

        ctx = L3OrchestratorContext(
            device_ids=self._devices,
            kernel_registry=self._kernel_registry,
        )

        result = self._orch_registry[name](ctx, args)
        self._execute_dag(ctx)
        return result

    def close(self) -> None:
        """Shut down all worker processes and clear registries."""
        for worker in self._workers.values():
            worker.stop()
        for worker in self._workers.values():
            worker.join(timeout=10.0)
        self._workers.clear()
        self._started = False
        self._kernel_registry.clear()
        self._orch_registry.clear()

    def _ensure_workers(self) -> None:
        """Start one ChipWorker per device if not already running."""
        if self._started:
            return

        from .l3_worker import ChipWorker

        for device_id in self._devices:
            worker = ChipWorker(device_id=device_id)
            worker.start()
            self._workers[device_id] = worker

        self._started = True
        logger.info(f"Started {len(self._workers)} chip workers: {self._devices}")

    def _execute_dag(self, ctx: L3OrchestratorContext) -> None:
        """Dispatch all DAG tasks to workers and block until completion.

        Raises RuntimeError if a task fails, or if a worker process dies
        while its task is still in flight (the original polled forever in
        that case).
        """
        dag = ctx.dag
        if dag.task_count == 0:
            return

        in_flight: dict[int, int] = {}  # task_id → representative chip

        for node in dag.get_ready_tasks():
            self._dispatch_task(node, ctx, in_flight)

        while not dag.all_complete():
            for worker in self._workers.values():
                result = worker.poll_result(timeout=0.01)
                if result is None:
                    continue

                task_id, success, error = result
                if not success:
                    raise RuntimeError(
                        f"Task {task_id} failed on chip {in_flight.get(task_id, '?')}: {error}")

                in_flight.pop(task_id, None)

                # NOTE(review): for group tasks every member chip reports the
                # same task_id, so the FIRST result marks the whole group
                # complete and later duplicates hit dag.complete() again —
                # confirm TaskDAG tolerates this.
                for node in dag.complete(task_id):
                    self._dispatch_task(node, ctx, in_flight)

            # Fail fast instead of spinning forever when a worker died.
            for task_id, chip in list(in_flight.items()):
                worker = self._workers.get(chip)
                if worker is None or not worker.is_alive:
                    raise RuntimeError(
                        f"Chip worker {chip} died while task {task_id} was in flight")

    def _dispatch_task(self, node: TaskNode, ctx: L3OrchestratorContext,
                       in_flight: dict[int, int]) -> None:
        """Send a task (or each leg of a group task) to its chip worker."""
        pkg = ctx.get_kernel_package(node.kernel)

        func_args, arg_types, arg_sizes = _args_to_raw(node.args)

        task_spec = {
            "host_binary": pkg.host_binary,
            "orch_binary": pkg.orch_binary,
            "orch_func": pkg.orch_func,
            "func_args": func_args,
            "arg_types": arg_types,
            "arg_sizes": arg_sizes,
            "kernel_binaries": pkg.kernel_binaries,
            "aicpu_thread_num": pkg.aicpu_thread_num,
            "block_dim": pkg.block_dim,
            "aicpu_binary": pkg.aicpu_binary,
            "aicore_binary": pkg.aicore_binary,
            "orch_thread_num": pkg.orch_thread_num,
        }

        # Group tasks fan out to every member chip; single tasks go to one.
        target_chips = node.group_chips if node.is_group else [node.chip]
        for chip in target_chips:
            worker = self._workers.get(chip)
            if worker is None:
                raise RuntimeError(f"No worker for chip {chip}")
            worker.send_task(node.task_id, task_spec)

        ctx.dag.mark_dispatched(node.task_id)
        in_flight[node.task_id] = node.chip
        logger.debug(f"Dispatched task {node.task_id} (kernel={node.kernel}) to chip {node.chip}")


def _args_to_raw(args: list[Arg]) -> tuple[list, list[int], list[int]]:
    """Convert an Arg list to the parallel arrays used in worker task specs."""
    import numpy as np
    from .types import ParamType  # deferred — see module note

    func_args = []
    arg_types = []
    arg_sizes = []

    for arg in args:
        arg_types.append(int(arg.type))
        if arg.type == ParamType.SCALAR:
            func_args.append(int(arg.data))
            arg_sizes.append(0)
        else:
            arr = arg.data
            if isinstance(arr, np.ndarray):
                if not arr.flags["C_CONTIGUOUS"]:
                    arr = np.ascontiguousarray(arr)
                func_args.append(arr.ctypes.data)
                arg_sizes.append(arr.nbytes)
            else:
                # NOTE(review): non-array tensor args are forwarded as a null
                # pointer with the declared size — confirm the worker treats
                # this as "runtime-allocated".
                func_args.append(0)
                arg_sizes.append(arg.size)

    return func_args, arg_types, arg_sizes
# --- python/pto/l3_worker.py ---
"""ChipWorker — child process that owns one NPU device.

Each worker process binds to one device via set_device() (DeviceRunner
is a process-global singleton), then loops receiving task specs from the
main process and executing them via the existing bindings.py API.
"""

from __future__ import annotations

import logging
import multiprocessing
import pickle  # NOTE(review): unused in the visible code — confirm before removing
import sys
import traceback
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# Ensure simpler's python/ is importable inside the child process.
_PYTHON_DIR = str(Path(__file__).resolve().parent.parent)


def _worker_main(device_id: int, artifacts_dir: str,
                 cmd_pipe: multiprocessing.connection.Connection,
                 result_pipe: multiprocessing.connection.Connection):
    """Child-process entry point.

    Binds one device, then serves tasks from ``cmd_pipe`` until a ``None``
    shutdown sentinel or EOF. Every task outcome — success or failure — is
    reported on ``result_pipe`` as ``(task_id, success, error_text)``.
    """
    if _PYTHON_DIR not in sys.path:
        sys.path.insert(0, _PYTHON_DIR)

    from bindings import bind_host_binary, set_device, launch_runtime

    # Load the host binary once if the artifacts dir provides it; otherwise
    # it is taken from the first task spec that carries one.
    host_binary_path = Path(artifacts_dir) / "host_runtime.so"
    RuntimeClass = (bind_host_binary(str(host_binary_path))
                    if host_binary_path.exists() else None)

    set_device(device_id)

    while True:
        try:
            msg = cmd_pipe.recv()
        except EOFError:
            break
        if msg is None:  # explicit shutdown sentinel
            break

        task_id, task_spec = msg
        try:
            if RuntimeClass is None and "host_binary" in task_spec:
                RuntimeClass = bind_host_binary(task_spec["host_binary"])
            if RuntimeClass is None:
                # Report a clear task failure instead of the opaque
                # "'NoneType' object is not callable" the original produced.
                raise RuntimeError(
                    f"Worker {device_id}: no host binary available "
                    f"(no host_runtime.so in artifacts_dir and no "
                    f"'host_binary' in the task spec)")

            rt = RuntimeClass()
            rt.initialize(
                task_spec["orch_binary"],
                task_spec["orch_func"],
                task_spec["func_args"],
                arg_types=task_spec["arg_types"],
                arg_sizes=task_spec["arg_sizes"],
                kernel_binaries=task_spec["kernel_binaries"],
            )
            launch_runtime(
                rt,
                aicpu_thread_num=task_spec.get("aicpu_thread_num", 1),
                block_dim=task_spec.get("block_dim", 1),
                device_id=device_id,
                aicpu_binary=task_spec["aicpu_binary"],
                aicore_binary=task_spec["aicore_binary"],
                orch_thread_num=task_spec.get("orch_thread_num", 1),
            )
            rt.finalize()
            result_pipe.send((task_id, True, None))
        except Exception:
            result_pipe.send((task_id, False, traceback.format_exc()))


class ChipWorker:
    """Parent-side handle for a child process that owns one NPU device."""

    def __init__(self, device_id: int, artifacts_dir: str = ""):
        self.device_id = device_id
        self.artifacts_dir = artifacts_dir
        self._process: Optional[multiprocessing.Process] = None
        self._cmd_pipe: Optional[multiprocessing.connection.Connection] = None
        self._result_pipe: Optional[multiprocessing.connection.Connection] = None

    def start(self) -> None:
        """Spawn the worker process and wire up command/result pipes."""
        parent_cmd, child_cmd = multiprocessing.Pipe()
        child_result, parent_result = multiprocessing.Pipe()

        self._cmd_pipe = parent_cmd
        self._result_pipe = parent_result

        self._process = multiprocessing.Process(
            target=_worker_main,
            args=(self.device_id, self.artifacts_dir, child_cmd, child_result),
            daemon=True,  # never outlive the parent process
        )
        self._process.start()

    def send_task(self, task_id: int, task_spec: dict) -> None:
        """Send a task to the worker. Non-blocking."""
        self._cmd_pipe.send((task_id, task_spec))

    def poll_result(self, timeout: float = 0.01):
        """Return a completed (task_id, success, error) tuple, or None."""
        if self._result_pipe.poll(timeout):
            return self._result_pipe.recv()
        return None

    def stop(self) -> None:
        """Send the shutdown sentinel. Safe before start() or after death."""
        if self._cmd_pipe:
            try:
                self._cmd_pipe.send(None)
            except OSError:
                # Broadened from BrokenPipeError: a closed Connection raises
                # plain OSError ("handle is closed").
                pass

    def join(self, timeout: float = 10.0) -> None:
        """Wait for the process to exit; terminate it if it refuses."""
        if self._process:
            self._process.join(timeout=timeout)
            if self._process.is_alive():
                self._process.terminate()

    @property
    def is_alive(self) -> bool:
        return self._process is not None and self._process.is_alive()
# --- python/pto/runtime.py ---
"""Unified Runtime entry point.

Routes by ``level`` parameter to the appropriate runtime implementation:
  - ``"chip"`` → L2Runtime (single chip)
  - ``"host"`` → L3Runtime (single host, multi chip)
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Callable, Optional, Union

if TYPE_CHECKING:
    # NOTE(review): Arg and CompiledPackage are not used at runtime in this
    # module, so the import is deferred to type checking. If any caller
    # re-imported them from pto.runtime, restore the unconditional import.
    from .types import Arg, CompiledPackage


class Runtime:
    """Unified runtime interface.

    Usage::

        # L2 — single chip
        rt = pto.Runtime(level="chip", platform="a2a3", device=0)
        rt.register("vector_add", orch="orch.cpp", kernels=[...])
        rt.run("vector_add", args=[Arg.input(x), Arg.output(y), Arg.scalar(n)])
        rt.close()

        # L3 — single host, multi chip
        rt = pto.Runtime(level="host", platform="a2a3", devices=[0, 1, 2, 3])
        pkg = pto.compile(platform="a2a3", orch="orch.cpp", kernels=[...])
        rt.register("pipeline", orch=my_orch_func, kernels={"compute": pkg})
        rt.run("pipeline", args={"input": data})
        rt.close()
    """

    def __init__(self, level: str = "chip", **kwargs):
        # Normalize once; the backend is constructed eagerly so an unknown
        # level fails at construction time, not first use.
        self._level = level.lower()
        self._impl = _create_impl(self._level, **kwargs)

    @property
    def level(self) -> str:
        """The normalized runtime level ("chip" or "host")."""
        return self._level

    def register(self, name: str, **kwargs) -> None:
        """Register a named computation with the backend."""
        self._impl.register(name, **kwargs)

    def run(self, name: str, args: Any = None) -> Any:
        """Execute a registered computation."""
        return self._impl.run(name, args=args)

    def close(self) -> None:
        """Release backend resources."""
        self._impl.close()

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        self.close()


def _create_impl(level: str, **kwargs):
    """Instantiate the right runtime backend for the given level.

    Raises:
        ValueError: for a level other than "chip" or "host".
    """
    platform = kwargs.get("platform", "a2a3")

    if level == "chip":
        from .l2_runtime import L2Runtime
        return L2Runtime(platform=platform, device=kwargs.get("device", 0))

    if level == "host":
        from .l3_runtime import L3Runtime
        return L3Runtime(platform=platform, devices=kwargs.get("devices", [0]))

    raise ValueError(f"Unknown level '{level}'. Supported: 'chip', 'host'")
# --- python/pto/types.py ---
"""Core types for the unified PTO runtime."""

from __future__ import annotations

from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Union

import numpy as np


class ParamType(IntEnum):
    """Argument kinds understood by the runtimes."""
    SCALAR = 0
    INPUT = 1
    OUTPUT = 2
    INOUT = 3


class Arg:
    """Unified argument descriptor for runtime submit/run calls.

    Replaces simpler's split arrays (func_args, arg_types, arg_sizes).
    """

    __slots__ = ("type", "data", "size")

    def __init__(self, type: ParamType, data: Any, size: int = 0):
        self.type = type
        self.data = data
        self.size = size

    # ---- Convenience constructors ----

    @classmethod
    def _tensor(cls, kind: ParamType, data: np.ndarray) -> Arg:
        # Shared builder for the three tensor-direction constructors.
        return cls(kind, data, data.nbytes)

    @classmethod
    def input(cls, data: np.ndarray) -> Arg:
        return cls._tensor(ParamType.INPUT, data)

    @classmethod
    def output(cls, data: np.ndarray) -> Arg:
        return cls._tensor(ParamType.OUTPUT, data)

    @classmethod
    def inout(cls, data: np.ndarray) -> Arg:
        return cls._tensor(ParamType.INOUT, data)

    @classmethod
    def scalar(cls, value: Union[int, float]) -> Arg:
        return cls(ParamType.SCALAR, value, 0)

    def __repr__(self) -> str:
        if self.type != ParamType.SCALAR:
            type_name = self.type.name.lower()
            shape = getattr(self.data, "shape", "?")
            return f"Arg.{type_name}(shape={shape}, size={self.size})"
        return f"Arg.scalar({self.data})"


@dataclass
class TensorHandle:
    """Opaque handle for a host-memory tensor managed by the runtime."""

    id: int          # runtime-assigned handle id
    shape: tuple     # logical tensor shape
    dtype: str       # numpy dtype name, e.g. "float32"
    size: int        # bytes
    _data: np.ndarray = field(repr=False, default=None)  # backing host array

    @property
    def data(self) -> np.ndarray:
        return self._data

    def numpy(self) -> np.ndarray:
        """Return the backing array; raise if the handle has no data."""
        if self._data is None:
            raise ValueError("TensorHandle has no backing data")
        return self._data


@dataclass
class KernelSource:
    """Describes a single kernel source file to compile."""

    source: str             # path to the kernel source file
    core_type: str = "aiv"  # target core type (vector by default)
    func_id: int = 0        # dispatch id used by the orchestration


@dataclass
class CompiledPackage:
    """A fully compiled L2 execution package.

    Contains all binaries needed to run a single L2 invocation:
    runtime binaries + orchestration .so + kernel binaries + config.
    """

    platform: str
    runtime_name: str
    host_binary: bytes
    aicpu_binary: bytes
    aicore_binary: bytes
    orch_binary: bytes
    orch_func: str
    kernel_binaries: list = field(default_factory=list)  # [(func_id, bytes)]
    block_dim: int = 1
    aicpu_thread_num: int = 1
    orch_thread_num: int = 1
+ """ + + def get_compile_flags(self, core_type: str = "aiv", **kwargs) -> List[str]: + if self.platform in ("a5", "a5sim"): + arch = "dav-c310-vec" if core_type == "aiv" else "dav-c310-cube" + elif self.platform in ("a2a3", "a2a3sim"): + arch = "dav-c220-vec" if core_type == "aiv" else "dav-c220-cube" + else: + raise ValueError(f"Unknown platform: {self.platform}") + + flags = [ + "-c", "-O3", "-std=c++17", + "--cce-aicore-lang", + "-DTILING_KEY_VAR=0", + "--cce-aicore-only", + f"--cce-aicore-arch={arch}", + "--cce-auto-sync", + "-mllvm", "-cce-aicore-stack-size=0x8000", + "-mllvm", "-cce-aicore-function-stack-size=0x8000", + "-mllvm", "-cce-aicore-record-overflow=false", + ] + + # Force-include SDK version header if present + version_header = os.path.join( + self.ascend_home_path, "include", "ascendc", "asc_devkit_version.h" + ) + if os.path.isfile(version_header): + flags.extend(["-include", version_header]) + + return flags + + def get_ascendc_include_dirs(self) -> List[str]: + """Return AscendC SDK include directories (from bisheng_intf.cmake).""" + dk = os.path.join(self.ascend_home_path, "aarch64-linux") + asc = os.path.join(dk, "asc") + tikcfw = os.path.join(dk, "tikcpp", "tikcfw") + + dirs = [ + os.path.join(asc, "impl", "adv_api"), + os.path.join(asc, "impl", "basic_api"), + os.path.join(asc, "impl", "c_api"), + os.path.join(asc, "impl", "micro_api"), + os.path.join(asc, "impl", "simt_api"), + os.path.join(asc, "impl", "utils"), + os.path.join(asc, "include"), + os.path.join(asc, "include", "adv_api"), + os.path.join(asc, "include", "basic_api"), + os.path.join(asc, "include", "aicpu_api"), + os.path.join(asc, "include", "c_api"), + os.path.join(asc, "include", "micro_api"), + os.path.join(asc, "include", "simt_api"), + os.path.join(asc, "include", "utils"), + tikcfw, + os.path.join(tikcfw, "interface"), + os.path.join(tikcfw, "impl"), + ] + return [d for d in dirs if os.path.isdir(d)] + + class Gxx15Toolchain(Toolchain): """g++-15 compiler for simulation 
kernels.""" diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/README.md b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/README.md new file mode 100644 index 000000000..66d0c1d94 --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/README.md @@ -0,0 +1,96 @@ +# AscendC Vector Example + +Demonstrates integrating an AscendC operator into the PTO tensormap_and_ringbuffer runtime. + +## Overview + +This device test runs two tasks on a single blockdim (2 AIV cores): + +1. **t0**: `z = add_custom(x, y)` -- AscendC AddCustom operator (func_id=0) +2. **t1**: `w = mul(z, z)` -- PTO-native kernel_mul (func_id=1) + +The AscendC kernel source is compiled directly by simpler's `AscendCCompiler` using the +CANN SDK's ccec compiler with `--cce-aicore-lang` flags, then linked with a generated PTO +wrapper to match PTO's `kernel_entry(int64_t* args)` convention. + +## Prerequisites + +- Ascend CANN SDK with ccec compiler (hardware platform only, no sim support) + +## How to Run + +```bash +python examples/scripts/run_example.py \ + -k tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels \ + -g tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/golden.py \ + -p a2a3 -d +``` + +Or via CI (device tests only): +```bash +./ci.sh -p a2a3 -d +``` + +## Architecture + +``` + AscendC source (.cpp) PTO wrapper (.cpp) + uses kernel_operator.h uses tensor.h + │ │ + ▼ ▼ + ccec --cce-aicore-lang ccec -x cce (PTO-ISA) + (AscendC toolchain flags) (PTO toolchain flags) + │ │ + ▼ ▼ + kernel.o wrapper.o + │ │ + └──────────┬─────────────────────────┘ + ▼ + ld.lld -r wrapper.o kernel.o + │ + ▼ + combined.o + │ + ▼ + extract_text_section + │ + ▼ + kernel binary + │ + ┌──────────────────▼───────────────────────────┐ + │ PTO Runtime │ + │ kernel_binaries = [(0, ascendc_bin), │ + │ (1, mul_bin)] │ + │ │ + │ Orchestration: │ + │ pto2_rt_submit_aiv_task(rt, 0, 
params) │ + │ pto2_rt_submit_aiv_task(rt, 1, params) │ + └───────────────────────────────────────────────┘ +``` + +Key: wrapper.o is listed first in the link command so `kernel_entry` lands +at `.text` offset 0 (PTO dispatch jumps to offset 0). + +## Configuration + +- `block_dim = 1`: Single blockdim (1 AIC + 2 AIV). Only AIV used. +- `aicpu_thread_num = 3`: 2 schedulers + 1 orchestrator. +- `compiler = "ascendc"`: Tells CodeRunner to use AscendCCompiler for this kernel. + +## kernel_config.py Schema for AscendC Kernels + +```python +{ + "func_id": 0, + "source": "path/to/ascendc_kernel.cpp", + "core_type": "aiv", + "compiler": "ascendc", + "ascendc_symbol": "add_custom", # extern "C" symbol name + "tensor_args": [ # ordered tensor descriptors + {"name": "x", "direction": "input"}, + {"name": "y", "direction": "input"}, + {"name": "z", "direction": "output"}, + ], + "has_workspace": False, # workspace pointer (optional) +} +``` diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/golden.py b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/golden.py new file mode 100644 index 000000000..fe1aa700a --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/golden.py @@ -0,0 +1,52 @@ +""" +Golden script for AscendC vector example. 
+ +Computation: + z = x + y (AscendC AddCustom) + w = z * z (PTO kernel_mul) + +With x=2.0, y=3.0: + z = 5.0 + w = 25.0 + +Args layout: [ptr_x, ptr_y, ptr_z, ptr_w, size_x, size_y, size_z, size_w, SIZE] +""" + +import ctypes +import torch + +__outputs__ = ["z", "w"] + +RTOL = 1e-5 +ATOL = 1e-5 + + +def generate_inputs(params: dict) -> list: + ROWS = 128 + COLS = 128 + SIZE = ROWS * COLS + + x = torch.full((SIZE,), 2.0, dtype=torch.float32) + y = torch.full((SIZE,), 3.0, dtype=torch.float32) + z = torch.zeros(SIZE, dtype=torch.float32) + w = torch.zeros(SIZE, dtype=torch.float32) + + return [ + ("x", x), + ("y", y), + ("z", z), + ("w", w), + ("size_x", ctypes.c_int64(x.nbytes)), + ("size_y", ctypes.c_int64(y.nbytes)), + ("size_z", ctypes.c_int64(z.nbytes)), + ("size_w", ctypes.c_int64(w.nbytes)), + ("SIZE", ctypes.c_int64(SIZE)), + ] + + +def compute_golden(tensors: dict, params: dict) -> None: + x = torch.as_tensor(tensors["x"]) + y = torch.as_tensor(tensors["y"]) + z_val = x + y + tensors["z"][:] = z_val + tensors["w"][:] = z_val * z_val diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/aiv/kernel_mul.cpp b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/aiv/kernel_mul.cpp new file mode 100644 index 000000000..51d81036e --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/aiv/kernel_mul.cpp @@ -0,0 +1,79 @@ +/** + * Element-wise Tensor Multiplication Kernel + * + * Implements: out[i] = src0[i] * src1[i] + * + * This kernel performs element-wise multiplication of two tensors. It's + * compiled separately as a standalone kernel and linked with the dispatcher + * using function pointers, demonstrating the separation pattern used in + * production systems where kernel binaries are loaded dynamically. 
// --- tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/aiv/kernel_mul.cpp ---
/**
 * Element-wise Tensor Multiplication Kernel
 *
 * Implements: out[i] = src0[i] * src1[i]
 *
 * This kernel performs element-wise multiplication of two tensors. It's
 * compiled separately as a standalone kernel and linked with the dispatcher
 * using function pointers, demonstrating the separation pattern used in
 * production systems where kernel binaries are loaded dynamically.
 */

// NOTE(review): the targets of the two includes below were lost during
// extraction (angle-bracket spans stripped) — restore them from the
// original file.
#include
#include

#include "tensor.h"

using namespace pto;

// Fallbacks so the file also parses where the CCE address-space/attribute
// keywords are not predefined.
#ifndef __gm__
#define __gm__
#endif

#ifndef __aicore__
#define __aicore__ [aicore]
#endif

/**
 * Element-wise multiplication kernel implementation
 *
 * Unified signature: all arguments passed via int64_t array
 * @param args Argument array:
 *             args[0] = src0 pointer (first input tensor)
 *             args[1] = src1 pointer (second input tensor)
 *             args[2] = out pointer (output tensor)
 *             args[3] = size (number of elements)
 *
 * NOTE(review): args[3] is documented above but never read below — the
 * kernel processes a fixed 128x128 tile; confirm this is intended.
 */
extern "C" __aicore__ __attribute__((always_inline)) void kernel_entry(__gm__ int64_t* args) {
    // Unpack arguments (Tensor* pointers from runtime)
    __gm__ Tensor* src0_tensor = reinterpret_cast<__gm__ Tensor*>(args[0]);
    __gm__ Tensor* src1_tensor = reinterpret_cast<__gm__ Tensor*>(args[1]);
    __gm__ Tensor* out_tensor = reinterpret_cast<__gm__ Tensor*>(args[2]);
    // Resolve each Tensor's base address plus its element start offset.
    __gm__ float* src0 = reinterpret_cast<__gm__ float*>(src0_tensor->buffer.addr) + src0_tensor->start_offset;
    __gm__ float* src1 = reinterpret_cast<__gm__ float*>(src1_tensor->buffer.addr) + src1_tensor->start_offset;
    __gm__ float* out = reinterpret_cast<__gm__ float*>(out_tensor->buffer.addr) + out_tensor->start_offset;

    // Configuration: float, 128, 128, 128, 128
    constexpr int kTRows_ = 128;
    constexpr int kTCols_ = 128;
    constexpr int vRows = 128;
    constexpr int vCols = 128;

    using DynShapeDim5 = Shape<1, 1, 1, vRows, vCols>;
    using DynStridDim5 = Stride<1, 1, 1, kTCols_, 1>;
    // NOTE(review): the template arguments of GlobalTensor and Tile were
    // stripped during extraction — restore them from the original file.
    using GlobalData = GlobalTensor;
    using TileData = Tile;

    // Three on-chip tiles at fixed local-buffer offsets (0x10000 apart).
    TileData src0Tile(vRows, vCols);
    TileData src1Tile(vRows, vCols);
    TileData dstTile(vRows, vCols);
    TASSIGN(src0Tile, 0x0);
    TASSIGN(src1Tile, 0x10000);
    TASSIGN(dstTile, 0x20000);

    GlobalData src0Global(src0);
    GlobalData src1Global(src1);
    GlobalData dstGlobal(out);

    TLOAD(src0Tile, src0Global);
    TLOAD(src1Tile, src1Global);
    // Order the MTE2 loads before the vector multiply.
    set_flag(PIPE_MTE2, PIPE_V, EVENT_ID0);
    wait_flag(PIPE_MTE2, PIPE_V, EVENT_ID0);
    TMUL(dstTile, src0Tile, src1Tile);
    // Order the multiply before the MTE3 store.
    set_flag(PIPE_V, PIPE_MTE3, EVENT_ID0);
    wait_flag(PIPE_V, PIPE_MTE3, EVENT_ID0);
    TSTORE(dstGlobal, dstTile);

    // Order the store before scalar-pipe continuation (kernel return).
    set_flag(PIPE_MTE3, PIPE_S, EVENT_ID7);
    wait_flag(PIPE_MTE3, PIPE_S, EVENT_ID7);
}

// --- tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.cpp ---
/**
 * AscendC AddCustom Operator — Reference Source
 *
 * This file documents the AscendC kernel source used for PTO integration.
 * The actual .o is built by build_add_custom.sh.
 *
 * PTO integration requires two adaptations from standard AscendC kernels:
 *
 * 1. NO __global__ attribute — PTO's dispatch loop calls kernels as
 *    subroutines; __global__ generates a prologue that resets AICore
 *    state and never returns, causing a hang.
 *
 * 2. NO GetBlockNum()/GetBlockIdx() partitioning — PTO dispatches each
 *    task to a single core, so the kernel must process all data.
 *    (Framework-launched AscendC launches across all blocks simultaneously.)
 *
 * Calling convention:
 *   extern "C" __aicore__ void add_custom(
 *       GM_ADDR x, GM_ADDR y, GM_ADDR z,
 *       GM_ADDR workspace, GM_ADDR tiling);
 *
 * The PTO wrapper (generated by AscendCCompiler) provides kernel_entry()
 * which unpacks Tensor* args and forwards raw GM addresses to add_custom().
 *
 * Static tiling: totalLength=16384 and tileNum=8 are hardcoded as constexpr.
 * No runtime tiling pointer is needed (workspace and tiling are unused).
 */

// ============================================================================
// The actual source compiled by build_add_custom.sh (reproduced here for
// reference — not compiled by PTO).
+// ============================================================================ + +#if 0 // Not compiled — reference only + +#include "kernel_operator.h" + +using namespace AscendC; +constexpr int32_t BUFFER_NUM = 2; + +class KernelAdd { +public: + __aicore__ inline KernelAdd() {} + __aicore__ inline void Init(GM_ADDR x, GM_ADDR y, GM_ADDR z, + uint32_t totalLength, uint32_t tileNum) + { + // Process ALL data — no GetBlockNum() division (PTO single-core dispatch) + this->blockLength = totalLength; + this->tileNum = tileNum; + this->tileLength = this->blockLength / tileNum / BUFFER_NUM; + xGm.SetGlobalBuffer((__gm__ DTYPE_X*)x, this->blockLength); + yGm.SetGlobalBuffer((__gm__ DTYPE_Y*)y, this->blockLength); + zGm.SetGlobalBuffer((__gm__ DTYPE_Z*)z, this->blockLength); + pipe.InitBuffer(inQueueX, BUFFER_NUM, this->tileLength * sizeof(DTYPE_X)); + pipe.InitBuffer(inQueueY, BUFFER_NUM, this->tileLength * sizeof(DTYPE_Y)); + pipe.InitBuffer(outQueueZ, BUFFER_NUM, this->tileLength * sizeof(DTYPE_Z)); + } + __aicore__ inline void Process() + { + int32_t loopCount = this->tileNum * BUFFER_NUM; + for (int32_t i = 0; i < loopCount; i++) { + CopyIn(i); + Compute(i); + CopyOut(i); + } + } +private: + __aicore__ inline void CopyIn(int32_t progress) + { + LocalTensor xLocal = inQueueX.AllocTensor(); + LocalTensor yLocal = inQueueY.AllocTensor(); + DataCopy(xLocal, xGm[progress * this->tileLength], this->tileLength); + DataCopy(yLocal, yGm[progress * this->tileLength], this->tileLength); + inQueueX.EnQue(xLocal); + inQueueY.EnQue(yLocal); + } + __aicore__ inline void Compute(int32_t progress) + { + LocalTensor xLocal = inQueueX.DeQue(); + LocalTensor yLocal = inQueueY.DeQue(); + LocalTensor zLocal = outQueueZ.AllocTensor(); + Add(zLocal, xLocal, yLocal, this->tileLength); + outQueueZ.EnQue(zLocal); + inQueueX.FreeTensor(xLocal); + inQueueY.FreeTensor(yLocal); + } + __aicore__ inline void CopyOut(int32_t progress) + { + LocalTensor zLocal = outQueueZ.DeQue(); + 
DataCopy(zGm[progress * this->tileLength], zLocal, this->tileLength); + outQueueZ.FreeTensor(zLocal); + } +private: + TPipe pipe; + TQue inQueueX, inQueueY; + TQue outQueueZ; + GlobalTensor xGm; + GlobalTensor yGm; + GlobalTensor zGm; + uint32_t blockLength; + uint32_t tileNum; + uint32_t tileLength; +}; + +// No __global__ — called as subroutine from PTO wrapper's kernel_entry +extern "C" __aicore__ void add_custom( + GM_ADDR x, GM_ADDR y, GM_ADDR z, + GM_ADDR workspace, GM_ADDR tiling) +{ + constexpr uint32_t totalLength = 16384; + constexpr uint32_t tileNum = 8; + KernelAdd op; + op.Init(x, y, z, totalLength, tileNum); + op.Process(); +} + +#endif diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.o b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.o new file mode 100644 index 000000000..218e54e65 Binary files /dev/null and b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/add_custom.o differ diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/build_add_custom.sh b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/build_add_custom.sh new file mode 100755 index 000000000..a6b900d58 --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/ascendc/build_add_custom.sh @@ -0,0 +1,169 @@ +#!/bin/bash +# Compile AddCustom AscendC kernel for PTO runtime integration +# +# Based on: https://gitee.com/ascend/samples/tree/master/operator/ascendc/0_introduction/1_add_frameworklaunch/AddCustom +# +# Prerequisites: +# - CANN toolkit installed (ASCEND_HOME_PATH set) +# - ccec compiler available +# +# PTO integration requirements (compared to framework-launched AscendC): +# 1. 
NO __global__ attribute — PTO calls kernels as subroutines from its
+#    dispatch loop; __global__ generates an incompatible prologue/epilogue
+# 2. NO GetBlockNum()/GetBlockIdx() partitioning — PTO dispatches each task
+#    to a single core, so the kernel must process all data
+# 3. Static tiling — tiling values hardcoded as constexpr (no runtime tiling)
+#
+# Usage:
+#   bash build_add_custom.sh [--dtype float|half] [--output add_custom.o]
+
+set -euo pipefail
+
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+ASCEND_HOME="${ASCEND_HOME_PATH:-/usr/local/Ascend/cann-8.5.0}"
+CCEC="${ASCEND_HOME}/bin/ccec"
+BUILD_DIR="${SCRIPT_DIR}/_build"
+
+# Defaults
+DTYPE="float"
+OUTPUT="${SCRIPT_DIR}/add_custom.o"
+
+# Parse args
+while [[ $# -gt 0 ]]; do
+    case $1 in
+        --dtype) DTYPE="$2"; shift 2 ;;
+        --output) OUTPUT="$2"; shift 2 ;;
+        *) echo "Unknown arg: $1"; exit 1 ;;
+    esac
+done
+
+if [ ! -f "$CCEC" ]; then
+    echo "Error: ccec not found at $CCEC"
+    echo "Set ASCEND_HOME_PATH to your CANN installation"
+    exit 1
+fi
+
+mkdir -p "$BUILD_DIR"
+
+# Generate PTO-compatible kernel source
+# Key differences from the Gitee original:
+#   - extern "C" __aicore__ (no __global__)
+#   - blockLength = totalLength (no GetBlockNum() division)
+#   - no GetBlockIdx() offset on GlobalTensor
+cat > "$BUILD_DIR/add_custom.cpp" << 'SRC_EOF'
+#include "kernel_operator.h"
+
+using namespace AscendC;
+constexpr int32_t BUFFER_NUM = 2;
+
+class KernelAdd {
+public:
+    __aicore__ inline KernelAdd() {}
+    __aicore__ inline void Init(GM_ADDR x, GM_ADDR y, GM_ADDR z,
+                                uint32_t totalLength, uint32_t tileNum)
+    {
+        this->blockLength = totalLength;
+        this->tileNum = tileNum;
+        this->tileLength = this->blockLength / tileNum / BUFFER_NUM;
+        xGm.SetGlobalBuffer((__gm__ DTYPE_X*)x, this->blockLength);
+        yGm.SetGlobalBuffer((__gm__ DTYPE_Y*)y, this->blockLength);
+        zGm.SetGlobalBuffer((__gm__ DTYPE_Z*)z, this->blockLength);
+        pipe.InitBuffer(inQueueX, BUFFER_NUM, this->tileLength * sizeof(DTYPE_X));
+        pipe.InitBuffer(inQueueY, BUFFER_NUM, this->tileLength * sizeof(DTYPE_Y));
+        pipe.InitBuffer(outQueueZ, BUFFER_NUM, this->tileLength * sizeof(DTYPE_Z));
+    }
+    __aicore__ inline void Process()
+    {
+        int32_t loopCount = this->tileNum * BUFFER_NUM;
+        for (int32_t i = 0; i < loopCount; i++) {
+            CopyIn(i);
+            Compute(i);
+            CopyOut(i);
+        }
+    }
+private:
+    __aicore__ inline void CopyIn(int32_t progress)
+    {
+        LocalTensor<DTYPE_X> xLocal = inQueueX.AllocTensor<DTYPE_X>();
+        LocalTensor<DTYPE_Y> yLocal = inQueueY.AllocTensor<DTYPE_Y>();
+        DataCopy(xLocal, xGm[progress * this->tileLength], this->tileLength);
+        DataCopy(yLocal, yGm[progress * this->tileLength], this->tileLength);
+        inQueueX.EnQue(xLocal);
+        inQueueY.EnQue(yLocal);
+    }
+    __aicore__ inline void Compute(int32_t progress)
+    {
+        LocalTensor<DTYPE_X> xLocal = inQueueX.DeQue<DTYPE_X>();
+        LocalTensor<DTYPE_Y> yLocal = inQueueY.DeQue<DTYPE_Y>();
+        LocalTensor<DTYPE_Z> zLocal = outQueueZ.AllocTensor<DTYPE_Z>();
+        Add(zLocal, xLocal, yLocal, this->tileLength);
+        outQueueZ.EnQue<DTYPE_Z>(zLocal);
+        inQueueX.FreeTensor(xLocal);
+        inQueueY.FreeTensor(yLocal);
+    }
+    __aicore__ inline void CopyOut(int32_t progress)
+    {
+        LocalTensor<DTYPE_Z> zLocal = outQueueZ.DeQue<DTYPE_Z>();
+        DataCopy(zGm[progress * this->tileLength], zLocal, this->tileLength);
+        outQueueZ.FreeTensor(zLocal);
+    }
+private:
+    TPipe pipe;
+    TQue<QuePosition::VECIN, BUFFER_NUM> inQueueX, inQueueY;
+    TQue<QuePosition::VECOUT, BUFFER_NUM> outQueueZ;
+    GlobalTensor<DTYPE_X> xGm;
+    GlobalTensor<DTYPE_Y> yGm;
+    GlobalTensor<DTYPE_Z> zGm;
+    uint32_t blockLength;
+    uint32_t tileNum;
+    uint32_t tileLength;
+};
+
+// No __global__ — called as subroutine from PTO wrapper's kernel_entry
+extern "C" __aicore__ void add_custom(
+    GM_ADDR x, GM_ADDR y, GM_ADDR z,
+    GM_ADDR workspace, GM_ADDR tiling)
+{
+    constexpr uint32_t totalLength = 16384;
+    constexpr uint32_t tileNum = 8;
+    KernelAdd op;
+    op.Init(x, y, z, totalLength, tileNum);
+    op.Process();
+}
+SRC_EOF
+
+# AscendC include directories
+ASC_ROOT="${ASCEND_HOME}/aarch64-linux/asc"
+TIKCPP_ROOT="${ASCEND_HOME}/aarch64-linux/tikcpp"
+
+echo "Compiling add_custom.cpp (dtype=$DTYPE)..." 
+"$CCEC" \ + -c -O3 -std=c++17 \ + --cce-aicore-lang \ + -DTILING_KEY_VAR=0 \ + --cce-aicore-only \ + --cce-aicore-arch=dav-c220-vec \ + --cce-auto-sync \ + -mllvm -cce-aicore-stack-size=0x8000 \ + -mllvm -cce-aicore-function-stack-size=0x8000 \ + -DDTYPE_X=$DTYPE -DDTYPE_Y=$DTYPE -DDTYPE_Z=$DTYPE \ + -I"${ASC_ROOT}/include" \ + -I"${ASC_ROOT}/include/basic_api" \ + -I"${ASC_ROOT}/include/adv_api" \ + -I"${ASC_ROOT}/include/c_api" \ + -I"${ASC_ROOT}/include/utils" \ + -I"${ASC_ROOT}/impl/adv_api" \ + -I"${ASC_ROOT}/impl/basic_api" \ + -I"${ASC_ROOT}/impl/c_api" \ + -I"${ASC_ROOT}/impl/micro_api" \ + -I"${ASC_ROOT}/impl/simt_api" \ + -I"${ASC_ROOT}/impl/utils" \ + -I"${TIKCPP_ROOT}/tikcfw" \ + -I"${TIKCPP_ROOT}/tikcfw/interface" \ + -I"${TIKCPP_ROOT}/tikcfw/impl" \ + -include "${ASCEND_HOME}/include/ascendc/asc_devkit_version.h" \ + -o "$OUTPUT" \ + "$BUILD_DIR/add_custom.cpp" + +echo "Output: $OUTPUT ($(wc -c < "$OUTPUT") bytes)" +echo "Done." diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py new file mode 100644 index 000000000..ef0aafafb --- /dev/null +++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/kernel_config.py @@ -0,0 +1,59 @@ +""" +AscendC Vector Example — kernel_config.py + +Demonstrates integrating a pre-compiled AscendC operator (add_custom) into the +PTO tensormap_and_ringbuffer runtime via wrapper generation + link. 
+
+The .o is compiled with AscendC toolchain (ccec --cce-aicore-lang) but adapted
+for PTO dispatch:
+  - No __global__ attribute (causes hang under PTO subroutine dispatch)
+  - No GetBlockNum()/GetBlockIdx() partitioning (PTO dispatches to single cores)
+  - Static tiling (constexpr values, no runtime tiling pointer)
+
+The code runner generates a PTO wrapper (kernel_entry), compiles it with PTO
+flags (-x cce), and links it with the kernel .o so kernel_entry sits at .text offset 0.
+
+Computation:
+    z = x + y   (AscendC add_custom, func_id=0)
+    w = z * z   (PTO kernel_mul, func_id=1)
+
+The orchestration submits two AIV tasks in sequence.
+"""
+
+from pathlib import Path
+
+_KERNELS_ROOT = Path(__file__).parent
+
+ORCHESTRATION = {
+    "source": str(_KERNELS_ROOT / "orchestration" / "ascendc_orch.cpp"),
+    "function_name": "aicpu_orchestration_entry",
+}
+
+KERNELS = [
+    {
+        "func_id": 0,
+        # Pre-compiled AscendC kernel .o with static tiling
+        # (tiling values baked in at compile time, no runtime tiling needed)
+        "source": str(_KERNELS_ROOT / "ascendc" / "add_custom.o"),
+        "core_type": "aiv",
+        "compiler": "ascendc",
+        "ascendc_symbol": "add_custom",
+        "tensor_args": [
+            {"name": "x", "direction": "input"},
+            {"name": "y", "direction": "input"},
+            {"name": "z", "direction": "output"},
+        ],
+        "has_workspace": True,
+    },
+    {
+        "func_id": 1,
+        "source": str(_KERNELS_ROOT / "aiv" / "kernel_mul.cpp"),
+        "core_type": "aiv",
+    },
+]
+
+RUNTIME_CONFIG = {
+    "runtime": "tensormap_and_ringbuffer",
+    "aicpu_thread_num": 3,
+    "block_dim": 2,
+}
diff --git a/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/orchestration/ascendc_orch.cpp b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/orchestration/ascendc_orch.cpp
new file mode 100644
index 000000000..fb4941860
--- /dev/null
+++ b/tests/device_tests/a2a3/tensormap_and_ringbuffer/ascendc_vector_example/kernels/orchestration/ascendc_orch.cpp
@@ -0,0 +1,89 @@
+/**
+ * 
AscendC Vector Example — Device Orchestration
+ *
+ * Demonstrates calling an AscendC kernel (AddCustom, func_id=0) and a
+ * PTO-native kernel (kernel_mul, func_id=1) from the same orchestration.
+ *
+ * DAG:
+ *   t0: z = add_custom(x, y)   (AscendC, func_id=0)
+ *   t1: w = mul(z, z)          (PTO, func_id=1)
+ * Dependencies: t0 -> t1
+ *
+ * Both kernels run as AIV (vector) tasks on a single blockdim (2V).
+ *
+ * Args layout (from golden.py):
+ *   [ptr_x, ptr_y, ptr_z, ptr_w, size_x, size_y, size_z, size_w, SIZE]
+ */
+
+#include <cstdint>  // NOTE(review): header name restored (stripped <...> in extraction) — confirm
+#include <cstddef>  // NOTE(review): header name restored (stripped <...> in extraction) — confirm
+
+#include "pto_orchestration_api.h"
+
+#define FUNC_ADD_CUSTOM 0
+#define FUNC_MUL 1
+
+#define ARG_PTR_X 0
+#define ARG_PTR_Y 1
+#define ARG_PTR_Z 2
+#define ARG_PTR_W 3
+#define ARG_SIZE_X 4
+#define ARG_SIZE_Y 5
+#define ARG_SIZE_Z 6
+#define ARG_SIZE_W 7
+#define ARG_SIZE 8
+
+extern "C" {
+
+__attribute__((visibility("default")))
+PTO2OrchestrationConfig aicpu_orchestration_config(uint64_t* args, int arg_count) {
+    (void)args;
+    (void)arg_count;
+    return PTO2OrchestrationConfig{
+        .expected_arg_count = 9,
+    };
+}
+
+__attribute__((visibility("default")))
+void aicpu_orchestration_entry(PTO2Runtime* rt, uint64_t* args, int arg_count,
+                               int orch_thread_num, int orch_thread_index) {
+    (void)arg_count;
+    (void)orch_thread_num;
+    (void)orch_thread_index;
+
+    void* ptr_x = (void*)(uintptr_t)args[ARG_PTR_X];
+    void* ptr_y = (void*)(uintptr_t)args[ARG_PTR_Y];
+    void* ptr_z = (void*)(uintptr_t)args[ARG_PTR_Z];
+    void* ptr_w = (void*)(uintptr_t)args[ARG_PTR_W];
+    int SIZE = (int)(args[ARG_SIZE] & 0x7FFFFFFF);
+
+    LOG_INFO(rt, "[ascendc_orch] SIZE=%d", SIZE);
+
+    uint32_t shapes[1] = {(uint32_t)SIZE};
+    Tensor ext_x = make_tensor_external(ptr_x, shapes, 1, DataType::FLOAT32);
+    Tensor ext_y = make_tensor_external(ptr_y, shapes, 1, DataType::FLOAT32);
+    Tensor ext_z = make_tensor_external(ptr_z, shapes, 1, DataType::FLOAT32);
+    Tensor ext_w = make_tensor_external(ptr_w, shapes, 1, DataType::FLOAT32);
+
+    // t0: z = add_custom(x, y) — AscendC 
kernel via PTO dispatch + { + PTOParam params_t0; + params_t0.add_input(ext_x); + params_t0.add_input(ext_y); + params_t0.add_output(ext_z); + pto2_rt_submit_aiv_task(rt, FUNC_ADD_CUSTOM, params_t0); + } + + // t1: w = mul(z, z) — PTO-native kernel + { + PTOParam params_t1; + params_t1.add_input(ext_z); + params_t1.add_input(ext_z); + params_t1.add_output(ext_w); + pto2_rt_submit_aiv_task(rt, FUNC_MUL, params_t1); + } + + LOG_INFO(rt, "[ascendc_orch] Submitted 2 tasks"); +} + +} // extern "C" diff --git a/tests/test_ascendc_compiler.py b/tests/test_ascendc_compiler.py new file mode 100644 index 000000000..434bbcf88 --- /dev/null +++ b/tests/test_ascendc_compiler.py @@ -0,0 +1,307 @@ +"""Tests for AscendC compiler module (python/ascendc_compiler.py).""" + +import sys +from pathlib import Path + +import pytest + +PROJECT_ROOT = Path(__file__).parent.parent +sys.path.insert(0, str(PROJECT_ROOT / "python")) + + +class TestGenerateWrapperSource: + """Test wrapper C++ source generation.""" + + def _gen(self, **kwargs): + from ascendc_compiler import generate_wrapper_source + return generate_wrapper_source(**kwargs) + + def test_basic_three_tensors(self): + """Generate wrapper for a simple 3-tensor AscendC kernel (x, y, z).""" + src = self._gen( + ascendc_kernel_symbol="add_custom", + tensor_args=[ + {"name": "x", "direction": "input"}, + {"name": "y", "direction": "input"}, + {"name": "z", "direction": "output"}, + ], + ) + assert "add_custom(" in src + assert "kernel_entry" in src + assert "t_x" in src and "t_y" in src and "t_z" in src + assert "gm_x" in src and "gm_y" in src and "gm_z" in src + assert "nullptr" in src + + def test_forward_declaration(self): + """Forward declares AscendC kernel symbol.""" + src = self._gen( + ascendc_kernel_symbol="add_custom", + tensor_args=[ + {"name": "x", "direction": "input"}, + {"name": "y", "direction": "input"}, + {"name": "z", "direction": "output"}, + ], + ) + fwd_lines = [l for l in src.splitlines() + if "extern" in l and 
"add_custom" in l] + assert len(fwd_lines) == 1 + assert "uint8_t*" in fwd_lines[0] + + def test_with_static_tiling_data(self): + """Tiling data bytes are embedded as a const array.""" + tiling = bytes([0x10, 0x27, 0x00, 0x00]) + src = self._gen( + ascendc_kernel_symbol="my_op", + tensor_args=[ + {"name": "a", "direction": "input"}, + {"name": "b", "direction": "output"}, + ], + tiling_data=tiling, + ) + assert "TILING_DATA[4]" in src + assert "0x10" in src and "0x27" in src + assert "(uint8_t*)TILING_DATA" in src + call_lines = [l for l in src.splitlines() + if "my_op(" in l and "extern" not in l] + assert len(call_lines) == 1 + assert "TILING_DATA" in call_lines[0] + + def test_with_workspace(self): + """Workspace parameter adds an extra nullptr in the call.""" + src = self._gen( + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + has_workspace=True, + ) + # Call should have: gm_a, nullptr (workspace), nullptr (tiling) + call_line = [l for l in src.splitlines() if "op(" in l and "extern" not in l][0] + assert call_line.count("nullptr") == 2 + + def test_empty_tiling_treated_as_none(self): + """Empty tiling bytes => no TILING_DATA array, nullptr passed.""" + src = self._gen( + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + tiling_data=b"", + ) + assert "TILING_DATA" not in src + assert "nullptr" in src + + def test_includes_tensor_header(self): + """Wrapper source includes tensor.h for Tensor struct access.""" + src = self._gen( + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + ) + assert '#include "tensor.h"' in src + + def test_correct_arg_indexing(self): + """Each tensor is unpacked from the correct args index.""" + src = self._gen( + ascendc_kernel_symbol="op", + tensor_args=[ + {"name": "a", "direction": "input"}, + {"name": "b", "direction": "input"}, + {"name": "c", "direction": "input"}, + {"name": "d", "direction": "output"}, + ], + ) + assert "args[0]" 
in src + assert "args[1]" in src + assert "args[2]" in src + assert "args[3]" in src + + def test_large_tiling_data_formatting(self): + """Large tiling data is formatted correctly across multiple lines.""" + tiling = bytes(range(48)) + src = self._gen( + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + tiling_data=tiling, + ) + assert "TILING_DATA[48]" in src + assert "0x00" in src + assert "0x2f" in src # last byte = 47 = 0x2f + + def test_no_ascendc_headers(self): + """Wrapper does NOT include kernel_operator.h (PTO code only).""" + src = self._gen( + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + ) + assert "kernel_operator.h" not in src + + +class TestExtractKernelArtifacts: + """Test extraction of .o and tiling data from kernel_meta directory.""" + + def test_extracts_o_file(self, tmp_path): + """Finds and reads a .o file from kernel_meta.""" + from ascendc_compiler import extract_kernel_artifacts + + meta_dir = tmp_path / "kernel_meta" + meta_dir.mkdir() + (meta_dir / "kernel.o").write_bytes(b"\x7fELFfake") + + kernel_o, tiling = extract_kernel_artifacts(str(meta_dir)) + assert kernel_o == b"\x7fELFfake" + + def test_extracts_tiling_bin(self, tmp_path): + """Finds and reads a tiling .bin file.""" + from ascendc_compiler import extract_kernel_artifacts + + meta_dir = tmp_path / "kernel_meta" + meta_dir.mkdir() + (meta_dir / "kernel.o").write_bytes(b"obj") + (meta_dir / "tiling_data.bin").write_bytes(b"\x01\x02\x03") + + kernel_o, tiling = extract_kernel_artifacts(str(meta_dir)) + assert tiling == b"\x01\x02\x03" + + def test_extracts_tiling_from_json(self, tmp_path): + """Falls back to extracting tiling data from JSON metadata.""" + import json + from ascendc_compiler import extract_kernel_artifacts + + meta_dir = tmp_path / "kernel_meta" + meta_dir.mkdir() + (meta_dir / "kernel.o").write_bytes(b"obj") + meta = {"tiling_data": [0x10, 0x20, 0x30]} + (meta_dir / 
"metadata.json").write_text(json.dumps(meta)) + + kernel_o, tiling = extract_kernel_artifacts(str(meta_dir)) + assert tiling == bytes([0x10, 0x20, 0x30]) + + def test_extracts_tiling_hex_from_json(self, tmp_path): + """Tiling data as hex string in JSON.""" + import json + from ascendc_compiler import extract_kernel_artifacts + + meta_dir = tmp_path / "kernel_meta" + meta_dir.mkdir() + (meta_dir / "kernel.o").write_bytes(b"obj") + meta = {"tiling_data": "aabb"} + (meta_dir / "metadata.json").write_text(json.dumps(meta)) + + kernel_o, tiling = extract_kernel_artifacts(str(meta_dir)) + assert tiling == bytes([0xaa, 0xbb]) + + def test_missing_dir_raises(self): + """Raises FileNotFoundError for non-existent directory.""" + from ascendc_compiler import extract_kernel_artifacts + + with pytest.raises(FileNotFoundError): + extract_kernel_artifacts("/nonexistent/path") + + def test_no_o_file_returns_none(self, tmp_path): + """Returns (None, ...) when no .o file exists.""" + from ascendc_compiler import extract_kernel_artifacts + + meta_dir = tmp_path / "kernel_meta" + meta_dir.mkdir() + (meta_dir / "readme.txt").write_text("no object file") + + kernel_o, tiling = extract_kernel_artifacts(str(meta_dir)) + assert kernel_o is None + + +class TestAscendCCompilerInit: + """Test AscendCCompiler initialization.""" + + def test_sim_platform_raises_on_compile(self): + """Simulation platforms raise RuntimeError when trying to wrap+link.""" + from ascendc_compiler import AscendCCompiler + + compiler = AscendCCompiler(platform="a2a3sim") + with pytest.raises(RuntimeError, match="hardware platform"): + compiler.compile_ascendc_kernel( + ascendc_kernel_o=b"fake_kernel_o", + ascendc_kernel_symbol="op", + tensor_args=[{"name": "a", "direction": "input"}], + ) + + def test_unknown_platform_raises(self): + """Unknown platform raises ValueError.""" + from ascendc_compiler import AscendCCompiler + + with pytest.raises(ValueError, match="Unknown platform"): + 
AscendCCompiler(platform="unknown_platform") + + +class TestAscendCToolchain: + """Test AscendCToolchain compiler flags and include dirs.""" + + @pytest.fixture(autouse=True) + def _ensure_ascend(self): + import env_manager + env_manager.ensure("ASCEND_HOME_PATH") + + def test_flags_use_aicore_lang(self): + """AscendC flags must use --cce-aicore-lang, not -x cce.""" + from toolchain import AscendCToolchain + + tc = AscendCToolchain(platform="a2a3") + flags = tc.get_compile_flags(core_type="aiv") + assert "--cce-aicore-lang" in flags + assert "-x" not in flags + assert "cce" not in flags + assert "--cce-auto-sync" in flags + + def test_flags_contain_arch(self): + """Flags include correct architecture for platform and core_type.""" + from toolchain import AscendCToolchain + + tc = AscendCToolchain(platform="a2a3") + aiv_flags = tc.get_compile_flags(core_type="aiv") + assert "--cce-aicore-arch=dav-c220-vec" in aiv_flags + + aic_flags = tc.get_compile_flags(core_type="aic") + assert "--cce-aicore-arch=dav-c220-cube" in aic_flags + + def test_include_dirs_are_specific(self): + """Include dirs are specific paths, not os.walk results.""" + from toolchain import AscendCToolchain + + tc = AscendCToolchain(platform="a2a3") + dirs = tc.get_ascendc_include_dirs() + assert len(dirs) > 0 + # All returned dirs must actually exist + for d in dirs: + assert Path(d).is_dir(), f"Include dir does not exist: {d}" + # Must include key AscendC directories + dir_basenames = [Path(d).name for d in dirs] + assert "include" in dir_basenames # asc/include + + def test_pto_flags_use_x_cce(self): + """PTO flags use -x cce, not --cce-aicore-lang.""" + from toolchain import CCECToolchain + + tc = CCECToolchain(platform="a2a3") + flags = tc.get_compile_flags(core_type="aiv") + assert "-x" in flags + assert "cce" in flags + assert "--cce-aicore-lang" not in flags + + +class TestCodeRunnerAscendCDispatch: + """Test that CodeRunner dispatches to AscendC compiler for compiler='ascendc' kernels.""" + + 
def test_default_compiler_is_pto(self): + """Kernels without 'compiler' field default to 'pto'.""" + kernel = {"func_id": 0, "source": "/fake.cpp", "core_type": "aiv"} + assert kernel.get("compiler", "pto") == "pto" + + def test_ascendc_compiler_detected(self): + """Kernels with compiler='ascendc' are detected.""" + kernel = { + "func_id": 0, + "source": "/fake.o", + "core_type": "aiv", + "compiler": "ascendc", + } + assert kernel.get("compiler", "pto") == "ascendc" + + def test_source_must_be_o_file(self): + """AscendC kernels require .o files, not .cpp sources.""" + kernel_o = {"source": "ascendc/add_custom.o", "compiler": "ascendc"} + assert kernel_o["source"].endswith(".o")