diff --git a/tools/visual-pipeline-and-platform-evaluation-tool/Dockerfile.vippet b/tools/visual-pipeline-and-platform-evaluation-tool/Dockerfile.vippet
index bb94c047a..1f5318960 100644
--- a/tools/visual-pipeline-and-platform-evaluation-tool/Dockerfile.vippet
+++ b/tools/visual-pipeline-and-platform-evaluation-tool/Dockerfile.vippet
@@ -18,7 +18,7 @@ FROM intel/dlstreamer:2025.1.2-ubuntu24
 
 USER root
 
-RUN apt-get update && apt-get install --yes --no-install-recommends gstreamer1.0-plugins-ugly
+RUN apt-get update && apt-get install --yes --no-install-recommends python3.12-dev gstreamer1.0-plugins-ugly
 
 USER dlstreamer
 
@@ -27,7 +27,7 @@ ENV GST_PLUGIN_PATH=$GST_PLUGIN_PATH/usr/lib/x86_64-linux-gnu/gstreamer-1.0/
 WORKDIR /home/dlstreamer/vippet
 
 ENV VIRTUAL_ENV=/home/dlstreamer/vippet/.venv
-RUN python3 -m venv $VIRTUAL_ENV
+RUN python3 -m venv $VIRTUAL_ENV --system-site-packages
 ENV PATH="$VIRTUAL_ENV/bin:$PATH"
 
 COPY requirements.txt /home/dlstreamer/vippet/requirements.txt
@@ -51,4 +51,4 @@ COPY --chown=dlstreamer:dlstreamer \
 
 COPY --chown=dlstreamer:dlstreamer pipelines/ /home/dlstreamer/vippet/pipelines
 
-CMD ["/bin/bash", "-c", "./cleanup_models.sh; python app.py"]
+CMD ["/bin/bash", "-c", "./cleanup_models.sh; python app.py > fps-metrics.txt"]
diff --git a/tools/visual-pipeline-and-platform-evaluation-tool/Makefile b/tools/visual-pipeline-and-platform-evaluation-tool/Makefile
index 46f6bedba..017a31641 100644
--- a/tools/visual-pipeline-and-platform-evaluation-tool/Makefile
+++ b/tools/visual-pipeline-and-platform-evaluation-tool/Makefile
@@ -30,7 +30,7 @@ PUBLIC_MODELS_ENV := .public_models_env
 export RENDER_GROUP_ID := 1000
 
 $(VENV_DIR): requirements.txt ## Create Python venv
-	python3 -m venv $@ ;\
+	python3 -m venv $@ --system-site-packages ;\
	set +u; . ./$@/bin/activate; set -u ;\
	python -m pip install --upgrade pip ;\
	python -m pip install -r requirements-dev.txt
diff --git a/tools/visual-pipeline-and-platform-evaluation-tool/pipelines/simplevs/pipeline.py b/tools/visual-pipeline-and-platform-evaluation-tool/pipelines/simplevs/pipeline.py
index c94651afc..774ddc6ac 100644
--- a/tools/visual-pipeline-and-platform-evaluation-tool/pipelines/simplevs/pipeline.py
+++ b/tools/visual-pipeline-and-platform-evaluation-tool/pipelines/simplevs/pipeline.py
@@ -236,4 +236,4 @@ def evaluate(
         else:
             streams += "fakesink "
 
-        return "gst-launch-1.0 -q " + streams
+        return streams
diff --git a/tools/visual-pipeline-and-platform-evaluation-tool/pipelines/smartnvr/pipeline.py b/tools/visual-pipeline-and-platform-evaluation-tool/pipelines/smartnvr/pipeline.py
index bc613956f..a1df32d20 100644
--- a/tools/visual-pipeline-and-platform-evaluation-tool/pipelines/smartnvr/pipeline.py
+++ b/tools/visual-pipeline-and-platform-evaluation-tool/pipelines/smartnvr/pipeline.py
@@ -394,4 +394,4 @@ def evaluate(
         )
 
         # Evaluate the pipeline
-        return "gst-launch-1.0 -q " + streams
+        return streams
diff --git a/tools/visual-pipeline-and-platform-evaluation-tool/utils.py b/tools/visual-pipeline-and-platform-evaluation-tool/utils.py
index a5cac4f30..69c30f9e4 100644
--- a/tools/visual-pipeline-and-platform-evaluation-tool/utils.py
+++ b/tools/visual-pipeline-and-platform-evaluation-tool/utils.py
@@ -20,6 +20,11 @@
 
 from gstpipeline import GstPipeline
 
+import gi
+gi.require_version("Gst", "1.0")
+from gi.repository import Gst, GLib
+Gst.init(None)  # initialize GStreamer once, before any pipeline is parsed
+
 cancelled = False
 
 logger = logging.getLogger("utils")
@@ -333,6 +338,33 @@ def get_video_resolution(video_path):
         )
     return default_width, default_height
 
+def glib_mainloop(mainloop):  # runs until bus_call quits the loop, or Ctrl+C
+    try:
+        mainloop.run()
+    except KeyboardInterrupt:
+        pass
+
+def bus_call(bus, message, pipeline, mainloop):  # GStreamer bus message handler
+    t = message.type
+    if t == Gst.MessageType.EOS:
+        logger.info("pipeline ended")
+        pipeline.set_state(Gst.State.NULL)
+        mainloop.quit()
+        # sys.exit()
+    elif t == Gst.MessageType.ERROR:
+        err, debug = message.parse_error()
+        logger.error("Error:\n{}\nAdditional debug info:\n{}\n".format(err, debug))
+        pipeline.set_state(Gst.State.NULL)
+        mainloop.quit()
+        # sys.exit()
+    else:
+        pass
+    return True  # keep the bus watch installed
+
+def set_callbacks(pipeline, mainloop):
+    bus = pipeline.get_bus()
+    bus.add_signal_watch()
+    bus.connect("message", bus_call, pipeline, mainloop)
 
 def run_pipeline_and_extract_metrics(
     pipeline_cmd: GstPipeline,
@@ -379,251 +411,260 @@ def run_pipeline_and_extract_metrics(
         logger.info(f"Pipeline Command: {_pipeline}")
 
         try:
-            # Set the environment variable to enable all drivers
-            env = os.environ.copy()
-            env["GST_VA_ALL_DRIVERS"] = "1"
-
-            # Spawn command in a subprocess
-            process = Popen(_pipeline.split(" "), stdout=PIPE, stderr=PIPE, env=env)
-
-            exit_code = None
-            total_fps = None
-            per_stream_fps = None
-            num_streams = None
-            last_fps = None
-            channels = inference_channels + regular_channels
-            avg_fps_dict = {}
-            process_output = []
-            process_stderr = []
-
-            # Define pattern to capture FPSCounter metrics
-            overall_pattern = r"FpsCounter\(overall ([\d.]+)sec\): total=([\d.]+) fps, number-streams=(\d+), per-stream=([\d.]+) fps"
-            avg_pattern = r"FpsCounter\(average ([\d.]+)sec\): total=([\d.]+) fps, number-streams=(\d+), per-stream=([\d.]+) fps"
-            last_pattern = r"FpsCounter\(last ([\d.]+)sec\): total=([\d.]+) fps, number-streams=(\d+), per-stream=([\d.]+) fps"
-
-            stop_event = threading.Event()
-            shm_fd = None
-
-            def process_worker():
-                try:
-                    logger.debug("Starting process worker thread")
-                    while not stop_event.is_set():
-                        reads, _, _ = select.select(
-                            [process.stdout, process.stderr], [], [], poll_interval
-                        )
-                        for r in reads:
-                            line = r.readline()
-                            if not line:
-                                continue
-                            if r == process.stdout:
-                                process_output.append(line)
-
-                                # Write the average FPS to the log
-                                line_str = line.decode("utf-8")
-                                match = re.search(avg_pattern, line_str)
-                                if match:
-                                    result = {
-                                        "total_fps": float(match.group(2)),
-                                        "number_streams": int(match.group(3)),
-                                        "per_stream_fps": float(match.group(4)),
-                                    }
-                                    logger.info(
-                                        f"Avg FPS: {result['total_fps']} fps; Num Streams: {result['number_streams']}; Per Stream FPS: {result['per_stream_fps']} fps."
-                                    )
-
-                                    # Skip the result if the number of streams does not match the expected channels
-                                    if result["number_streams"] != channels:
-                                        continue
-
-                                    latest_fps = result["per_stream_fps"]
-
-                                    # Write latest FPS to a file
-                                    with open(
-                                        "/home/dlstreamer/vippet/.collector-signals/fps.txt",
-                                        "w",
-                                    ) as f:
-                                        f.write(f"{latest_fps}\n")
-                            elif r == process.stderr:
-                                process_stderr.append(line)
-                except (OSError, ValueError, select.error) as e:
-                    logger.error(f"process_worker exception: {e}")
-                finally:
-                    logger.debug("process_worker thread is stopping")
-
-            # process_worker runs in a separate thread to avoid blocking the main thread in which we want to read frames as fast as possible
-            worker_thread = threading.Thread(target=process_worker, daemon=True)
-            worker_thread.start()
-
-            try:
-                if live_preview_enabled:
-                    # Wait for the metadata file to be created, 10 seconds max
-                    wait_time = 0
-                    max_wait = 10
-                    while (
-                        not os.path.exists(VIDEO_STREAM_META_PATH)
-                        and wait_time < max_wait
-                    ):
-                        time.sleep(0.1)
-                        wait_time += 0.1
-
-                    # Wait for the shared memory file to be created, 10 seconds max
-                    shm_file = None
-                    wait_time = 0
-                    while shm_file is None and wait_time < max_wait:
-                        shm_file = find_shm_file()
-                        if shm_file is None:
-                            time.sleep(0.1)
-                            wait_time += 0.1
-
-                    if shm_file is None:
-                        logger.error(
-                            f"Could not find shm_file for live preview after {max_wait} seconds, will not show live preview."
-                        )
-                    else:
-                        shm_fd = open(shm_file, "rb")
-
-                    while True:
-                        # Handle interruption
-                        if cancelled:
-                            process.terminate()
-                            cancelled = False
-                            logger.info("Process cancelled, terminating")
-                            break
-
-                        # If process is zombie, break and close shm_fd
-                        try:
-                            if ps.Process(process.pid).status() == "zombie":
-                                exit_code = process.wait()
-                                break
-                        except ps.NoSuchProcess as e:
-                            logger.info(
-                                f"Process {process.pid} is no longer running: {e}"
-                            )
-                            break
-
-                        frame = read_shared_memory_frame(
-                            meta_path=VIDEO_STREAM_META_PATH, shm_fd=shm_fd
-                        )
-                        yield frame
-                        time.sleep(1.0 / DEFAULT_FRAME_RATE)
-
-                    # Wait for GStreamer process to end if not already
-                    try:
-                        if process.poll() is None:
-                            exit_code = process.wait(timeout=10)
-                    except subprocess.TimeoutExpired:
-                        logger.warning(
-                            "Process did not exit cleanly after closing socket."
-                        )
-
-                else:
-                    # No live preview: just wait for process
-                    while process.poll() is None:
-                        if cancelled:
-                            process.terminate()
-                            cancelled = False
-                            logger.info("Process cancelled, terminating")
-                            break
-                        time.sleep(0.1)
-                        try:
-                            if ps.Process(process.pid).status() == "zombie":
-                                exit_code = process.wait()
-                                break
-                        except ps.NoSuchProcess as e:
-                            logger.info(
-                                f"Process {process.pid} is no longer running: {e}"
-                            )
-                            break
-
-            finally:
-                logger.debug("Stopping frame worker thread")
-                stop_event.set()
-                worker_thread.join()
-                if shm_fd is not None:
-                    with contextlib.suppress(Exception):
-                        shm_fd.close()
-
-            # Process the output and extract FPS metrics
-            for line in process_output:
-                line_str = line.decode("utf-8")
-                match = re.search(overall_pattern, line_str)
-                if match:
-                    result = {
-                        "total_fps": float(match.group(2)),
-                        "number_streams": int(match.group(3)),
-                        "per_stream_fps": float(match.group(4)),
-                    }
-                    if result["number_streams"] == channels:
-                        total_fps = result["total_fps"]
-                        num_streams = result["number_streams"]
-                        per_stream_fps = result["per_stream_fps"]
-                        break
-
-                match = re.search(avg_pattern, line_str)
-                if match:
-                    result = {
-                        "total_fps": float(match.group(2)),
-                        "number_streams": int(match.group(3)),
-                        "per_stream_fps": float(match.group(4)),
-                    }
-                    avg_fps_dict[result["number_streams"]] = result
-
-                match = re.search(last_pattern, line_str)
-                if match:
-                    result = {
-                        "total_fps": float(match.group(2)),
-                        "number_streams": int(match.group(3)),
-                        "per_stream_fps": float(match.group(4)),
-                    }
-                    last_fps = result
-
-            if total_fps is None and avg_fps_dict.keys():
-                if channels in avg_fps_dict.keys():
-                    total_fps = avg_fps_dict[channels]["total_fps"]
-                    num_streams = avg_fps_dict[channels]["number_streams"]
-                    per_stream_fps = avg_fps_dict[channels]["per_stream_fps"]
-                else:
-                    closest_match = min(
-                        avg_fps_dict.keys(),
-                        key=lambda x: abs(x - channels),
-                        default=None,
-                    )
-                    total_fps = avg_fps_dict[closest_match]["total_fps"]
-                    num_streams = avg_fps_dict[closest_match]["number_streams"]
-                    per_stream_fps = avg_fps_dict[closest_match]["per_stream_fps"]
-
-            if total_fps is None and last_fps:
-                total_fps = last_fps["total_fps"]
-                num_streams = last_fps["number_streams"]
-                per_stream_fps = last_fps["per_stream_fps"]
-
-            if total_fps is None:
-                total_fps = "N/A"
-                num_streams = "N/A"
-                per_stream_fps = "N/A"
-
-            # Save results
-            results.append(
-                {
-                    "params": params,
-                    "exit_code": exit_code,
-                    "total_fps": total_fps,
-                    "per_stream_fps": per_stream_fps,
-                    "num_streams": num_streams,
-                    "stdout": "".join(
-                        [
-                            line.decode("utf-8", errors="replace")
-                            for line in process_output
-                        ]
-                    ),
-                    "stderr": "".join(
-                        [
-                            line.decode("utf-8", errors="replace")
-                            for line in process_stderr
-                        ]
-                    ),
-                }
-            )
+            mainloop = GLib.MainLoop()
+            pipeline = Gst.parse_launch(_pipeline)
+
+            set_callbacks(pipeline, mainloop)
+
+            pipeline.set_state(Gst.State.PLAYING)
+            glib_mainloop(mainloop)
+
+            # -----------------------------------------------------------------------
+            # Fallback if no better way to read the FPS metrics is found:
+            # parse them from the fps-metrics.txt file (see the Dockerfile CMD)
+            # -----------------------------------------------------------------------
+
+            # # Spawn command in a subprocess
+            # process = Popen(_pipeline.split(" "), stdout=PIPE, stderr=PIPE, env=env)
+            #
+            # exit_code = None
+            # total_fps = None
+            # per_stream_fps = None
+            # num_streams = None
+            # last_fps = None
+            # channels = inference_channels + regular_channels
+            # avg_fps_dict = {}
+            # process_output = []
+            # process_stderr = []
+            #
+            # # Define pattern to capture FPSCounter metrics
+            # overall_pattern = r"FpsCounter\(overall ([\d.]+)sec\): total=([\d.]+) fps, number-streams=(\d+), per-stream=([\d.]+) fps"
+            # avg_pattern = r"FpsCounter\(average ([\d.]+)sec\): total=([\d.]+) fps, number-streams=(\d+), per-stream=([\d.]+) fps"
+            # last_pattern = r"FpsCounter\(last ([\d.]+)sec\): total=([\d.]+) fps, number-streams=(\d+), per-stream=([\d.]+) fps"
+            #
+            # stop_event = threading.Event()
+            # shm_fd = None
+            #
+            # def process_worker():
+            #     try:
+            #         logger.debug("Starting process worker thread")
+            #         while not stop_event.is_set():
+            #             reads, _, _ = select.select(
+            #                 [process.stdout, process.stderr], [], [], poll_interval
+            #             )
+            #             for r in reads:
+            #                 line = r.readline()
+            #                 if not line:
+            #                     continue
+            #                 if r == process.stdout:
+            #                     process_output.append(line)
+            #
+            #                     # Write the average FPS to the log
+            #                     line_str = line.decode("utf-8")
+            #                     match = re.search(avg_pattern, line_str)
+            #                     if match:
+            #                         result = {
+            #                             "total_fps": float(match.group(2)),
+            #                             "number_streams": int(match.group(3)),
+            #                             "per_stream_fps": float(match.group(4)),
+            #                         }
+            #                         logger.info(
+            #                             f"Avg FPS: {result['total_fps']} fps; Num Streams: {result['number_streams']}; Per Stream FPS: {result['per_stream_fps']} fps."
+            #                         )
+            #
+            #                         # Skip the result if the number of streams does not match the expected channels
+            #                         if result["number_streams"] != channels:
+            #                             continue
+            #
+            #                         latest_fps = result["per_stream_fps"]
+            #
+            #                         # Write latest FPS to a file
+            #                         with open(
+            #                             "/home/dlstreamer/vippet/.collector-signals/fps.txt",
+            #                             "w",
+            #                         ) as f:
+            #                             f.write(f"{latest_fps}\n")
+            #                 elif r == process.stderr:
+            #                     process_stderr.append(line)
+            #     except (OSError, ValueError, select.error) as e:
+            #         logger.error(f"process_worker exception: {e}")
+            #     finally:
+            #         logger.debug("process_worker thread is stopping")
+            #
+            # # process_worker runs in a separate thread to avoid blocking the main thread in which we want to read frames as fast as possible
+            # worker_thread = threading.Thread(target=process_worker, daemon=True)
+            # worker_thread.start()
+            #
+            # try:
+            #     if live_preview_enabled:
+            #         # Wait for the metadata file to be created, 10 seconds max
+            #         wait_time = 0
+            #         max_wait = 10
+            #         while (
+            #             not os.path.exists(VIDEO_STREAM_META_PATH)
+            #             and wait_time < max_wait
+            #         ):
+            #             time.sleep(0.1)
+            #             wait_time += 0.1
+            #
+            #         # Wait for the shared memory file to be created, 10 seconds max
+            #         shm_file = None
+            #         wait_time = 0
+            #         while shm_file is None and wait_time < max_wait:
+            #             shm_file = find_shm_file()
+            #             if shm_file is None:
+            #                 time.sleep(0.1)
+            #                 wait_time += 0.1
+            #
+            #         if shm_file is None:
+            #             logger.error(
+            #                 f"Could not find shm_file for live preview after {max_wait} seconds, will not show live preview."
+            #             )
+            #         else:
+            #             shm_fd = open(shm_file, "rb")
+            #
+            #         while True:
+            #             # Handle interruption
+            #             if cancelled:
+            #                 process.terminate()
+            #                 cancelled = False
+            #                 logger.info("Process cancelled, terminating")
+            #                 break
+            #
+            #             # If process is zombie, break and close shm_fd
+            #             try:
+            #                 if ps.Process(process.pid).status() == "zombie":
+            #                     exit_code = process.wait()
+            #                     break
+            #             except ps.NoSuchProcess as e:
+            #                 logger.info(
+            #                     f"Process {process.pid} is no longer running: {e}"
+            #                 )
+            #                 break
+            #
+            #             frame = read_shared_memory_frame(
+            #                 meta_path=VIDEO_STREAM_META_PATH, shm_fd=shm_fd
+            #             )
+            #             yield frame
+            #             time.sleep(1.0 / DEFAULT_FRAME_RATE)
+            #
+            #         # Wait for GStreamer process to end if not already
+            #         try:
+            #             if process.poll() is None:
+            #                 exit_code = process.wait(timeout=10)
+            #         except subprocess.TimeoutExpired:
+            #             logger.warning(
+            #                 "Process did not exit cleanly after closing socket."
+            #             )
+            #
+            #     else:
+            #         # No live preview: just wait for process
+            #         while process.poll() is None:
+            #             if cancelled:
+            #                 process.terminate()
+            #                 cancelled = False
+            #                 logger.info("Process cancelled, terminating")
+            #                 break
+            #             time.sleep(0.1)
+            #             try:
+            #                 if ps.Process(process.pid).status() == "zombie":
+            #                     exit_code = process.wait()
+            #                     break
+            #             except ps.NoSuchProcess as e:
+            #                 logger.info(
+            #                     f"Process {process.pid} is no longer running: {e}"
+            #                 )
+            #                 break
+            #
+            # finally:
+            #     logger.debug("Stopping frame worker thread")
+            #     stop_event.set()
+            #     worker_thread.join()
+            #     if shm_fd is not None:
+            #         with contextlib.suppress(Exception):
+            #             shm_fd.close()
+            #
+            # # Process the output and extract FPS metrics
+            # for line in process_output:
+            #     line_str = line.decode("utf-8")
+            #     match = re.search(overall_pattern, line_str)
+            #     if match:
+            #         result = {
+            #             "total_fps": float(match.group(2)),
+            #             "number_streams": int(match.group(3)),
+            #             "per_stream_fps": float(match.group(4)),
+            #         }
+            #         if result["number_streams"] == channels:
+            #             total_fps = result["total_fps"]
+            #             num_streams = result["number_streams"]
+            #             per_stream_fps = result["per_stream_fps"]
+            #             break
+            #
+            #     match = re.search(avg_pattern, line_str)
+            #     if match:
+            #         result = {
+            #             "total_fps": float(match.group(2)),
+            #             "number_streams": int(match.group(3)),
+            #             "per_stream_fps": float(match.group(4)),
+            #         }
+            #         avg_fps_dict[result["number_streams"]] = result
+            #
+            #     match = re.search(last_pattern, line_str)
+            #     if match:
+            #         result = {
+            #             "total_fps": float(match.group(2)),
+            #             "number_streams": int(match.group(3)),
+            #             "per_stream_fps": float(match.group(4)),
+            #         }
+            #         last_fps = result
+            #
+            # if total_fps is None and avg_fps_dict.keys():
+            #     if channels in avg_fps_dict.keys():
+            #         total_fps = avg_fps_dict[channels]["total_fps"]
+            #         num_streams = avg_fps_dict[channels]["number_streams"]
+            #         per_stream_fps = avg_fps_dict[channels]["per_stream_fps"]
+            #     else:
+            #         closest_match = min(
+            #             avg_fps_dict.keys(),
+            #             key=lambda x: abs(x - channels),
+            #             default=None,
+            #         )
+            #         total_fps = avg_fps_dict[closest_match]["total_fps"]
+            #         num_streams = avg_fps_dict[closest_match]["number_streams"]
+            #         per_stream_fps = avg_fps_dict[closest_match]["per_stream_fps"]
+            #
+            # if total_fps is None and last_fps:
+            #     total_fps = last_fps["total_fps"]
+            #     num_streams = last_fps["number_streams"]
+            #     per_stream_fps = last_fps["per_stream_fps"]
+            #
+            # if total_fps is None:
+            #     total_fps = "N/A"
+            #     num_streams = "N/A"
+            #     per_stream_fps = "N/A"
+            #
+            # # Save results
+            # results.append(
+            #     {
+            #         "params": params,
+            #         "exit_code": exit_code,
+            #         "total_fps": total_fps,
+            #         "per_stream_fps": per_stream_fps,
+            #         "num_streams": num_streams,
+            #         "stdout": "".join(
+            #             [
+            #                 line.decode("utf-8", errors="replace")
+            #                 for line in process_output
+            #             ]
+            #         ),
+            #         "stderr": "".join(
+            #             [
+            #                 line.decode("utf-8", errors="replace")
+            #                 for line in process_stderr
+            #             ]
+            #         ),
+            #     }
+            # )
         except subprocess.CalledProcessError as e:
             logger.error(f"Error: {e}")