diff --git a/examples/a2a3/tensormap_and_ringbuffer/mixed_example/kernels/orchestration/mixed_orch.cpp b/examples/a2a3/tensormap_and_ringbuffer/mixed_example/kernels/orchestration/mixed_orch.cpp index 739a30f03..f8efcebbe 100644 --- a/examples/a2a3/tensormap_and_ringbuffer/mixed_example/kernels/orchestration/mixed_orch.cpp +++ b/examples/a2a3/tensormap_and_ringbuffer/mixed_example/kernels/orchestration/mixed_orch.cpp @@ -102,13 +102,13 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry( Arg args; args.add_input(ext_A); args.add_input(ext_B); - args.add_inout(C_view); + args.add_output(C_view); args.add_input(ext_D); args.add_input(ext_E); - args.add_inout(F_view); + args.add_output(F_view); args.add_input(ext_G); args.add_input(ext_H); - args.add_inout(I_view); + args.add_output(I_view); pto2_rt_submit_task(mk, args); } @@ -117,7 +117,7 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry( Arg args; args.add_input(ext_A); args.add_input(ext_B); - args.add_inout(J_view); + args.add_output(J_view); pto2_rt_submit_aic_task(FUNC_MATMUL, args); } @@ -126,7 +126,7 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry( Arg args; args.add_input(ext_D); args.add_input(ext_E); - args.add_inout(K_view); + args.add_output(K_view); pto2_rt_submit_aiv_task(FUNC_ADD_STANDALONE, args); } @@ -138,10 +138,10 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry( Arg args; args.add_input(ext_D); args.add_input(ext_E); - args.add_inout(L_view); + args.add_output(L_view); args.add_input(ext_G); args.add_input(ext_H); - args.add_inout(M_view); + args.add_output(M_view); pto2_rt_submit_task(mk, args); } @@ -153,10 +153,10 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry( Arg args; args.add_input(ext_A); args.add_input(ext_B); - args.add_inout(N_view); + args.add_output(N_view); args.add_input(ext_D); args.add_input(ext_E); - args.add_inout(O_view); + args.add_output(O_view); pto2_rt_submit_task(mk, args); } } diff --git a/examples/a2a3/tensormap_and_ringbuffer/vector_example/kernels/orchestration/example_orchestration.cpp b/examples/a2a3/tensormap_and_ringbuffer/vector_example/kernels/orchestration/example_orchestration.cpp index 142555ed9..1c6242497 100644 --- a/examples/a2a3/tensormap_and_ringbuffer/vector_example/kernels/orchestration/example_orchestration.cpp +++ b/examples/a2a3/tensormap_and_ringbuffer/vector_example/kernels/orchestration/example_orchestration.cpp @@ -110,7 +110,7 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry( Arg params_t4; params_t4.add_input(g); params_t4.add_input(c); - params_t4.add_inout(ext_f); + params_t4.add_output(ext_f); pto2_rt_submit_aiv_task(0, params_t4); // kernel_add } // inner scope ends: releases d, e, g } diff --git a/python/runtime_compiler.py b/python/runtime_compiler.py index 27723e9e6..d4e025a5f 100644 --- a/python/runtime_compiler.py +++ b/python/runtime_compiler.py @@ -279,11 +279,23 @@ def _run_build_step( logger.debug(result.stderr) if result.returncode != 0: - logger.error(f"[{platform}] {step_name} failed: {result.stderr}") - raise RuntimeError(f"{step_name} failed for {platform}: {result.stderr}") + self._log_failed_build_output(platform, step_name, result) + raise RuntimeError(f"{step_name} failed for {platform} with exit code {result.returncode}") except FileNotFoundError: raise RuntimeError(f"{step_name} not found. Please install {step_name}.") + @staticmethod + def _log_failed_build_output(platform: str, step_name: str, result: subprocess.CompletedProcess) -> None: + """Emit captured build output at ERROR level so failures are visible by default.""" + logger.error(f"[{platform}] {step_name} failed with exit code {result.returncode}") + + if result.stdout: + logger.error(f"[{platform}] {step_name} stdout:\n{result.stdout.rstrip()}") + if result.stderr: + logger.error(f"[{platform}] {step_name} stderr:\n{result.stderr.rstrip()}") + if not result.stdout and not result.stderr: + logger.error(f"[{platform}] {step_name} produced no stdout/stderr output") + def _run_compilation( self, cmake_source_dir: str, diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/common.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/common.h index b1da07086..1a5af9de3 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/common.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/common.h @@ -1,42 +1,55 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ + #pragma once #include #include + #include #include /** - * 获取当前调用栈信息(包含文件路径和行号) - * 实现在 common.cpp 中 + * Get the current stack trace, including file paths and line numbers. + * Implemented in common.cpp. */ std::string get_stacktrace(int skip_frames = 1); /** - * 断言失败异常,包含文件、行号、条件和调用栈信息 + * Assertion failure exception with condition, file, line, and stack trace. */ class AssertionError : public std::runtime_error { -public: + public: AssertionError(const char* condition, const char* file, int line); const char* condition() const { return condition_; } const char* file() const { return file_; } int line() const { return line_; } -private: + private: const char* condition_; const char* file_; int line_; }; /** - * 断言失败时的处理函数 - * 实现在 common.cpp 中 + * Assertion failure handler. + * Implemented in common.cpp. */ [[noreturn]] void assert_impl(const char* condition, const char* file, int line); /** - * debug_assert 宏 - 在 debug 模式下检查条件,失败时抛出异常并打印调用栈 - * 在 release 模式 (NDEBUG) 下为空操作 + * debug_assert macro: + * checks the condition in debug builds and throws with a stack trace on failure. + * It is a no-op in release builds (NDEBUG). */ #ifdef NDEBUG #define debug_assert(cond) ((void)0) @@ -50,7 +63,8 @@ class AssertionError : public std::runtime_error { #endif /** - * always_assert 宏 - 无论 debug 还是 release 模式都检查条件 + * always_assert macro: + * checks the condition in both debug and release builds. */ #define always_assert(cond) \ do { \ @@ -58,3 +72,22 @@ class AssertionError : public std::runtime_error { assert_impl(#cond, __FILE__, __LINE__); \ } \ } while (0) + +#define PTO_PRAGMA(x) _Pragma(#x) + +#if defined(__clang__) +#define MAYBE_UNINITIALIZED_BEGIN \ + PTO_PRAGMA(clang diagnostic push) \ + PTO_PRAGMA(clang diagnostic ignored "-Wuninitialized") \ + PTO_PRAGMA(clang diagnostic ignored "-Wsometimes-uninitialized") +#define MAYBE_UNINITIALIZED_END PTO_PRAGMA(clang diagnostic pop) +#elif defined(__GNUC__) +#define MAYBE_UNINITIALIZED_BEGIN \ + PTO_PRAGMA(GCC diagnostic push) \ + PTO_PRAGMA(GCC diagnostic ignored "-Wuninitialized") \ + PTO_PRAGMA(GCC diagnostic ignored "-Wmaybe-uninitialized") +#define MAYBE_UNINITIALIZED_END PTO_PRAGMA(GCC diagnostic pop) +#else +#define MAYBE_UNINITIALIZED_BEGIN +#define MAYBE_UNINITIALIZED_END +#endif diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index 678656b82..9a6b5fad8 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -114,6 +114,41 @@ static uint32_t g_orch_submit_idx = 0; #define CYCLE_COUNT_LAP_RECORD(acc, phase_id, tid) #endif +static bool pto2_append_fanin_or_fail(PTO2OrchestratorState* orch, + PTO2TaskId task_id, + int32_t tensor_arg_index, + TensorArgType ptype, + PTO2TaskSlotState* prod_state, + PTO2TaskSlotState* fanin_states[], + int32_t* fanin_count, + const char* reason) { + for (int32_t j = 0; j < *fanin_count; j++) { + if (fanin_states[j] == prod_state) { + return true; + } + } + + if (*fanin_count >= PTO2_MAX_INPUTS) { + LOG_ERROR("========================================"); + LOG_ERROR("FATAL: Dependency Overflow Detected!"); + LOG_ERROR("========================================"); + LOG_ERROR("Task requires more than PTO2_MAX_INPUTS unique fanin dependencies."); + LOG_ERROR(" task_id.raw: %" PRIu64, task_id.raw); + LOG_ERROR(" tensor_arg_index: %d", tensor_arg_index); + LOG_ERROR(" tensor_arg_type: %d", static_cast(ptype)); + LOG_ERROR(" fanin_count: %d / %d", *fanin_count, PTO2_MAX_INPUTS); + LOG_ERROR(" reason: %s", reason); + LOG_ERROR("This is a runtime dependency-tracking limit."); + LOG_ERROR("========================================"); + orch->sm_handle->header->orch_error_code.store(PTO2_ERROR_DEPENDENCY_OVERFLOW, std::memory_order_release); + orch->fatal = true; + return false; + } + + fanin_states[(*fanin_count)++] = prod_state; + return true; +} + // ============================================================================= // Orchestrator Initialization // ============================================================================= @@ -377,7 +412,7 @@ TaskOutputTensors pto2_submit_mixed_task( int32_t local_id = alloc_result.task_id; int32_t slot = alloc_result.slot; - PTO2TaskId task_id = pto2_make_task_id(ring_id, static_cast(local_id)); + PTO2TaskId task_id = PTO2TaskId::make(ring_id, static_cast(local_id)); PTO2TaskDescriptor& task = allocator.task_by_slot(slot); PTO2TaskPayload* payload = &orch->sm_handle->task_payloads[ring_id][slot]; @@ -446,47 +481,48 @@ TaskOutputTensors pto2_submit_mixed_task( // === STEP 3: Lookup inputs + materialize runtime-created outputs === for (int i = 0; i < args.tensor_count(); i++) { TensorArgType ptype = args.tag(i); + if (ptype == TensorArgType::OUTPUT) { + // Runtime-created OUTPUT tensors are not looked up in the TensorMap since they have no dependencies. + continue; + } - switch (ptype) { - case TensorArgType::INOUT: - case TensorArgType::INPUT: { - if (args.tensor(i).ptr->manual_dep) break; - // Look up producer via TensorMap (reads from cached stack tensor) - PTO2LookupResult lookup_result; - orch->tensor_map.lookup(*args.tensor(i).ptr, lookup_result); - - for (int r = 0; r < lookup_result.count; r++) { - PTO2TensorMapEntry& entry = *lookup_result.entries[r].entry; - auto overlap_status = lookup_result.entries[r].overlap_status; - // Check if this producer is already in fanin list (avoid duplicates) - auto prod_ring = entry.producer_task_id.ring(); - auto prod_local = entry.producer_task_id.local(); - PTO2TaskSlotState* prod_state = - &sched->ring_sched_states[prod_ring].get_slot_state_by_task_id(prod_local); - bool already_added = false; - for (int j = 0; j < fanin_count; j++) { - if (fanin_states[j] == prod_state) { - already_added = true; - break; - } - } - - if (!already_added) { - // Add to fanin list (this task depends on producer) - if (fanin_count < PTO2_MAX_INPUTS) { - fanin_states[fanin_count++] = prod_state; - } - } - if (ptype == TensorArgType::INOUT && overlap_status == OverlapStatus::COVERED) { - if (!entry.with_alloc) { - orch->tensor_map.remove_entry(entry); - } - } - } - break; + const Tensor* tensor = args.tensor(i).ptr; + + // Step A: creator retention — all existing tensors extend their creator lifetime. + PTO2TaskId owner = tensor->owner_task_id; + if (owner.is_valid() && sched != nullptr) { + PTO2TaskSlotState* prod_state = + &sched->ring_sched_states[owner.ring()].get_slot_state_by_task_id(owner.local()); + if (!pto2_append_fanin_or_fail( + orch, task_id, i, ptype, prod_state, fanin_states, &fanin_count, "creator retention")) { + return result; + } + } + + // Step B: only INPUT/INOUT need modifier dependency lookup. + if (ptype != TensorArgType::INPUT && ptype != TensorArgType::INOUT) { + continue; + } + if (tensor->manual_dep) { + continue; + } + + PTO2LookupResult lookup_result; + orch->tensor_map.lookup(*tensor, lookup_result); + + for (int r = 0; r < lookup_result.count; r++) { + PTO2TensorMapEntry& entry = *lookup_result.entries[r].entry; + auto overlap_status = lookup_result.entries[r].overlap_status; + auto prod_ring = entry.producer_task_id.ring(); + auto prod_local = entry.producer_task_id.local(); + PTO2TaskSlotState* prod_state = &sched->ring_sched_states[prod_ring].get_slot_state_by_task_id(prod_local); + if (!pto2_append_fanin_or_fail( + orch, task_id, i, ptype, prod_state, fanin_states, &fanin_count, "overlap lookup")) { + return result; + } + if (ptype == TensorArgType::INOUT && overlap_status == OverlapStatus::COVERED) { + orch->tensor_map.remove_entry(entry); } - default: - break; } } @@ -496,16 +532,9 @@ TaskOutputTensors pto2_submit_mixed_task( { for (int i = 0; i < args.tensor_count(); i++) { TensorArgType ptype = args.tag(i); - if (ptype == TensorArgType::INOUT) { + if (ptype == TensorArgType::INOUT || ptype == TensorArgType::OUTPUT_EXISTING) { if (!args.tensor(i).ptr->manual_dep) { - orch->tensor_map.insert(*args.tensor(i).ptr, task_id, false); - } - } else if (ptype == TensorArgType::OUTPUT) { - if (!args.tensor(i).create_info->manual_dep) { - orch->tensor_map.insert(*args.tensor(i).create_info, - reinterpret_cast(reinterpret_cast(alloc_result.packed_base) + offsets[i]), - task_id, - true); + orch->tensor_map.insert(*args.tensor(i).ptr, task_id); } } } @@ -536,6 +565,14 @@ TaskOutputTensors pto2_submit_mixed_task( payload->init(args, result, alloc_result.packed_base, offsets, buffer_sizes); + // Write owner_task_id into materialized OUTPUT tensors so creator-only dependency + // tracking remains available even when manual_dep skips OverlapMap publication. + for (int i = 0; i < args.tensor_count(); i++) { + if (args.tag(i) == TensorArgType::OUTPUT) { + payload->tensors[i].owner_task_id = task_id; + } + } + CYCLE_COUNT_LAP_RECORD(g_orch_args_cycle, AicpuPhaseId::ORCH_PARAMS, task_id.raw); #if PTO2_ORCH_PROFILING g_orch_args_atomic_count += 2; // fanout_lock.store + fanout_count.store diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp index 55b67aa20..bd049ecf8 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2.cpp @@ -1,3 +1,14 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ + /** * PTO Runtime2 - Main Implementation * @@ -7,11 +18,15 @@ */ #include "pto_runtime2.h" + +#include #include #include -#include -#include "common/unified_log.h" + +#include + #include "aicpu/device_time.h" +#include "common/unified_log.h" // Weak fallback for HOST .so builds (never called, but satisfies linker). // The AICPU build links the strong symbol from platform/.../device_time.cpp. @@ -24,86 +39,98 @@ __attribute__((weak, visibility("hidden"))) uint64_t get_sys_cnt_aicpu() { retur thread_local int pto2_current_orch_idx = 0; -void pto2_set_orch_thread_idx(int idx) { - pto2_current_orch_idx = idx; -} +void pto2_set_orch_thread_idx(int idx) { pto2_current_orch_idx = idx; } // ============================================================================= // Orchestration Ops Table (function-pointer dispatch for orchestration .so) // ============================================================================= -static TaskOutputTensors submit_task_impl(PTO2Runtime* rt, const MixedKernels& mixed_kernels, - const Arg& args) { - return pto2_submit_mixed_task(&rt->orchestrators[pto2_current_orch_idx], mixed_kernels, - args); +static TaskOutputTensors submit_task_impl(PTO2Runtime* rt, const MixedKernels& mixed_kernels, const Arg& args) { + return pto2_submit_mixed_task(&rt->orchestrators[pto2_current_orch_idx], mixed_kernels, args); } -void pto2_rt_scope_begin(PTO2Runtime* rt) { - pto2_scope_begin(&rt->orchestrators[pto2_current_orch_idx]); -} +void pto2_rt_scope_begin(PTO2Runtime* rt) { pto2_scope_begin(&rt->orchestrators[pto2_current_orch_idx]); } -void pto2_rt_scope_end(PTO2Runtime* rt) { - pto2_scope_end(&rt->orchestrators[pto2_current_orch_idx]); -} +void pto2_rt_scope_end(PTO2Runtime* rt) { pto2_scope_end(&rt->orchestrators[pto2_current_orch_idx]); } -void pto2_rt_orchestration_done(PTO2Runtime* rt) { - pto2_orchestrator_done(&rt->orchestrators[pto2_current_orch_idx]); -} +void pto2_rt_orchestration_done(PTO2Runtime* rt) { pto2_orchestrator_done(&rt->orchestrators[pto2_current_orch_idx]); } -static bool is_fatal_impl(PTO2Runtime* rt) { - return rt->orchestrators[pto2_current_orch_idx].fatal; -} +static bool is_fatal_impl(PTO2Runtime* rt) { return rt->orchestrators[pto2_current_orch_idx].fatal; } -// Wait for TensorMap producers of this tensor to be safe for data access. -// For reads: wait until producer COMPLETED (done writing). +// Wait for all producers of this tensor to be safe for data access. +// Checks owner metadata (lifecycle anchor) and OverlapMap (modifier writers). +// For reads: wait until each producer COMPLETED (done writing). // For writes: also wait until all consumers done reading // (fanout_refcount >= fanout_count - 1, excluding scope reference). // Uses cycle-based timeout (checked every 1024 spins). // Returns false on timeout (sets orch.fatal). -static bool wait_for_tensor_ready(PTO2Runtime* rt, const Tensor& tensor, - bool wait_for_consumers, const char* caller) { +MAYBE_UNINITIALIZED_BEGIN +static bool wait_for_tensor_ready(PTO2Runtime* rt, const Tensor& tensor, bool wait_for_consumers, const char* caller) { PTO2OrchestratorState& orch = rt->orchestrators[pto2_current_orch_idx]; + + // Collect producer slot states from both maps, deduplicated by pointer. + // +1: one creator slot + up to PTO2_LOOKUP_MAX_RESULTS modifier slots. + constexpr int kMaxWait = PTO2_LOOKUP_MAX_RESULTS + 1; + PTO2TaskSlotState* slots[kMaxWait]; + int slot_count = 0; + + // Step A: creator retention — read owner directly from tensor metadata + PTO2TaskId owner = tensor.owner_task_id; + if (owner.is_valid()) { + slots[slot_count++] = &rt->scheduler.ring_sched_states[owner.ring()].get_slot_state_by_task_id(owner.local()); + } + + // Step B: modifier writer lookup (OverlapMap) PTO2LookupResult lookup_result; orch.tensor_map.lookup(tensor, lookup_result); - for (int r = 0; r < lookup_result.count; r++) { - PTO2TensorMapEntry& entry = *lookup_result.entries[r].entry; - PTO2TaskId producer_id = entry.producer_task_id; - uint8_t ring_id = producer_id.ring(); - int32_t local_id = producer_id.local(); - PTO2TaskSlotState& slot = - rt->scheduler.ring_sched_states[ring_id].get_slot_state_by_task_id(local_id); - - // Wait for producer to complete (WAW safety) + PTO2TaskId pid = lookup_result.entries[r].entry->producer_task_id; + PTO2TaskSlotState* s = &rt->scheduler.ring_sched_states[pid.ring()].get_slot_state_by_task_id(pid.local()); + bool already = false; + for (int j = 0; j < slot_count; j++) { + if (slots[j] == s) { + already = true; + break; + } + } + if (!already && slot_count < kMaxWait) { + slots[slot_count++] = s; + } + } + + // Wait for each producer + for (int p = 0; p < slot_count; p++) { + PTO2TaskSlotState& slot = *slots[p]; + uint8_t ring_id = slot.ring_id; + int32_t local_id = static_cast(slot.task->task_id.local()); + uint64_t t0 = get_sys_cnt_aicpu(); int32_t spin_count = 0; while (slot.task_state.load(std::memory_order_acquire) < PTO2_TASK_COMPLETED) { SPIN_WAIT_HINT(); - if ((++spin_count & 1023) == 0 && - get_sys_cnt_aicpu() - t0 > PTO2_TENSOR_DATA_TIMEOUT_CYCLES) { + if ((++spin_count & 1023) == 0 && get_sys_cnt_aicpu() - t0 > PTO2_TENSOR_DATA_TIMEOUT_CYCLES) { orch.fatal = true; unified_log_error(caller, "Timeout (%llu cycles): producer (ring=%d, local=%d) not completed", - (unsigned long long)PTO2_TENSOR_DATA_TIMEOUT_CYCLES, ring_id, local_id); + (unsigned long long)PTO2_TENSOR_DATA_TIMEOUT_CYCLES, // NOLINT(runtime/int) + ring_id, + local_id); return false; } } - // For writes: also wait for all consumers to finish reading (WAR safety). - // fanout_count includes 1 scope reference that won't release until scope_end, - // so wait until fanout_refcount >= fanout_count - 1. if (wait_for_consumers) { t0 = get_sys_cnt_aicpu(); spin_count = 0; - while (slot.fanout_refcount.load(std::memory_order_acquire) - < slot.fanout_count - 1) { + while (slot.fanout_refcount.load(std::memory_order_acquire) < slot.fanout_count - 1) { SPIN_WAIT_HINT(); - if ((++spin_count & 1023) == 0 && - get_sys_cnt_aicpu() - t0 > PTO2_TENSOR_DATA_TIMEOUT_CYCLES) { + if ((++spin_count & 1023) == 0 && get_sys_cnt_aicpu() - t0 > PTO2_TENSOR_DATA_TIMEOUT_CYCLES) { orch.fatal = true; unified_log_error(caller, "Timeout (%llu cycles): consumers of producer (ring=%d, local=%d) not done", - (unsigned long long)PTO2_TENSOR_DATA_TIMEOUT_CYCLES, ring_id, local_id); + (unsigned long long)PTO2_TENSOR_DATA_TIMEOUT_CYCLES, // NOLINT(runtime/int) + ring_id, + local_id); return false; } } @@ -111,9 +138,9 @@ static bool wait_for_tensor_ready(PTO2Runtime* rt, const Tensor& tensor, } return true; } +MAYBE_UNINITIALIZED_END -uint64_t pto2_get_tensor_data(PTO2Runtime* rt, const Tensor& tensor, - uint32_t ndims, const uint32_t indices[]) { +uint64_t pto2_get_tensor_data(PTO2Runtime* rt, const Tensor& tensor, uint32_t ndims, const uint32_t indices[]) { if (tensor.buffer.addr == 0) { unified_log_error(__FUNCTION__, "get_tensor_data: buffer not allocated (addr=0). " @@ -127,16 +154,14 @@ uint64_t pto2_get_tensor_data(PTO2Runtime* rt, const Tensor& tensor, uint64_t flat_offset = tensor.compute_flat_offset(indices, ndims); uint64_t elem_size = get_element_size(tensor.dtype); - const void* ptr = reinterpret_cast( - tensor.buffer.addr + flat_offset * elem_size); + const void* ptr = reinterpret_cast(tensor.buffer.addr + flat_offset * elem_size); uint64_t result = 0; memcpy(&result, ptr, elem_size); return result; } -void pto2_set_tensor_data(PTO2Runtime* rt, const Tensor& tensor, - uint32_t ndims, const uint32_t indices[], - uint64_t value) { +void pto2_set_tensor_data( + PTO2Runtime* rt, const Tensor& tensor, uint32_t ndims, const uint32_t indices[], uint64_t value) { if (tensor.buffer.addr == 0) { unified_log_error(__FUNCTION__, "set_tensor_data: buffer not allocated (addr=0). " @@ -151,24 +176,23 @@ void pto2_set_tensor_data(PTO2Runtime* rt, const Tensor& tensor, uint64_t flat_offset = tensor.compute_flat_offset(indices, ndims); uint64_t elem_size = get_element_size(tensor.dtype); - void* ptr = reinterpret_cast( - tensor.buffer.addr + flat_offset * elem_size); + void* ptr = reinterpret_cast(tensor.buffer.addr + flat_offset * elem_size); memcpy(ptr, &value, elem_size); } static const PTO2RuntimeOps s_runtime_ops = { - .submit_task = submit_task_impl, - .scope_begin = pto2_rt_scope_begin, - .scope_end = pto2_rt_scope_end, - .orchestration_done = pto2_rt_orchestration_done, - .is_fatal = is_fatal_impl, - .log_error = unified_log_error, - .log_warn = unified_log_warn, - .log_info = unified_log_info, - .log_debug = unified_log_debug, - .log_always = unified_log_always, - .get_tensor_data = pto2_get_tensor_data, - .set_tensor_data = pto2_set_tensor_data, + .submit_task = submit_task_impl, + .scope_begin = pto2_rt_scope_begin, + .scope_end = pto2_rt_scope_end, + .orchestration_done = pto2_rt_orchestration_done, + .is_fatal = is_fatal_impl, + .log_error = unified_log_error, + .log_warn = unified_log_warn, + .log_info = unified_log_info, + .log_debug = unified_log_debug, + .log_always = unified_log_always, + .get_tensor_data = pto2_get_tensor_data, + .set_tensor_data = pto2_set_tensor_data, }; // ============================================================================= @@ -176,17 +200,13 @@ static const PTO2RuntimeOps s_runtime_ops = { // ============================================================================= PTO2Runtime* pto2_runtime_create(PTO2RuntimeMode mode) { - return pto2_runtime_create_custom(mode, - PTO2_TASK_WINDOW_SIZE, - PTO2_HEAP_SIZE); + return pto2_runtime_create_custom(mode, PTO2_TASK_WINDOW_SIZE, PTO2_HEAP_SIZE); } -PTO2Runtime* pto2_runtime_create_custom(PTO2RuntimeMode mode, - uint64_t task_window_size, - uint64_t heap_size, - int32_t dep_pool_capacity) { +PTO2Runtime* pto2_runtime_create_custom( + PTO2RuntimeMode mode, uint64_t task_window_size, uint64_t heap_size, int32_t dep_pool_capacity) { // Allocate runtime context - PTO2Runtime* rt = (PTO2Runtime*)calloc(1, sizeof(PTO2Runtime)); + PTO2Runtime* rt = static_cast(calloc(1, sizeof(PTO2Runtime))); if (!rt) { return NULL; } @@ -203,25 +223,24 @@ PTO2Runtime* pto2_runtime_create_custom(PTO2RuntimeMode mode, // Allocate GM heap for output buffers (all rings combined) uint64_t total_heap_size = heap_size * PTO2_MAX_RING_DEPTH; rt->gm_heap_size = total_heap_size; - #if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L - if (posix_memalign(&rt->gm_heap, PTO2_ALIGN_SIZE, total_heap_size) != 0) { - pto2_sm_destroy(rt->sm_handle); - free(rt); - return NULL; - } - #else - rt->gm_heap = aligned_alloc(PTO2_ALIGN_SIZE, total_heap_size); - if (!rt->gm_heap) { - pto2_sm_destroy(rt->sm_handle); - free(rt); - return NULL; - } - #endif +#if defined(_POSIX_C_SOURCE) && _POSIX_C_SOURCE >= 200112L + if (posix_memalign(&rt->gm_heap, PTO2_ALIGN_SIZE, total_heap_size) != 0) { + pto2_sm_destroy(rt->sm_handle); + free(rt); + return NULL; + } +#else + rt->gm_heap = aligned_alloc(PTO2_ALIGN_SIZE, total_heap_size); + if (!rt->gm_heap) { + pto2_sm_destroy(rt->sm_handle); + free(rt); + return NULL; + } +#endif rt->gm_heap_owned = true; // Initialize first orchestrator - if (!pto2_orchestrator_init(&rt->orchestrators[0], rt->sm_handle, - rt->gm_heap, heap_size, dep_pool_capacity)) { + if (!pto2_orchestrator_init(&rt->orchestrators[0], rt->sm_handle, rt->gm_heap, heap_size, dep_pool_capacity)) { free(rt->gm_heap); pto2_sm_destroy(rt->sm_handle); free(rt); @@ -244,16 +263,16 @@ PTO2Runtime* pto2_runtime_create_custom(PTO2RuntimeMode mode, } PTO2Runtime* pto2_runtime_create_from_sm(PTO2RuntimeMode mode, - PTO2SharedMemoryHandle* sm_handle, - void* gm_heap, - uint64_t heap_size, - int orch_count, - int32_t dep_pool_capacity) { + PTO2SharedMemoryHandle* sm_handle, + void* gm_heap, + uint64_t heap_size, + int orch_count, + int32_t dep_pool_capacity) { if (!sm_handle) return NULL; if (orch_count < 1) orch_count = 1; if (orch_count > PTO2_MAX_ORCH_THREADS) orch_count = PTO2_MAX_ORCH_THREADS; - PTO2Runtime* rt = (PTO2Runtime*)calloc(1, sizeof(PTO2Runtime)); + PTO2Runtime* rt = static_cast(calloc(1, sizeof(PTO2Runtime))); if (!rt) return NULL; rt->ops = &s_runtime_ops; @@ -266,8 +285,7 @@ PTO2Runtime* pto2_runtime_create_from_sm(PTO2RuntimeMode mode, // Initialize all orchestrator states for (int i = 0; i < orch_count; i++) { - if (!pto2_orchestrator_init(&rt->orchestrators[i], rt->sm_handle, - rt->gm_heap, heap_size, dep_pool_capacity)) { + if (!pto2_orchestrator_init(&rt->orchestrators[i], rt->sm_handle, rt->gm_heap, heap_size, dep_pool_capacity)) { for (int j = 0; j < i; j++) { pto2_orchestrator_destroy(&rt->orchestrators[j]); } diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h index c6cf2313c..8f89afb25 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_runtime2_types.h @@ -33,6 +33,7 @@ #include "pto2_dispatch_payload.h" #include "pto_submit_types.h" +#include "pto_task_id.h" #include "pto_types.h" // ============================================================================= @@ -77,7 +78,8 @@ #define PTO2_ERROR_HEAP_RING_DEADLOCK 2 #define PTO2_ERROR_FLOW_CONTROL_DEADLOCK 3 #define PTO2_ERROR_DEP_POOL_OVERFLOW 4 -#define PTO2_ERROR_INVALID_ARGS 5 // Arg construction error (invalid args) +#define PTO2_ERROR_INVALID_ARGS 5 // Arg construction error (invalid args) +#define PTO2_ERROR_DEPENDENCY_OVERFLOW 6 // Too many unique fanin dependencies for one task // Scheduler errors (100+): detected in scheduler threads #define PTO2_ERROR_SCHEDULER_TIMEOUT 100 @@ -127,31 +129,8 @@ constexpr uint64_t PTO2_TENSOR_DATA_TIMEOUT_CYCLES = 15 * 1000 * 1000 * 1000ULL; // ============================================================================= /** - * TaskId: 64-bit encoding used across Runtime2. - * - * raw encoding: (ring_id << 32) | local_id - * - * ring_id: which ring layer (0..PTO2_MAX_RING_DEPTH-1) - * local_id: per-ring monotonic counter + * TaskId: defined in pto_task_id.h (included above). */ -struct PTO2TaskId { - uint64_t raw; - - constexpr PTO2TaskId() : raw(0) {} - constexpr explicit PTO2TaskId(uint64_t v) : raw(v) {} - - constexpr uint8_t ring() const { return static_cast(raw >> 32); } - constexpr uint32_t local() const { return static_cast(raw & 0xFFFFFFFFu); } - - constexpr bool operator==(const PTO2TaskId& other) const { return raw == other.raw; } - constexpr bool operator!=(const PTO2TaskId& other) const { return raw != other.raw; } -}; - -static_assert(sizeof(PTO2TaskId) == 8, "PTO2TaskId must stay 8 bytes (shared memory ABI)"); - -static inline PTO2TaskId pto2_make_task_id(uint8_t ring_id, uint32_t local_id) { - return PTO2TaskId{(static_cast(ring_id) << 32) | static_cast(local_id)}; -} // ============================================================================= // Worker Types diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_task_id.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_task_id.h new file mode 100644 index 000000000..595372f90 --- /dev/null +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_task_id.h @@ -0,0 +1,50 @@ +/* + * Copyright (c) PyPTO Contributors. + * This program is free software, you can redistribute it and/or modify it under the terms and conditions of + * CANN Open Software License Agreement Version 2.0 (the "License"). + * Please refer to the License for details. You may not use this file except in compliance with the License. + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, EITHER EXPRESS OR IMPLIED, + * INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, MERCHANTABILITY, OR FITNESS FOR A PARTICULAR PURPOSE. + * See LICENSE in the root of the software repository for the full text of the License. + * ----------------------------------------------------------------------------------------------------------- + */ + +/** + * PTO2TaskId — minimal standalone header. + * + * Factored out of pto_runtime2_types.h so that tensor.h can include it + * without pulling in scheduler-internal constants (heap sizes, timeouts, etc.). + */ + +#pragma once + +#include + +/** + * TaskId: 64-bit encoding used across Runtime2. + * + * raw encoding: (ring_id << 32) | local_id + * + * ring_id: which ring layer (0..PTO2_MAX_RING_DEPTH-1) + * local_id: per-ring monotonic counter + * + * Invalid sentinel: raw == UINT64_MAX (no valid task has this encoding). + */ +struct PTO2TaskId { + uint64_t raw; + + static constexpr PTO2TaskId make(uint8_t ring_id, uint32_t local_id) { + return PTO2TaskId{(static_cast(ring_id) << 32) | static_cast(local_id)}; + } + + static constexpr PTO2TaskId invalid() { return PTO2TaskId{UINT64_MAX}; } + + constexpr uint8_t ring() const { return static_cast(raw >> 32); } + constexpr uint32_t local() const { return static_cast(raw & 0xFFFFFFFFu); } + constexpr bool is_valid() const { return raw != UINT64_MAX; } + + constexpr bool operator==(const PTO2TaskId& other) const { return raw == other.raw; } + constexpr bool operator!=(const PTO2TaskId& other) const { return raw != other.raw; } +}; + +static_assert(sizeof(PTO2TaskId) == 8, "PTO2TaskId must stay 8 bytes (shared memory ABI)"); diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h index 153f378ba..0916d96da 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_tensormap.h @@ -73,7 +73,7 @@ extern uint64_t g_insert_count; * * Cache line 1 (64B, lookup hot path): * next_in_bucket, producer_task_id, buffer_addr — chain traversal + validity + hash match - * version, ndims, is_all_offset_zero, with_alloc, bucket_index — overlap fast path + * version, ndims, is_all_offset_zero, bucket_index — overlap fast path * shapes[5] — overlap comparison * * Cache line 2 (64B, insert/remove/slow-path only): @@ -88,14 +88,14 @@ struct alignas(64) PTO2TensorMapEntry { uint64_t buffer_addr; // 8B: tensor base address (hash key) PTO2TensorMapEntry* next_in_bucket; // 8B: next entry in hash bucket chain PTO2TaskId producer_task_id; // 8B: raw (ring_id << 32) | local_id - int32_t version; // 4B: tensor version for overlap detection int32_t bucket_index; // 4B: bucket index (-1 if unlinked) + uint32_t __padding0__; // 4B: occupies Tensor::start_offset high half + int32_t version; // 4B: tensor version for overlap detection uint32_t ndims; // 4B: number of dimensions + DataType __padding_dtype__; // 1B: occupies Tensor::dtype bool is_all_offset_zero; // 1B: fast-path flag - bool with_alloc; // 1B: true=OUTPUT, false=INOUT - // padding: 2B + uint8_t __padding1__[2]; uint32_t shapes[RUNTIME_MAX_TENSOR_DIMS]; // 20B: shape per dimension - // padding: 4B to fill 64B // === Cache line 2 (64B) — insert/remove/slow-path === PTO2TensorMapEntry* prev_in_bucket; // 8B: prev in hash bucket chain @@ -164,6 +164,7 @@ static_assert(offsetof(PTO2TensorMapEntry, buffer_addr) == offsetof(Tensor, buff static_assert(offsetof(PTO2TensorMapEntry, version) == offsetof(Tensor, version)); static_assert(offsetof(PTO2TensorMapEntry, ndims) == offsetof(Tensor, ndims)); static_assert(offsetof(PTO2TensorMapEntry, is_all_offset_zero) == offsetof(Tensor, is_all_offset_zero)); +static_assert(offsetof(PTO2TensorMapEntry, shapes) == offsetof(Tensor, shapes)); static_assert( offsetof(PTO2TensorMapEntry, prev_in_bucket) == 64, "TensorMapEntry must be exactly 2 cache lines (128 bytes)"); @@ -356,24 +357,10 @@ struct PTO2TensorMap { * @param tensor Tensor produced * @param producer_task_id Task ID of producer */ - void insert(const Tensor& tensor, PTO2TaskId producer_task_id, bool with_alloc) { + void insert(const Tensor& tensor, PTO2TaskId producer_task_id) { PTO2TensorMapEntry* entry = new_entry(); entry->copy_from_tensor(tensor); - link_entry(entry, tensor.buffer.addr, producer_task_id, with_alloc); - } - - /** - * Insert a new entry from TensorCreateInfo (for deferred OUTPUT tensors) - * - * @param tensor_create_info Create-info describing the output - * @param addr Allocated buffer address - * @param producer_task_id Task ID of producer - * @param with_alloc True if runtime allocated the buffer - */ - void insert(const TensorCreateInfo& tensor_create_info, void* addr, PTO2TaskId producer_task_id, bool with_alloc) { - PTO2TensorMapEntry* entry = new_entry(); - entry->copy_tensor_create_info(tensor_create_info, reinterpret_cast(addr)); - link_entry(entry, reinterpret_cast(addr), producer_task_id, with_alloc); + link_entry(entry, tensor.buffer.addr, producer_task_id); } /** @@ -396,7 +383,7 @@ struct PTO2TensorMap { // Only remove if this entry belongs to the retiring task // (slot may have been reused by a newer task) debug_assert(cur_entry->producer_task_id == - pto2_make_task_id(static_cast(ring_id), static_cast(local_id))); + PTO2TaskId::make(static_cast(ring_id), static_cast(local_id))); free_entry(*cur_entry); cur_entry = next_entry; } @@ -426,7 +413,7 @@ struct PTO2TensorMap { /** * Link an initialized entry into bucket and task chains. */ - void link_entry(PTO2TensorMapEntry* entry, uint64_t addr, PTO2TaskId producer_task_id, bool with_alloc) { + void link_entry(PTO2TensorMapEntry* entry, uint64_t addr, PTO2TaskId producer_task_id) { #if PTO2_TENSORMAP_PROFILING g_insert_count++; #endif @@ -436,7 +423,6 @@ struct PTO2TensorMap { int32_t task_slot = local_id & (task_window_sizes[ring_id] - 1); entry->producer_task_id = producer_task_id; - entry->with_alloc = with_alloc; // Insert at head of hash bucket entry->bucket_index = bucket_index; diff --git a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_types.h b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_types.h index 2e2adf425..6c3eb3acc 100644 --- a/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_types.h +++ b/src/a2a3/runtime/tensormap_and_ringbuffer/runtime/pto_types.h @@ -189,6 +189,22 @@ struct Arg : TaskArgs #include -#include "common.h" // NOLINT(build/include_subdir) -#include "data_type.h" // NOLINT(build/include_subdir) +#include "common.h" // NOLINT(build/include_subdir) +#include "data_type.h" // NOLINT(build/include_subdir) +#include "pto_task_id.h" // NOLINT(build/include_subdir) constexpr int RUNTIME_MAX_TENSOR_DIMS = 5; @@ -57,20 +58,20 @@ struct Segment { * * Layout (64B) is aligned with Tensor cacheline 1 so that * init_from_create_info() can copy the entire cacheline with a single memcpy, - * then overwrite buffer.addr and buffer.size separately. + * then overwrite buffer/owner metadata and refresh start_offset later. * * Arg::add_output() stores a pointer to this object, so the original * must remain valid (not a temporary) until after the submit call. */ -class TensorCreateInfo { -public: // NOLINT(whitespace/indent) +class alignas(64) TensorCreateInfo { + public: // NOLINT(whitespace/indent) TensorCreateInfo( const uint32_t shapes[], uint32_t ndims, DataType dtype = DataType::FLOAT32, bool manual_dep = false) : initial_value(0), has_initial_value(false), version(0), - dtype(dtype), ndims(ndims), + dtype(dtype), is_all_offset_zero(true), is_raw_eq_shapes(true), manual_dep(manual_dep) { @@ -95,25 +96,25 @@ class TensorCreateInfo { return total * get_element_size(dtype); } -public: // NOLINT(whitespace/indent) - // --- Bytes [0, 24): TensorCreateInfo-only fields --- - // These occupy the same positions as Tensor::buffer and Tensor::start_offset, - // which are overwritten after the memcpy in init_from_create_info(). + public: // NOLINT(whitespace/indent) + // --- Bytes [0, 32): TensorCreateInfo-only fields --- + // These occupy the same positions as Tensor::buffer, Tensor::owner_task_id, + // and Tensor::start_offset. The runtime overwrites owner metadata after the + // memcpy and refreshes start_offset during payload materialization. uint64_t initial_value; bool has_initial_value; uint8_t __pad1__[7]; - uint64_t __pad2__; // → Tensor::start_offset (zeroed) + uint64_t __pad2__; // → Tensor::owner_task_id + uint64_t __pad3__; // → Tensor::start_offset (zeroed) - // --- Bytes [24, 64): Matches Tensor cacheline 1 layout --- + // --- Bytes [32, 64): Matches Tensor cacheline 1 layout --- int32_t version; // Always 0 for create-info outputs - DataType dtype; uint32_t ndims; + DataType dtype; bool is_all_offset_zero; // Always true for create-info outputs bool is_raw_eq_shapes; // Always true for create-info outputs bool manual_dep; - uint8_t __pad3__; uint32_t raw_shapes[RUNTIME_MAX_TENSOR_DIMS]; // → Tensor::shapes - uint32_t __pad4__; TensorCreateInfo() = default; @@ -137,12 +138,12 @@ static_assert(sizeof(TensorCreateInfo) == 64); * - is_all_offset_zero: when true, offsets[] are implicitly zero — skip offset read/write * - is_raw_eq_shapes: when true, raw_shapes[] == shapes[] — skip raw_shapes read/write, * use shapes[] wherever raw_shapes would be needed - * - manual_dep: when true, dependency tracking is managed manually + * - manual_dep: when true, keep creator retention only and skip OverlapMap dependency tracking * * When BOTH flags are true, cache line 2 is never accessed. * - * Layout: cache line 1 holds hot-path fields (buffer, start_offset, version, - * dtype, ndims, flags, shapes); cache line 2 holds warm-path fields (raw_shapes, offsets). + * Layout: cache line 1 holds hot-path fields (buffer, owner_task_id, start_offset, + * version, ndims, dtype, flags, shapes); cache line 2 holds warm-path fields (raw_shapes, offsets). * * Construction: * Users cannot default-construct or directly construct a Tensor. @@ -154,21 +155,21 @@ static_assert(sizeof(TensorCreateInfo) == 64); */ struct alignas(64) Tensor { // === Cache line 1 (64B) — hot path === - PTOBufferHandle buffer; // Underlying memory buffer (addr in bytes, size in bytes) - uint64_t start_offset; // Cached 1D element offset (precomputed from raw_shapes + offsets), only calc before - // incore, useless in orch - int32_t version; // Tensor version for overlap detection - DataType dtype; // Data type of tensor elements - uint32_t ndims; // Number of dimensions used - bool is_all_offset_zero; // True when all offsets[] are zero (skip offset read/write) - bool is_raw_eq_shapes; // True when raw_shapes[] == shapes[] (skip raw_shapes read/write) - bool manual_dep; // True when dependency is managed manually (skip tensormap lookup/insert) + PTOBufferHandle buffer; // Underlying memory buffer (addr in bytes, size in bytes) + PTO2TaskId owner_task_id; // Creator task; PTO2TaskId::invalid() for external tensors + uint64_t start_offset; // Cached 1D element offset (precomputed from raw_shapes + offsets) + int32_t version; // Tensor version for overlap detection + uint32_t ndims; // Number of dimensions used + DataType dtype; // Data type of tensor elements + bool is_all_offset_zero; // True when all offsets[] are zero (skip offset read/write) + bool is_raw_eq_shapes; // True when raw_shapes[] == shapes[] (skip raw_shapes read/write) + bool manual_dep; // True when dependency tracking is creator-only (skip OverlapMap lookup/insert) uint32_t shapes[RUNTIME_MAX_TENSOR_DIMS]; // Current view shape per dimension - uint32_t __padding__; // === Cache line 2 (64B) — warm path === uint32_t raw_shapes[RUNTIME_MAX_TENSOR_DIMS]; // Underlying buffer shape per dimension uint32_t offsets[RUNTIME_MAX_TENSOR_DIMS]; // Multi-dimensional offset per dimension + uint8_t _pad_cl2[24]; // Tail padding (bytes 104–127) // --- Copy / move / destroy are public (valid tensors can be freely copied) --- Tensor(const Tensor&) = default; @@ -213,6 +214,7 @@ struct alignas(64) Tensor { offsets[i] = in_offsets[i]; } } + owner_task_id = PTO2TaskId::invalid(); } void init(const Tensor& other) { @@ -265,6 +267,7 @@ struct alignas(64) Tensor { } } is_all_offset_zero = all_zero; + owner_task_id = other.owner_task_id; } /// Compute 1D flat element offset from multi-dimensional indices. @@ -286,6 +289,7 @@ struct alignas(64) Tensor { void init_from_create_info(const TensorCreateInfo& ci, void* addr, uint64_t buffer_size) { memcpy(this, &ci, 64); buffer = {reinterpret_cast(addr), buffer_size}; + owner_task_id = PTO2TaskId::invalid(); // caller (orchestrator) overwrites with actual task_id if (ci.has_initial_value) { fill_initial_value(ci.initial_value); } @@ -438,7 +442,7 @@ struct alignas(64) Tensor { return ss.str(); } -private: + private: // Default and parameterized constructors are private. // Valid Tensors come only from controlled entry points. Tensor() = default; @@ -475,12 +479,14 @@ struct alignas(64) Tensor { static_assert(sizeof(Tensor) == 128, "Tensor must be exactly 2 cache lines (128 bytes)"); static_assert(offsetof(Tensor, raw_shapes) == 64); +static_assert(offsetof(Tensor, owner_task_id) == 16, "owner_task_id must be at bytes 16-23 (cacheline 1)"); +static_assert(offsetof(Tensor, start_offset) == 24, "start_offset must be at bytes 24-31 (cacheline 1)"); // TensorCreateInfo layout must match Tensor cacheline 1 for memcpy optimization static_assert(sizeof(TensorCreateInfo) == 64, "TensorCreateInfo must match Tensor cacheline 1 size (64 bytes)"); static_assert(offsetof(TensorCreateInfo, version) == offsetof(Tensor, version)); -static_assert(offsetof(TensorCreateInfo, dtype) == offsetof(Tensor, dtype)); static_assert(offsetof(TensorCreateInfo, ndims) == offsetof(Tensor, ndims)); +static_assert(offsetof(TensorCreateInfo, dtype) == offsetof(Tensor, dtype)); static_assert(offsetof(TensorCreateInfo, is_all_offset_zero) == offsetof(Tensor, is_all_offset_zero)); static_assert(offsetof(TensorCreateInfo, is_raw_eq_shapes) == offsetof(Tensor, is_raw_eq_shapes)); static_assert(offsetof(TensorCreateInfo, manual_dep) == offsetof(Tensor, manual_dep)); diff --git a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp index dc9c28cca..99d7e482b 100644 --- a/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp +++ b/src/a5/runtime/tensormap_and_ringbuffer/runtime/pto_orchestrator.cpp @@ -514,6 +514,9 @@ TaskOutputTensors pto2_submit_mixed_task( result.output_count++; break; } + + default: + break; } } diff --git a/src/common/task_interface/data_type.h b/src/common/task_interface/data_type.h index 843d512a7..baaf54a57 100644 --- a/src/common/task_interface/data_type.h +++ b/src/common/task_interface/data_type.h @@ -32,7 +32,7 @@ /** * Supported data types for tensor elements */ -enum class DataType : uint32_t { +enum class DataType : uint8_t { FLOAT32, // 4 bytes FLOAT16, // 2 bytes INT32, // 4 bytes @@ -45,6 +45,8 @@ enum class DataType : uint32_t { DATA_TYPE_NUM, }; +static_assert(sizeof(DataType) == 1, "DataType must stay 1 byte"); + /** * Get the size in bytes of a single element of the given data type * diff --git a/src/common/task_interface/tensor_arg.h b/src/common/task_interface/tensor_arg.h index e1b1a39c4..ec5bcb436 100644 --- a/src/common/task_interface/tensor_arg.h +++ b/src/common/task_interface/tensor_arg.h @@ -27,7 +27,7 @@ struct ContinuousTensor { uint64_t data; // Host/device memory address uint32_t shapes[CONTINUOUS_TENSOR_MAX_DIMS]; // Shape per dim (element count) uint32_t ndims; // Number of dimensions (1..5) - DataType dtype; // DataType : uint32_t + DataType dtype; // DataType : uint8_t uint64_t nbytes() const { uint64_t total = 1; @@ -44,13 +44,15 @@ struct ContinuousTensor { static_assert( std::is_trivially_copyable::value, "ContinuousTensor must be trivially copyable for DMA"); static_assert( - sizeof(ContinuousTensor) == 40, "ContinuousTensor size must be exactly 40B (36B fields + 4B tail padding)"); + sizeof(ContinuousTensor) == 40, "ContinuousTensor size must be exactly 40B (33B fields + 7B tail padding)"); /** * TensorArgType - Distinguishes inputs, outputs, and in-place updates */ enum class TensorArgType : int32_t { - INPUT = 0, // Read-only input buffer - OUTPUT = 1, // Write-only output buffer (runtime allocates) - INOUT = 2, // Read-then-write: modifier for downstream + INPUT = 0, // Read-only input buffer + OUTPUT = 1, // Write-only output buffer (runtime allocates) + INOUT = 2, // Read-then-write: modifier for downstream + OUTPUT_EXISTING = 3, // Write-only existing tensor: skips OverlapMap lookup, depends on creator + NO_DEP = 4, // No-dependency existing tensor: skips OverlapMap lookup, no publish }; diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp index 2d35c2b0a..1421fd532 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp +++ b/tests/st/a2a3/tensormap_and_ringbuffer/alternating_matmul_add/kernels/orchestration/alternating_orch.cpp @@ -99,7 +99,7 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry( Arg params_matmul; params_matmul.add_input(A_view); params_matmul.add_input(B_view); - params_matmul.add_inout(C_view); + params_matmul.add_output(C_view); pto2_rt_submit_aic_task(FUNC_MATMUL, params_matmul); total_matmul++; } @@ -119,7 +119,7 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry( Arg params_add; params_add.add_input(X_view); params_add.add_input(Y_view); - params_add.add_inout(Z_view); + params_add.add_output(Z_view); pto2_rt_submit_aiv_task(FUNC_ADD, params_add); total_add++; } diff --git a/tests/st/a2a3/tensormap_and_ringbuffer/scalar_data_test/kernels/orchestration/scalar_data_orch.cpp b/tests/st/a2a3/tensormap_and_ringbuffer/scalar_data_test/kernels/orchestration/scalar_data_orch.cpp index 441dd3109..ec07da7e2 100644 --- a/tests/st/a2a3/tensormap_and_ringbuffer/scalar_data_test/kernels/orchestration/scalar_data_orch.cpp +++ b/tests/st/a2a3/tensormap_and_ringbuffer/scalar_data_test/kernels/orchestration/scalar_data_orch.cpp @@ -225,23 +225,23 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry( set_tensor_data(ext_check, 1, check_idx, waw_val); // ========================================================= - // Step 12: External tensor WAR — must use INOUT, not INPUT + // Step 12: External tensor WAR — must use add_output or add_inout, not add_input // // For external tensors, using add_input() does NOT create a // TensorMap entry. set_tensor_data would then write immediately // without waiting for the reader kernel — a WAR data race. // - // Using add_inout() creates a TensorMap entry, enabling - // set_tensor_data to detect the producer via TensorMap lookup + // Using add_output() (or add_inout()) creates a TensorMap entry, + // enabling set_tensor_data to detect the producer via TensorMap lookup // and wait for fanout_refcount (all consumers done). // - // Here we submit noop with ext_b as INOUT (noop doesn't modify - // data), then set_tensor_data overwrites ext_b[0] = 55.0. + // Here we submit noop with ext_b as write-only output (noop doesn't + // read data), then set_tensor_data overwrites ext_b[0] = 55.0. // set_tensor_data auto-waits for the noop to complete. // ========================================================= { Arg args; - args.add_inout(ext_b); // INOUT: creates TensorMap entry (not INPUT!) + args.add_output(ext_b); // write-only: creates TensorMap entry (not add_input!) pto2_rt_submit_aiv_task(FUNC_NOOP, args); } @@ -258,13 +258,13 @@ __attribute__((visibility("default"))) void aicpu_orchestration_entry( set_tensor_data(ext_b, 1, idx, 0.0f); // ========================================================= - // Step 13: result = a + b (external output via INOUT, kernel_add) + // Step 13: result = a + b (external output via add_output, kernel_add) // ========================================================= { Arg args; args.add_input(ext_a); args.add_input(ext_b); - args.add_inout(ext_result); + args.add_output(ext_result); pto2_rt_submit_aiv_task(FUNC_ADD, args); }