diff --git a/libraries/dl-streamer/samples/gstreamer/README.md b/libraries/dl-streamer/samples/gstreamer/README.md index d11784428..5a983fd0d 100644 --- a/libraries/dl-streamer/samples/gstreamer/README.md +++ b/libraries/dl-streamer/samples/gstreamer/README.md @@ -27,6 +27,7 @@ Samples separated into several categories 3. Python samples * [Hello DLStreamer Sample](./python/hello_dlstreamer/README.md) - constructs an object detection pipeline, add logic to analyze metadata and count objects and visualize results along with object count summary in a local window * [Draw Face Attributes Python Sample](./python/draw_face_attributes/README.md) - constructs pipeline and sets Python callback to access frame metadata and visualize inference results + * [Open Close Valve Sample](./python/open_close_valve/README.md) - constructs pipeline with two sinks. On of them has [GStreamer valve element](https://gstreamer.freedesktop.org/documentation/coreelements/valve.html?gi-language=python), which is managed based object detection result and opened/closed by callback. 4. Benchmark * [Benchmark Sample](./benchmark/README.md) - measures overall performance of single-channel or multi-channel video analytics pipelines diff --git a/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/README.md b/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/README.md new file mode 100644 index 000000000..ca213088f --- /dev/null +++ b/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/README.md @@ -0,0 +1,298 @@ +# Valve Open/Close Detection Sample for DLStreamer + + +### Overview +This sample demonstrates how to build an application that constructs and executes a DLStreamer pipeline with two outputs, where one output is controlled by GStreamer's __valve__ element using a callback method. The application creates two parallel video streams with __intelligent valve control__ based on real-time truck detection using AI models. +### Key Features +- Dual Video Streams: Two synchronized output windows with valve-controlled flow +- AI Truck Detection: YOLO11s model for real-time object detection +- Smart Valve Control: Second stream activates only when trucks are detected +- Object Tracking: Maintains object identity across frames +- Vehicle Classification: Detailed vehicle attribute analysis +- __Callback-Based__ Control: Demonstrates probe callback implementation + + +```mermaid +flowchart LR + A[filesrc] --> B[decodebin3] + B --> C{tee} + C --> |Video stream| D[gvadetect] + C --> |Video stream| E{Valve element} + F ----> G[Output 1] + D -->|Metadata| F[gvaclassify] + F --> H(callback for gvaclassify pad) + H --> |Open/Close|E + E --> I[Output 2] + +``` +The individual pipeline stages implement the following functions: +* __filesrc__ element reads video stream from a local file +* __decodebin3__ element decodes video stream into individual frames +* __gvadetect__ element runs AI inference object detection for each frame +* __gvaclassify__ element runs AI inference object classification for each frame +* __gvawatermark__ element draws (overlays) object bounding boxes on top of analyzed frames +* __autovideosink__ element renders video stream on local display + +In addition, the sample uses 'queue' and 'videoconvert' elements to adapt interface between functional stages. + +## How It Works + +### Pipeline creations steps +#### Initiaization + +###### Initialize GStreamer framework + +``` +Gst.init(None) # Initialize GStreamer +``` +- Initialize GStreamer framework + +###### Controller creation + +``` +controller = DualStreamController(video_source) +``` + +- Create controller instance with video file path +- Initialize pipeline components (valve, classifier elements) +
+ +#### Pipeline construction + +###### Video Source & Preprocessing + +``` +filesrc location=video.mp4 ! decodebin3 ! +videoconvert ! videoscale ! videorate ! video/x-raw,width=640,height=480,framerate=30/1,format=I420 +``` +- Input: MP4 video file +- Processing: Decode, convert, scale to 640x480, set 30 FPS +- Output: I420 format for AI processing + + +###### Stream Splitting +``` +tee name=main_tee allow-not-linked=false +``` +- Creates identical copies of video stream +
+ +#### Preview Stream (Always Active) + +``` +queue name=preview_queue max-size-buffers=30 ! +identity name=sync_point1 sync=true ! +``` +- Buffer: 30 frames capacity +- Sync: Maintains timing consistency + +###### Text Overlay +``` +textoverlay name=ai_overlay text="Detection Video Stream" + valignment=bottom halignment=center + font-desc="Sans Bold 14" color=0xFF000000 +``` +- Add stream identification label + + +###### AI Detection Chain +``` +gvadetect model=yolo11s.xml threshold=0.6 inference-interval=10 ! +gvatrack name=object_tracker ! +gvaclassify model=vehicle-attributes-recognition.xml inference-interval=1 ! +``` + +- Detection: YOLO11s model, 60% confidence threshold, every 10th frame +- Tracking: Maintain object identity across frames +- Classification: Vehicle attributes, every frame for detected objects + +###### Visualization & Output + + +``` +gvawatermark name=preview_watermark ! +videoconvert ! autovideosink sync=true +``` +- Watermark: Renders detection results as overlays +- Output: First video window (always visible) +
+ +#### Valve Stream (Controlled) + + + +###### Valve Control Element + +``` +valve name=control_valve drop=false ! +``` +- Control video flow based on detection results +- States: + - ```drop=false``` Video flows through + - ```drop=true``` Video blocked + +You can learn more about the valve element and its properties in the [GStreamer documentation](https://gstreamer.freedesktop.org/documentation/coreelements/valve.html?gi-language=python). +
+ +#### Processing Chain +``` +textoverlay text="Valve Stream" ! +gvadetect model=yolo11s.xml inference-interval=1 ! +gvawatermark ! videoconvert ! autovideosink sync=true +``` +- Secondary stream with independent AI processing: + - Detection: Every frame for responsive control + - Output: Second video window (conditional visibility) +
+ +#### Callback Setup +###### Element Reference Retrieval + +``` +self.pre_view_classify = self.pipeline.get_by_name("pre_view_classify") +pre_view_classify_pad = self.pre_view_classify.get_static_pad("sink") +``` + +- Get reference to classification element's input pad + + + +###### Probe Attachment +``` +pre_view_classify_pad.add_probe( + Gst.PadProbeType.BUFFER, + self.object_detector_callback, + 0 +) +``` +- Attach callback function to monitor detection results: + - Trigger: Every buffer passing through classifier + - Function: **object_detector_callback** processes AI results +
+ +#### Detection Callback Logic +###### Metadata Extraction +``` +rmeta = GstAnalytics.buffer_get_analytics_relation_meta(buffer) +for mtd in rmeta: + if type(mtd) == GstAnalytics.ODMtd: + object_type = GLib.quark_to_string(mtd.get_obj_type()) +``` +- Extract AI detection results from video buffer: + - Analytics Meta: Contains detection information + - Object Type: Identified object class (truck, car, etc.) + + +###### Valve Control Decision +``` +if object_type == "truck": + self.open_valve() # Set drop=false +else: + self.close_valve() # Set drop=true +``` +- Control valve based on detection results: + - __Truck Detected__: Enable second stream + - __No Truck__: Disable second stream +
+ +#### Pipeline Execution +###### State Management +``` +controller.pipeline.set_state(Gst.State.PLAYING) +``` +- Start video processing: + - Transitions pipeline from NULL to PLAYING state + - Begins video flow and AI processing + +###### Message loop +``` +while not terminate: + msg = bus.timed_pop_filtered( + Gst.CLOCK_TIME_NONE, + Gst.MessageType.EOS | Gst.MessageType.ERROR + ) +``` +- Monitor pipeline status and handle events" + - EOS: End of stream (video finished) + - ERROR: Pipeline errors + - Interrupt: Ctrl+C handling +
+ +#### Runtime Behavior +###### Continuous Operation +```mermaid +sequenceDiagram + participant V as Video Stream + participant AI as AI Detection + participant C as Callback + participant Valve as Valve Element + participant S as Second Stream + + loop Every Frame + V->>AI: Video buffer + AI->>C: Detection results + C->>C: Check for truck + alt Truck found + C->>Valve: drop=false + Valve->>S: Allow video + else No truck + C->>Valve: drop=true + Valve->>S: Block video + end + end +``` + +- Output Streams: + - Stream 1: Always displays video with AI annotations + - Stream 2: Shows video only when trucks are detected + - Console: Real-time detection events and system status +
+ +## Prerequisite +This sample application requires video, models and proc-model files. Those files can be downloaded in the following way: +
+ +#### Video sample +Sample video file are available in the following repo: [edge-ai-resources](https://github.com/open-edge-platform/edge-ai-resources). But that particular one used in this sample can be downloaded by following command(s): +``` +mkdir videos +wget -P ./videos https://github.com/open-edge-platform/edge-ai-resources/raw/main/videos/cars_extended.mp4 +``` +
+ +#### Model file + +All models OpenVINO can be downloaded by [Open Model Zoo](../../../download_omz_models.sh) download script. But the one used in this sample can be downloaded by following commands: + +``` +mkdir models +wget -P ./models https://storage.openvinotoolkit.org/repositories/open_model_zoo/2023.0/models_bin/1/vehicle-attributes-recognition-barrier-0039/FP16/vehicle-attributes-recognition-barrier-0039.xml +wget -P ./models https://storage.openvinotoolkit.org/repositories/open_model_zoo/2023.0/models_bin/1/vehicle-attributes-recognition-barrier-0039/FP16/vehicle-attributes-recognition-barrier-0039.bin +``` + +#### post-proc file +If your current working directory is the open-close-valve-sample folder, the model-proc file should be available at the following location: +``` +ls ../../model_proc/intel/vehicle-attributes-recognition-barrier-0039.json +``` +Otherwise, the user must adjust the file path appropriately. + +## Sample execution + +#### Command line + +``` +: python3 open_close_valve_sample.py +``` +## Sample Output + +The sample: +* Opens two (windows) sinks +* One window continuously displays the video file. +* The second window becomes active only when a truck is detected in the first window. The valve element is operated (opened/closed) depending on whether a truck is detected. +* While both windows are active, they display the same synchronized content. + + + +## See also +* [Samples overview](../../README.md) \ No newline at end of file diff --git a/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/open_close_valve_sample.py b/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/open_close_valve_sample.py new file mode 100644 index 000000000..8d73f0590 --- /dev/null +++ b/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/open_close_valve_sample.py @@ -0,0 +1,294 @@ +#!/usr/bin/env python3 +# Copyright (C) 2025 Intel Corporation +# +# SPDX-License-Identifier: MIT +# ============================================================================== +""" +DL Streamer Open/Close Valve Sample. + +This module demonstrates dual GStreamer pipeline control with a valve element +to dynamically route video streams based on object detection results. +""" + +import sys +import time +#from contextlib import contextmanager +import gi +gi.require_version('Gst', '1.0') +gi.require_version('GstAnalytics', '1.0') +from gi.repository import Gst, GLib, GstAnalytics + + +class DualStreamController: + """Class to create and control dual GStreamer streams with a valve element.""" + def __init__(self, video_source): + """ + Initialize dual stream controller + """ + + Gst.init(None) + self.video_source = video_source + self.pipeline = None + self.valve = None + self.pre_view_classify = None + self.loop = GLib.MainLoop() + self.valve_opened = False + + + def create_pipeline(self): + """Create GStreamer pipeline with dual streams""" + + # Define source element string + source_str = f"filesrc location={self.video_source} ! decodebin3" + + # Build pipeline string: + pipeline_str = f""" + {source_str} ! + videoconvert ! videoscale ! videorate ! + video/x-raw,width=640,height=480,framerate=30/1,format=I420 ! + + tee name=main_tee allow-not-linked=false ! + + queue name=preview_queue + max-size-buffers=30 + max-size-bytes=0 + max-size-time=300000000 + leaky=no + flush-on-eos=true ! + identity name=sync_point1 sync=true drop-probability=0.0 ! + textoverlay name=ai_overlay + text="Detection Video Stream" + valignment=bottom halignment=center + font-desc="Sans Bold 14" color=0xFF000000 ! + gvadetect model=/home/labrat/models/public/yolo11s/FP32/yolo11s.xml + pre-process-backend=opencv + device=CPU + threshold=0.6 + inference-interval=10 + inference-region=0 + model-instance-id=inst0 + batch-size=1 + name=detection ! + gvatrack name=object_tracker ! + queue name=detect_to_classify + max-size-buffers=20 + leaky=no ! + gvaclassify model=./models/vehicle-attributes-recognition-barrier-0039.xml + model-proc=../../model_proc/intel/vehicle-attributes-recognition-barrier-0039.json + device=CPU + inference-interval=1 + name=pre_view_classify ! + gvawatermark name=preview_watermark ! + videoconvert name=preview_convert ! + autovideosink name=preview_sink + sync=true + + main_tee. ! + queue name=stream1_queue + max-size-buffers=3 + max-size-bytes=0 + max-size-time=300000000 + leaky=no + flush-on-eos=true ! + identity name=sync_point2 sync=true drop-probability=0.0 ! + valve name=control_valve drop=false ! + queue name=valve_buffer + max-size-buffers=1 + leaky=no ! + textoverlay name=valve_overlay + text="Valve Stream" + valignment=bottom halignment=center + font-desc="Sans Bold 14" color=0xFF10FF00 ! + gvadetect model=/home/labrat/models/public/yolo11s/FP16/yolo11s.xml + pre-process-backend=opencv + device=CPU + threshold=0.6 + inference-interval=1 + inference-region=0 + model-instance-id=inst0 + batch-size=1 + name=valve_detection ! + queue name=valve_detect_to_classify + max-size-buffers=20 + leaky=no ! + gvawatermark name=valve_watermark ! + videoconvert name=stream1_convert ! + autovideosink name=stream1_sink + sync=true + """ + + try: + self.pipeline = Gst.parse_launch(pipeline_str) + if not self.pipeline: + print("Error: Could not create pipeline") + return False + print("Pipeline created") + + # Get valve element from pipeline + self.valve = self.pipeline.get_by_name("control_valve") + if not self.valve: + print("Error: Could not find control_valve element") + return False + print("Found control_valve element") + + # Get pre_view_classify element from pipeline + # Below we add a probe to the sink pad of pre_view_classify to monitor detected objects + self.pre_view_classify = self.pipeline.get_by_name("pre_view_classify") + if not self.pre_view_classify: + print("Error: Could not find pre_view_classify element") + return False + print("Found pre_view_classify element") + + # Get sink pad of pre_view_classify + pre_view_classify_pad = self.pre_view_classify.get_static_pad("sink") + if not pre_view_classify_pad: + print("Unable to get sink pad of gvaclassify") + return False + print("Got sink pad of pre_view_classify") + + # and add probe/callback + pre_view_classify_pad.add_probe(Gst.PadProbeType.BUFFER, + self.object_detector_callback, 0) + print("All elements found and pipeline created successfully") + except GLib.Error as e: + print(f"Error creating pipeline: {e}") + return False + return True + + def object_detector_callback(self, pad, info, u_data): + """ + Callback function for object detection probe on GStreamer pad. + + Processes analytics metadata from the buffer to detect objects and control valve state. + When a 'truck' object is detected, the valve is opened; otherwise, it is closed. + + Args: + pad: GStreamer pad object that triggered the probe. + info: Probe info containing buffer and other data. + u_data: User data passed to the callback. + + Returns: + Gst.PadProbeReturn.OK: Indicates the probe should continue normal processing. + """ + + buffer = info.get_buffer() + if not buffer: + print("object_detector_callback: no buffer. Continueing...") + return Gst.PadProbeReturn.OK + + rmeta = GstAnalytics.buffer_get_analytics_relation_meta(buffer) + if not rmeta: + return Gst.PadProbeReturn.OK + else: + for mtd in rmeta: + if type(mtd) == GstAnalytics.ODMtd: + object_type = GLib.quark_to_string(mtd.get_obj_type()) + if object_type=="truck": + self.open_valve() + else: + self.close_valve() + return Gst.PadProbeReturn.OK + + + def start(self): + """ + Start the GStreamer pipeline and initialize monitoring threads. + + Returns: + bool: True if the pipeline started successfully, False otherwise. + """ + + # Start pipeline + if not self.pipeline: + print("Controller: No pipeline to start") + return False + + # Set pipeline to PLAYING state + if self.pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE: + print("Failed to start pipeline") + return False + + # Wait a moment for pipeline to initialize + time.sleep(1) + + print("Pipeline started successfully") + return True + + + def open_valve(self): + """Open valve - enable controlled stream (drop=false)""" + if self.valve: + self.valve.set_property("drop", False) + self.valve_opened = True + else: + print("Valve not available") + + def close_valve(self): + """Close valve - disable controlled stream (drop=true)""" + if self.valve: + self.valve.set_property("drop", True) + self.valve_opened = False + else: + print("Controller: Valve not available") + + +def display_header(): + """Display the header information for the Open/Close Valve sample.""" + print("\n# ====================================== #") + print("# Copyright (C) 2025 Intel Corporation #") + print("# #") + print("# SPDX-License-Identifier: MIT #") + print("# ====================================== #") + print("# DL Streamer Open/Close Valve Sample #") + print("# ====================================== #\n") + +def main(): + """Main function to run the Open/Close Valve sample.""" + # Display header + display_header() + + # Create dual stream controller + video_source = "./videos/cars_extended.mp4" + controller = DualStreamController(video_source) + + # Create pipeline + if not controller.create_pipeline(): + print("Failed to create pipeline. Exiting...") + return 1 + + # Get pipeline bus + bus = controller.pipeline.get_bus() + + # Start pipeline + if not controller.start(): + print("Failed to start pipeline. Exiting...") + return 1 + + # Run until interrupted + print("Running pipeline. Press Ctrl+C to stop...") + terminate = False + try: + while not terminate: + msg = bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, + Gst.MessageType.EOS | Gst.MessageType.ERROR) + if msg: + if msg.type == Gst.MessageType.ERROR: + err, debug_info = msg.parse_error() + print(f"Error received from element {msg.src.get_name()}") + print(f"Debug info: {debug_info}") + terminate = True + if msg.type == Gst.MessageType.EOS: + print("Pipeline complete.") + terminate = True + except KeyboardInterrupt as e: + print(f"Interrupted by user. Stopping pipeline...[{e}]") + terminate = True + except GLib.Error as e: + print(f"Exception occurred: {e}") + + # Stop pipeline + controller.pipeline.set_state(Gst.State.NULL) + return 0 + +if __name__ == "__main__": + sys.exit(main())