diff --git a/parallel-orch/node.py b/parallel-orch/node.py
index 1f34e1b3..7fcfa309 100644
--- a/parallel-orch/node.py
+++ b/parallel-orch/node.py
@@ -279,6 +279,7 @@ def kill(self):
         self.exec_ctxt.process.kill()
 
     def reset_to_ready(self):
+        util.perf_log_start("Node", "Reset to Ready", self.cnid)
         assert self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING, NodeState.SPECULATED]
 
@@ -289,6 +290,7 @@ def reset_to_ready(self):
         # TODO: make this more sophisticated
         if self.state in [NodeState.EXECUTING, NodeState.SPEC_EXECUTING]:
+            util.perf_log("Node", "EXE", self.cnid, optional_message="Killed")
             self.kill()
             # Probably delete them from tmpfs too
 
@@ -300,15 +302,19 @@ def reset_to_ready(self):
         self.exec_ctxt = None
         self.exec_result = None
         self.state = NodeState.READY
+
+        util.perf_log("Node", "Reset to Ready", self.cnid)
 
     def start_executing(self, env_file):
         assert self.state == NodeState.READY
+        util.perf_log_start("Node", "EXE", self.cnid)
         self.start_command(env_file)
         self.state = NodeState.EXECUTING
 
     def start_spec_executing(self, env_file):
         assert self.state == NodeState.READY
+        util.perf_log_start("Node", "EXE", self.cnid)
         self.start_command(env_file, speculate=True)
         self.state = NodeState.SPEC_EXECUTING
 
@@ -316,7 +322,11 @@ def commit_frontier_execution(self):
         assert self.state == NodeState.EXECUTING
         self.exec_result = ExecResult(self.exec_ctxt.process.pid, self.exec_ctxt.process.returncode)
         self.gather_fs_actions()
+
+        util.perf_log_start("Node", "Commit", self.cnid)
         executor.commit_workspace(self.exec_ctxt.sandbox_dir)
+        util.perf_log("Node", "Commit", self.cnid)
+
         self.state = NodeState.COMMITTED
 
     def finish_spec_execution(self):
@@ -328,7 +338,11 @@ def finish_spec_execution(self):
 
     def commit_speculated(self):
         assert self.state == NodeState.SPECULATED
+
+        util.perf_log_start("Node", "Commit", self.cnid)
         executor.commit_workspace(self.exec_ctxt.sandbox_dir)
+        util.perf_log("Node", "Commit", self.cnid)
+
         self.state = NodeState.COMMITTED
 
     def transition_from_stopped_to_executing(self, env_file=None):
@@ -370,6 +384,7 @@ def get_rw_set(self):
         return self.rwset
 
     def has_env_conflict_with(self, other_env) -> bool:
+        util.perf_log_start("Node", "Env Dependency Resolution", self.cnid)
         # Early return if paths are the same
         if self.exec_ctxt.pre_env_file == other_env:
             return False
@@ -416,6 +431,7 @@ def parse_env(content):
                 logging.critical(f"Variable {key} differs: node environment has {node_env_vars[key]}, other has {other_env_vars[key]}")
                 conflict_exists = True
 
+        util.perf_log("Node", "Env Dependency Resolution", self.cnid)
         return conflict_exists
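The hooks above follow a start/stop convention: `util.perf_log_start` opens a named timer keyed by (module, action, node) and `util.perf_log` closes it, so the `Node|EXE` timer opened in `start_executing`/`start_spec_executing` is closed either on normal completion (see `handle_complete` below) or by `reset_to_ready` with the `"Killed"` message. A minimal sketch of the pairing (the node id is hypothetical, and the `config` fields are normally initialized by the orchestrator, not by caller code):

```python
import time

import config
import util

config.START_TIME = time.time()  # normally set once at orchestrator startup
config.NAMED_TIMESTAMPS = {}     # normally initialized in config

cnid = "0.0"  # hypothetical concrete node id, for illustration only

util.perf_log_start("Node", "EXE", cnid)  # records NAMED_TIMESTAMPS["Node|EXE|0.0"]
time.sleep(0.05)                          # stand-in for the node's command running
util.perf_log("Node", "EXE", cnid)        # logs time-from-start plus a ~50ms step time
```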
diff --git a/parallel-orch/partial_program_order.py b/parallel-orch/partial_program_order.py
index 73ad58ef..9a772744 100644
--- a/parallel-orch/partial_program_order.py
+++ b/parallel-orch/partial_program_order.py
@@ -157,15 +157,19 @@ def valid(self):
         return True
 
     def fetch_fs_actions(self):
+        util.perf_log_start("PPO", "Fetch FS Actions")
         for node in self.get_executing_normal_and_spec_nodes():
             node.gather_fs_actions()
+        util.perf_log("PPO", "Fetch FS Actions")
 
     def _has_fs_deps(self, concrete_node_id: ConcreteNodeId):
+        util.perf_log_start("PPO", "FS Dep Checking", concrete_node_id)
         node_of_interest : ConcreteNode = self.get_concrete_node(concrete_node_id)
         for nid in self.to_be_resolved[concrete_node_id]:
             node: ConcreteNode = self.get_concrete_node(nid)
             if node.get_rw_set().has_conflict(node_of_interest.get_rw_set()):
                 return True
+        util.perf_log("PPO", "FS Dep Checking", concrete_node_id)
         return False
 
     # TODO: It's currently designed this way to avoid reading trace file all the time
@@ -188,6 +192,7 @@ def schedule_spec_work(self, concrete_node_id: ConcreteNodeId, env_file: str):
 
     def handle_complete(self, concrete_node_id: ConcreteNodeId, has_pending_wait: bool, current_env: str):
         event_log(f"handle_complete {concrete_node_id}")
+        util.perf_log("Node", "EXE", concrete_node_id)
         node = self.get_concrete_node(concrete_node_id)
         # TODO: complete the state matching
         if node.is_executing():
@@ -217,6 +222,9 @@ def reset_succeeding_nodes(self, node_id: NodeId, env_file: str):
     # # uncommitted_node.start_spec_executing(env_file)
 
     def adding_new_basic_block(self, concrete_node_id: ConcreteNodeId):
+
+        util.perf_log_start("PPO", "Adding New Basic Block", concrete_node_id)
+
         basic_block = self.hsprog.find_basic_block(concrete_node_id.node_id)
         if len(self.concrete_nodes) != 0:
             prev_concrete_node_id = next(reversed(self.concrete_nodes))
@@ -237,6 +245,9 @@ def adding_new_basic_block(self, concrete_node_id: ConcreteNodeId):
             self.prev_concrete_node[new_concrete_node_id] = []
             prev_concrete_node_id = new_concrete_node_id
         assert concrete_node_id in self.concrete_nodes
+
+        util.perf_log("PPO", "Adding New Basic Block", concrete_node_id)
+
 
     def finish_wait_unsafe(self, concrete_node_id: ConcreteNodeId):
         node = self.concrete_nodes[concrete_node_id]
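Note that the `util.perf_log("Node", "EXE", ...)` call in `handle_complete` closes the timer that `node.py` opened; this cross-module pairing works because the key is derived purely from the arguments, never from the call site. For instance (the node id value is illustrative):

```python
from util import get_timestamp_key

# Opening half (node.py, start_executing) and closing half (handle_complete)
# both resolve to the same NAMED_TIMESTAMPS key:
assert get_timestamp_key("Node", "EXE", "3.1") == "Node|EXE|3.1"
# Node-less PPO timers simply omit the third component:
assert get_timestamp_key("PPO", "Fetch FS Actions") == "PPO|Fetch FS Actions"
```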
diff --git a/parallel-orch/scheduler_server.py b/parallel-orch/scheduler_server.py
index 74a8583d..a5e0fbb6 100644
--- a/parallel-orch/scheduler_server.py
+++ b/parallel-orch/scheduler_server.py
@@ -76,11 +76,15 @@ def __init__(self, socket_file):
         self.partial_program_order = None
 
     def handle_init(self, input_cmd: str):
+        util.perf_log_start("Scheduler", "Init")
+
         assert(input_cmd.startswith("Init"))
         partial_order_file = input_cmd.split(":")[1].rstrip()
         logging.debug(f'Scheduler: Received partial_order_file: {partial_order_file}')
         self.partial_program_order = util.parse_partial_program_order_from_file(partial_order_file)
         util.debug_log(str(self.partial_program_order.hsprog))
+
+        util.perf_log("Scheduler", "Init")
 
     def handle_wait(self, input_cmd: str, connection):
         concrete_node_id, env_file = self.__parse_wait(input_cmd)
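With this bracketing, `handle_init` emits two records whose message payloads look roughly as follows (the timings are made up; note the doubled space after the tag, since `PERFORMANCE_LOG` itself ends in a space, which the analyzer's `.strip()` tolerates):

```python
PERFORMANCE_LOG = '[PERFORMANCE_LOG] '

# What perf_log_start and perf_log render for handle_init (values invented):
start_msg = "%s %s||Time from start:%s||%s" % (
    PERFORMANCE_LOG, "Scheduler|Init", "1.234ms", "")
stop_msg = "%s %s||Time from start:%s||Step time:%s||%s" % (
    PERFORMANCE_LOG, "Scheduler|Init", "151.234ms", "150.000ms", "")
print(start_msg)  # [PERFORMANCE_LOG]  Scheduler|Init||Time from start:1.234ms||
print(stop_msg)   # [PERFORMANCE_LOG]  Scheduler|Init||...||Step time:150.000ms||
```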
diff --git a/parallel-orch/util.py b/parallel-orch/util.py
index 3886a816..1da17cfd 100644
--- a/parallel-orch/util.py
+++ b/parallel-orch/util.py
@@ -2,7 +2,6 @@
 import logging
 import os
 import socket
-import subprocess
 import tempfile
 import time
 import re
@@ -13,6 +12,7 @@
 from partial_program_order import PartialProgramOrder
 
 DEBUG_LOG = '[DEBUG_LOG] '
+PERFORMANCE_LOG = '[PERFORMANCE_LOG] '
 
 def debug_log(s):
     logging.debug(DEBUG_LOG + s)
@@ -112,40 +112,40 @@ def compare_env_strings(file1_content, file2_content):
     dict2 = parse_env_string_to_dict(file2_content)
     return compare_dicts(dict1, dict2)
 
-def log_time_delta_from_start(module: str, action: str, node=None):
-    logging.info(f">|{module}|{action}{',' + str(node) if node is not None else ''}|Time From start:{to_milliseconds_str(time.time() - config.START_TIME)}")
+# Generate a key for the named timestamp.
+# The key combines all non-None arguments, so paired start/stop calls resolve to the same entry.
+def get_timestamp_key(module: str, action: str, node=None):
+    parts = [module, action] + ([str(node)] if node is not None else [])
+    return '|'.join(parts)
 
-def set_named_timestamp(action: str, node=None, key=None):
-    if key is None:
-        key = f"{action}{',' + str(node) if node is not None else ''}"
-    config.NAMED_TIMESTAMPS[key] = time.time()
+# Convert seconds to a formatted milliseconds string.
+def to_milliseconds_str(seconds: float) -> str:
+    return f"{seconds * 1000:.3f}ms"
 
-def invalidate_named_timestamp(action: str, node=None, key=None):
-    if key is None:
-        key = f"{action}{',' + str(node) if node is not None else ''}"
-    del config.NAMED_TIMESTAMPS[key]
+# Log the time delta from the start for a given module and action.
+def perf_log_start_only(module: str, action: str, node=None, optional_message=None):
+    key = get_timestamp_key(module, action, node)
+    logging.info("%s %s||Time from start:%s||%s", PERFORMANCE_LOG, key, to_milliseconds_str(time.time() - config.START_TIME), optional_message if optional_message is not None else "")
 
-def log_time_delta_from_start_and_set_named_timestamp(module: str, action: str, node=None, key=None):
-    try:
-        set_named_timestamp(action, node, key)
-        logging.info(f">|{module}|{action}{',' + str(node) if node is not None else ''}|Time from start:{to_milliseconds_str(time.time() - config.START_TIME)}")
-    except KeyError:
-        logging.error(f"Named timestamp {key} already exists")
+# Set a named timestamp.
+def set_named_timestamp(module: str, action: str, node=None):
+    key = get_timestamp_key(module, action, node)
+    config.NAMED_TIMESTAMPS[key] = time.time()
 
-def log_time_delta_from_named_timestamp(module: str, action: str, node=None, key=None, invalidate=True):
+# Log the time delta from the start and set a named timestamp.
+def perf_log_start(module: str, action: str, node=None, optional_message=None):
+    key = get_timestamp_key(module, action, node)
+    set_named_timestamp(module, action, node)
+    logging.info("%s %s||Time from start:%s||%s", PERFORMANCE_LOG, key, to_milliseconds_str(time.time() - config.START_TIME), optional_message if optional_message is not None else "")
+
+# Log the time delta from a named timestamp.
+def perf_log(module: str, action: str, node=None, optional_message=None):
+    key = get_timestamp_key(module, action, node)
     try:
-        if key is None:
-            key = f"{action}{',' + str(node) if node is not None else ''}"
-        logging.info(f">|{module}|{action}{',' + str(node) if node is not None else ''}|Time from start:{to_milliseconds_str(time.time() - config.START_TIME)}|Step time:{to_milliseconds_str(time.time() - config.NAMED_TIMESTAMPS[key])}")
-        if invalidate:
-            invalidate_named_timestamp(action, node, key)
+        step_time = time.time() - config.NAMED_TIMESTAMPS[key]
+        logging.info("%s %s||Time from start:%s||Step time:%s||%s", PERFORMANCE_LOG, key, to_milliseconds_str(time.time() - config.START_TIME), to_milliseconds_str(step_time), optional_message if optional_message is not None else "")
     except KeyError:
-        logging.error(f"Named timestamp {key} does not exist")
-
-def to_milliseconds_str(seconds: float) -> str:
-    return f"{seconds * 1000:.3f}ms"
-
-
+        logging.error("Named timestamp %s does not exist", key)
 
 def get_all_child_processes(pid):
     try:
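One design note: unlike the removed `log_time_delta_from_named_timestamp`, `perf_log` no longer invalidates the named timestamp, so a reused key is simply overwritten by the next `perf_log_start`. These helpers also presuppose a logging format that the analyzer below can split apart; it is not part of this diff, but the parsing implies something like this sketch:

```python
import logging

# The analyzer recovers the record time via line.split("]")[0].split("|")[1]
# and parses it with "%Y-%m-%d %H:%M:%S,%f" (the default asctime format), so
# the orchestrator presumably configures logging roughly like this:
logging.basicConfig(level=logging.INFO,
                    format="%(levelname)s|%(asctime)s|%(message)s")
```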
diff --git a/report/result_analyzer.py b/report/result_analyzer.py
index e0a7d1df..332e6698 100644
--- a/report/result_analyzer.py
+++ b/report/result_analyzer.py
@@ -1,14 +1,17 @@
-from datetime import datetime
 import matplotlib.pyplot as plt
 import matplotlib.dates as mdates
 import hashlib
 import numpy as np
+import sys
+from datetime import datetime
+from pprint import pprint
 
 class ResultAnalyzer:
     @staticmethod
     def process_results(orch_log):
         log_lines = orch_log.split("\n")
         prog_blocks = []
+        performance_data = {}
 
         current_block = []
         block_start_time = None
@@ -18,20 +21,92 @@ def process_results(orch_log):
                 time_str = parts[1]
                 log_content = parts[2].strip()
                 if log_content == "[PROG_LOG]":
-                    # Start of a new block
                     if current_block:
                         prog_blocks.append((block_start_time, current_block))
                         current_block = []
                     block_start_time = datetime.strptime(time_str, "%Y-%m-%d %H:%M:%S,%f")
                 else:
-                    # Continuing the current block
                     state, node_id, command = log_content.replace("[PROG_LOG] ", "").split(",", 2)
                     current_block.append((node_id.strip(), state.strip()))
+            elif line.startswith("INFO|") and "[PERFORMANCE_LOG]" in line:
+                parts = line.split("]")[1].strip().split("||")
+                unprocessed_time = line.split("]")[0].split("|")[1]
+                log_time = datetime.strptime(unprocessed_time, "%Y-%m-%d %H:%M:%S,%f")
+                key = parts[0].strip()
+                optional_parts = parts[1:]
+                time_from_start = ""
+                step_time = ""
+                message = ""
+
+                for part in optional_parts:
+                    if part.startswith("Time from start:"):
+                        time_from_start = part.replace("Time from start:", "").strip()
+                    elif part.startswith("Step time:"):
+                        step_time = part.replace("Step time:", "").strip()
+                    else:
+                        message = part.strip()
+
+                if key not in performance_data:
+                    performance_data[key] = []
+                performance_data[key].append({
+                    "log_time": log_time,
+                    "time_from_start": time_from_start,
+                    "step_time": step_time,
+                    "message": message
+                })
+
+        # Process to create summary
+        performance_summary = {}
+        for key, entries in performance_data.items():
+            entries.sort(key=lambda x: x['log_time'])  # Ensure chronological order
+            last_initial_entry = None
+            if key not in performance_summary:
+                performance_summary[key] = []
+
+            for entry in entries:
+                if entry['step_time']:  # This entry is the closing half of a pair
+                    if last_initial_entry:
+                        start_time = last_initial_entry['log_time']
+                        end_time = entry['log_time']
+                        step_time = entry['step_time']
+                        performance_summary[key].append({
+                            "start_time": start_time,
+                            "end_time": end_time,
+                            "step_time": float(step_time.strip("ms")),
+                            "time_from_start": float(entry['time_from_start'].strip("ms")),
+                            "message": entry['message']
+                        })
+                        last_initial_entry = None  # Reset for the next pair
+                else:
+                    # A potential opening half of a pair; hold it until its closing half arrives
+                    if last_initial_entry:
+                        # Keep only the newest of consecutive opening entries; flush the superseded one unpaired
+                        performance_summary[key].append({
+                            "start_time": last_initial_entry['log_time'],
+                            "end_time": None,
+                            "step_time": "",
+                            "time_from_start": last_initial_entry['time_from_start'],
+                            "message": last_initial_entry['message']
+                        })
+                    last_initial_entry = entry
+
+            # Handle the last unpaired opening entry, if any
+            if last_initial_entry:
+                performance_summary[key].append({
+                    "start_time": last_initial_entry['log_time'],
+                    "end_time": None,
+                    "step_time": "",
+                    "time_from_start": last_initial_entry['time_from_start'],
+                    "message": last_initial_entry['message']
+                })
+
         # Append the last block if not empty
         if current_block:
             prog_blocks.append((block_start_time, current_block))
 
-        return prog_blocks
+        return prog_blocks, performance_summary
+
+
     @staticmethod
     def compare_results(bash_output, orch_output, max_lines=1000):
@@ -51,4 +126,16 @@ def compare_results(bash_output, orch_output, max_lines=1000):
             if hash_value not in bash_hashes:
                 diffs.append(f'+ {line}')
 
-        return diffs
\ No newline at end of file
+        return diffs
+
+
+def __main__():
+    # Example usage
+    with open(sys.argv[1], "r") as file:
+        orch_log = file.read()
+    _, performance_summary = ResultAnalyzer.process_results(orch_log)
+    pprint(performance_summary)
+    pprint([len(x) for x in performance_summary.values()])
+
+if __name__ == "__main__":
+    __main__()
\ No newline at end of file
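For a quick check of the pairing logic without a full orchestrator run, the analyzer can be fed a synthetic log. A sketch (the timestamps are invented, the import path assumes running from the report/ directory, and the module's matplotlib/numpy imports must be installed):

```python
from result_analyzer import ResultAnalyzer

# Two synthetic PERFORMANCE_LOG records forming one start/stop pair:
fake_log = "\n".join([
    "INFO|2024-01-01 12:00:00,000|[PERFORMANCE_LOG] Scheduler|Init||Time from start:1.234ms||",
    "INFO|2024-01-01 12:00:00,150|[PERFORMANCE_LOG] Scheduler|Init||Time from start:151.234ms||Step time:150.000ms||",
])
_, summary = ResultAnalyzer.process_results(fake_log)
interval = summary["Scheduler|Init"][0]
assert interval["step_time"] == 150.0    # parsed from "Step time:150.000ms"
assert interval["end_time"] is not None  # the pair was matched
```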