diff --git a/libraries/dl-streamer/samples/gstreamer/README.md b/libraries/dl-streamer/samples/gstreamer/README.md
index d11784428..5a983fd0d 100644
--- a/libraries/dl-streamer/samples/gstreamer/README.md
+++ b/libraries/dl-streamer/samples/gstreamer/README.md
@@ -27,6 +27,7 @@ Samples separated into several categories
3. Python samples
* [Hello DLStreamer Sample](./python/hello_dlstreamer/README.md) - constructs an object detection pipeline, add logic to analyze metadata and count objects and visualize results along with object count summary in a local window
* [Draw Face Attributes Python Sample](./python/draw_face_attributes/README.md) - constructs pipeline and sets Python callback to access frame metadata and visualize inference results
+ * [Open Close Valve Sample](./python/open_close_valve/README.md) - constructs a pipeline with two sinks, one of which contains a [GStreamer valve element](https://gstreamer.freedesktop.org/documentation/coreelements/valve.html?gi-language=python) that is opened or closed by a callback based on object detection results
4. Benchmark
* [Benchmark Sample](./benchmark/README.md) - measures overall performance of single-channel or multi-channel video analytics pipelines
diff --git a/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/README.md b/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/README.md
new file mode 100644
index 000000000..ca213088f
--- /dev/null
+++ b/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/README.md
@@ -0,0 +1,298 @@
+# Valve Open/Close Detection Sample for DLStreamer
+
+
+### Overview
+This sample demonstrates how to build an application that constructs and executes a DLStreamer pipeline with two outputs, where one output is controlled by GStreamer's __valve__ element through a callback. The application creates two parallel video streams, with the second stream gated by real-time truck detection using an AI model.
+
+### Key Features
+- Dual Video Streams: Two synchronized output windows with valve-controlled flow
+- AI Truck Detection: YOLO11s model for real-time object detection
+- Smart Valve Control: Second stream activates only when trucks are detected
+- Object Tracking: Maintains object identity across frames
+- Vehicle Classification: Detailed vehicle attribute analysis
+- __Callback-Based__ Control: Demonstrates probe callback implementation
+
+
+```mermaid
+flowchart LR
+ A[filesrc] --> B[decodebin3]
+ B --> C{tee}
+ C --> |Video stream| D[gvadetect]
+ C --> |Video stream| E{Valve element}
+ D --> |Metadata| F[gvaclassify]
+ F --> G[Output 1]
+ F --> H(callback for gvaclassify pad)
+ H --> |Open/Close| E
+ E --> I[Output 2]
+
+```
+The individual pipeline stages implement the following functions:
+* __filesrc__ element reads video stream from a local file
+* __decodebin3__ element decodes video stream into individual frames
+* __gvadetect__ element runs AI inference object detection for each frame
+* __gvaclassify__ element runs AI inference object classification for each frame
+* __gvawatermark__ element draws (overlays) object bounding boxes on top of analyzed frames
+* __autovideosink__ element renders video stream on local display
+
+In addition, the sample uses `queue` and `videoconvert` elements to adapt the interfaces between functional stages.
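
The stage list above can be pictured as assembling a gst-launch style description string. The helper below is illustrative only (element options are simplified and the file paths are assumptions, not the sample's exact pipeline):

```python
# Illustrative helper: join pipeline stages into a gst-launch style
# description string of the kind Gst.parse_launch() consumes.
def build_pipeline_description(stages):
    return " ! ".join(stages)

stages = [
    "filesrc location=./videos/cars_extended.mp4",  # assumed path
    "decodebin3",
    "gvadetect model=yolo11s.xml",
    "gvaclassify model=vehicle-attributes-recognition-barrier-0039.xml",
    "gvawatermark",
    "videoconvert",
    "autovideosink",
]
description = build_pipeline_description(stages)
print(description)
```

The real sample builds a considerably longer string (two branches joined by a `tee`), but the construction principle is the same.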
+
+## How It Works
+
+### Pipeline creation steps
+#### Initialization
+
+###### Initialize GStreamer framework
+
+```python
+Gst.init(None)  # Initialize the GStreamer framework
+```
+
+###### Controller creation
+
+```
+controller = DualStreamController(video_source)
+```
+
+- Create controller instance with video file path
+- Initialize pipeline components (valve, classifier elements)
+
+
+#### Pipeline construction
+
+###### Video Source & Preprocessing
+
+```
+filesrc location=video.mp4 ! decodebin3 !
+videoconvert ! videoscale ! videorate ! video/x-raw,width=640,height=480,framerate=30/1,format=I420
+```
+- Input: MP4 video file
+- Processing: Decode, convert, scale to 640x480, set 30 FPS
+- Output: I420 format for AI processing
+
+
+###### Stream Splitting
+```
+tee name=main_tee allow-not-linked=false
+```
+- Creates identical copies of video stream
+
+
+#### Preview Stream (Always Active)
+
+```
+queue name=preview_queue max-size-buffers=30 !
+identity name=sync_point1 sync=true !
+```
+- Buffer: 30 frames capacity
+- Sync: Maintains timing consistency
+
+###### Text Overlay
+```
+textoverlay name=ai_overlay text="Detection Video Stream"
+ valignment=bottom halignment=center
+ font-desc="Sans Bold 14" color=0xFF000000
+```
+- Add stream identification label
+
+
+###### AI Detection Chain
+```
+gvadetect model=yolo11s.xml threshold=0.6 inference-interval=10 !
+gvatrack name=object_tracker !
+gvaclassify model=vehicle-attributes-recognition.xml inference-interval=1 !
+```
+
+- Detection: YOLO11s model, 60% confidence threshold, every 10th frame
+- Tracking: Maintain object identity across frames
+- Classification: Vehicle attributes, every frame for detected objects
+
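
As a rough illustration of what `inference-interval=10` means, the helper below lists which frame indices would actually be inferred. This is an assumption about the element's frame-skipping behavior (inference on every Nth frame), not DLStreamer code; exact scheduling is up to `gvadetect`:

```python
# Sketch: with inference-interval=N, detection runs on frame 0, N, 2N, ...
def inferred_frames(total_frames, interval):
    return [i for i in range(total_frames) if i % interval == 0]

print(inferred_frames(35, 10))  # [0, 10, 20, 30]
```

This is why the preview branch (`inference-interval=10`) is cheaper to run than the valve branch (`inference-interval=1`), which infers every frame for responsive control.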
+###### Visualization & Output
+
+
+```
+gvawatermark name=preview_watermark !
+videoconvert ! autovideosink sync=true
+```
+- Watermark: Renders detection results as overlays
+- Output: First video window (always visible)
+
+
+#### Valve Stream (Controlled)
+
+
+
+###### Valve Control Element
+
+```
+valve name=control_valve drop=false !
+```
+- Control video flow based on detection results
+- States:
+ - ```drop=false``` Video flows through
+ - ```drop=true``` Video blocked
+
+You can learn more about the valve element and its properties in the [GStreamer documentation](https://gstreamer.freedesktop.org/documentation/coreelements/valve.html?gi-language=python).
+
+
+#### Processing Chain
+```
+textoverlay text="Valve Stream" !
+gvadetect model=yolo11s.xml inference-interval=1 !
+gvawatermark ! videoconvert ! autovideosink sync=true
+```
+- Secondary stream with independent AI processing:
+ - Detection: Every frame for responsive control
+ - Output: Second video window (conditional visibility)
+
+
+#### Callback Setup
+###### Element Reference Retrieval
+
+```
+self.pre_view_classify = self.pipeline.get_by_name("pre_view_classify")
+pre_view_classify_pad = self.pre_view_classify.get_static_pad("sink")
+```
+
+- Get reference to classification element's input pad
+
+
+
+###### Probe Attachment
+```
+pre_view_classify_pad.add_probe(
+    Gst.PadProbeType.BUFFER,
+    self.object_detector_callback,
+    0
+)
+```
+- Attach callback function to monitor detection results:
+ - Trigger: Every buffer passing through classifier
+ - Function: **object_detector_callback** processes AI results
+
+
+#### Detection Callback Logic
+###### Metadata Extraction
+```
+rmeta = GstAnalytics.buffer_get_analytics_relation_meta(buffer)
+for mtd in rmeta:
+    if isinstance(mtd, GstAnalytics.ODMtd):
+        object_type = GLib.quark_to_string(mtd.get_obj_type())
+```
+- Extract AI detection results from video buffer:
+ - Analytics Meta: Contains detection information
+ - Object Type: Identified object class (truck, car, etc.)
+
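
The extraction step can be pictured with plain dictionaries standing in for the analytics metadata (the real code iterates `GstAnalytics.ODMtd` entries; the shape below is a deliberate simplification):

```python
# Simplified stand-in for analytics metadata: each item is a dict with a
# "kind" and a "label" instead of a GstAnalytics.ODMtd object.
def detected_labels(metadata_items):
    return [m["label"] for m in metadata_items
            if m.get("kind") == "object-detection"]

meta = [
    {"kind": "object-detection", "label": "car"},
    {"kind": "object-detection", "label": "truck"},
    {"kind": "tracking", "id": 7},  # non-detection metadata is skipped
]
print(detected_labels(meta))  # ['car', 'truck']
```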
+
+###### Valve Control Decision
+```
+if object_type == "truck":
+ self.open_valve() # Set drop=false
+else:
+ self.close_valve() # Set drop=true
+```
+- Control valve based on detection results:
+ - __Truck Detected__: Enable second stream
+ - __No Truck__: Disable second stream
+
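
The decision itself reduces to a pure function over the detected labels (a sketch of the callback's logic, not the sample's exact code):

```python
def should_open_valve(labels):
    # Open the valve whenever at least one detected object is a truck.
    return "truck" in labels

print(should_open_valve(["car", "truck"]))  # True
print(should_open_valve(["car", "bus"]))    # False
```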
+
+#### Pipeline Execution
+###### State Management
+```
+controller.pipeline.set_state(Gst.State.PLAYING)
+```
+- Start video processing:
+ - Transitions pipeline from NULL to PLAYING state
+ - Begins video flow and AI processing
+
+###### Message loop
+```
+while not terminate:
+    msg = bus.timed_pop_filtered(
+        Gst.CLOCK_TIME_NONE,
+        Gst.MessageType.EOS | Gst.MessageType.ERROR
+    )
+```
+- Monitor pipeline status and handle events:
+ - EOS: End of stream (video finished)
+ - ERROR: Pipeline errors
+ - Interrupt: Ctrl+C handling
+
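
The loop's termination rule can be isolated as a small predicate. Message types are shown as strings for illustration; the sample uses `Gst.MessageType` flags:

```python
def should_terminate(message_type):
    # EOS and ERROR both end the loop; any other message keeps it running.
    return message_type in ("eos", "error")

for mt in ("tag", "eos", "error"):
    print(mt, should_terminate(mt))
```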
+
+#### Runtime Behavior
+###### Continuous Operation
+```mermaid
+sequenceDiagram
+ participant V as Video Stream
+ participant AI as AI Detection
+ participant C as Callback
+ participant Valve as Valve Element
+ participant S as Second Stream
+
+ loop Every Frame
+ V->>AI: Video buffer
+ AI->>C: Detection results
+ C->>C: Check for truck
+ alt Truck found
+ C->>Valve: drop=false
+ Valve->>S: Allow video
+ else No truck
+ C->>Valve: drop=true
+ Valve->>S: Block video
+ end
+ end
+```
+
+- Output Streams:
+ - Stream 1: Always displays video with AI annotations
+ - Stream 2: Shows video only when trucks are detected
+ - Console: Real-time detection events and system status
+
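
The sequence diagram above can be simulated frame by frame; each element of `frames` is the set of labels detected in that frame (a toy model of the pipeline, not the sample's code):

```python
def valve_states(frames):
    # For each frame, the valve is open exactly when a truck is detected.
    return ["open" if "truck" in labels else "closed" for labels in frames]

frames = [{"car"}, {"car", "truck"}, set(), {"truck"}]
print(valve_states(frames))  # ['closed', 'open', 'closed', 'open']
```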
+
+## Prerequisites
+This sample application requires a video file, model files, and a model-proc file. These can be downloaded as follows:
+
+
+#### Video sample
+Sample video files are available in the [edge-ai-resources](https://github.com/open-edge-platform/edge-ai-resources) repository. The one used in this sample can be downloaded with the following commands:
+```
+mkdir videos
+wget -P ./videos https://github.com/open-edge-platform/edge-ai-resources/raw/main/videos/cars_extended.mp4
+```
+
+
+#### Model files
+
+All OpenVINO models can be downloaded with the [Open Model Zoo](../../../download_omz_models.sh) download script. The classification model used in this sample can be downloaded with the following commands (the YOLO11s detection model referenced by the pipeline must be obtained separately):
+
+```
+mkdir models
+wget -P ./models https://storage.openvinotoolkit.org/repositories/open_model_zoo/2023.0/models_bin/1/vehicle-attributes-recognition-barrier-0039/FP16/vehicle-attributes-recognition-barrier-0039.xml
+wget -P ./models https://storage.openvinotoolkit.org/repositories/open_model_zoo/2023.0/models_bin/1/vehicle-attributes-recognition-barrier-0039/FP16/vehicle-attributes-recognition-barrier-0039.bin
+```
+
+#### Model-proc file
+If your current working directory is the `open_close_valve` sample folder, the model-proc file should be available at the following location:
+```
+ls ../../model_proc/intel/vehicle-attributes-recognition-barrier-0039.json
+```
+Otherwise, the user must adjust the file path appropriately.
+
+## Sample execution
+
+#### Command line
+
+```
+python3 open_close_valve_sample.py
+```
+## Sample Output
+
+The sample:
+* Opens two sink windows.
+* One window continuously displays the video file.
+* The second window becomes active only when a truck is detected in the first window. The valve element is operated (opened/closed) depending on whether a truck is detected.
+* While both windows are active, they display the same synchronized content.
+
+
+
+## See also
+* [Samples overview](../../README.md)
\ No newline at end of file
diff --git a/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/open_close_valve_sample.py b/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/open_close_valve_sample.py
new file mode 100644
index 000000000..8d73f0590
--- /dev/null
+++ b/libraries/dl-streamer/samples/gstreamer/python/open_close_valve/open_close_valve_sample.py
@@ -0,0 +1,294 @@
+#!/usr/bin/env python3
+# Copyright (C) 2025 Intel Corporation
+#
+# SPDX-License-Identifier: MIT
+# ==============================================================================
+"""
+DL Streamer Open/Close Valve Sample.
+
+This module demonstrates dual GStreamer pipeline control with a valve element
+to dynamically route video streams based on object detection results.
+"""
+
+import sys
+import time
+import gi
+gi.require_version('Gst', '1.0')
+gi.require_version('GstAnalytics', '1.0')
+from gi.repository import Gst, GLib, GstAnalytics
+
+
+class DualStreamController:
+ """Class to create and control dual GStreamer streams with a valve element."""
+ def __init__(self, video_source):
+ """
+ Initialize dual stream controller
+ """
+
+ Gst.init(None)
+ self.video_source = video_source
+ self.pipeline = None
+ self.valve = None
+ self.pre_view_classify = None
+ self.loop = GLib.MainLoop()
+ self.valve_opened = False
+
+
+ def create_pipeline(self):
+ """Create GStreamer pipeline with dual streams"""
+
+ # Define source element string
+ source_str = f"filesrc location={self.video_source} ! decodebin3"
+
+ # Build pipeline string:
+ pipeline_str = f"""
+ {source_str} !
+ videoconvert ! videoscale ! videorate !
+ video/x-raw,width=640,height=480,framerate=30/1,format=I420 !
+
+ tee name=main_tee allow-not-linked=false !
+
+ queue name=preview_queue
+ max-size-buffers=30
+ max-size-bytes=0
+ max-size-time=300000000
+ leaky=no
+ flush-on-eos=true !
+ identity name=sync_point1 sync=true drop-probability=0.0 !
+ textoverlay name=ai_overlay
+ text="Detection Video Stream"
+ valignment=bottom halignment=center
+ font-desc="Sans Bold 14" color=0xFF000000 !
+ gvadetect model=./models/public/yolo11s/FP32/yolo11s.xml
+ pre-process-backend=opencv
+ device=CPU
+ threshold=0.6
+ inference-interval=10
+ inference-region=0
+ model-instance-id=inst0
+ batch-size=1
+ name=detection !
+ gvatrack name=object_tracker !
+ queue name=detect_to_classify
+ max-size-buffers=20
+ leaky=no !
+ gvaclassify model=./models/vehicle-attributes-recognition-barrier-0039.xml
+ model-proc=../../model_proc/intel/vehicle-attributes-recognition-barrier-0039.json
+ device=CPU
+ inference-interval=1
+ name=pre_view_classify !
+ gvawatermark name=preview_watermark !
+ videoconvert name=preview_convert !
+ autovideosink name=preview_sink
+ sync=true
+
+ main_tee. !
+ queue name=stream1_queue
+ max-size-buffers=3
+ max-size-bytes=0
+ max-size-time=300000000
+ leaky=no
+ flush-on-eos=true !
+ identity name=sync_point2 sync=true drop-probability=0.0 !
+ valve name=control_valve drop=false !
+ queue name=valve_buffer
+ max-size-buffers=1
+ leaky=no !
+ textoverlay name=valve_overlay
+ text="Valve Stream"
+ valignment=bottom halignment=center
+ font-desc="Sans Bold 14" color=0xFF10FF00 !
+ gvadetect model=./models/public/yolo11s/FP16/yolo11s.xml
+ pre-process-backend=opencv
+ device=CPU
+ threshold=0.6
+ inference-interval=1
+ inference-region=0
+ model-instance-id=inst1
+ batch-size=1
+ name=valve_detection !
+ queue name=valve_detect_to_classify
+ max-size-buffers=20
+ leaky=no !
+ gvawatermark name=valve_watermark !
+ videoconvert name=stream1_convert !
+ autovideosink name=stream1_sink
+ sync=true
+ """
+
+ try:
+ self.pipeline = Gst.parse_launch(pipeline_str)
+ if not self.pipeline:
+ print("Error: Could not create pipeline")
+ return False
+ print("Pipeline created")
+
+ # Get valve element from pipeline
+ self.valve = self.pipeline.get_by_name("control_valve")
+ if not self.valve:
+ print("Error: Could not find control_valve element")
+ return False
+ print("Found control_valve element")
+
+ # Get pre_view_classify element from pipeline
+ # Below we add a probe to the sink pad of pre_view_classify to monitor detected objects
+ self.pre_view_classify = self.pipeline.get_by_name("pre_view_classify")
+ if not self.pre_view_classify:
+ print("Error: Could not find pre_view_classify element")
+ return False
+ print("Found pre_view_classify element")
+
+ # Get sink pad of pre_view_classify
+ pre_view_classify_pad = self.pre_view_classify.get_static_pad("sink")
+ if not pre_view_classify_pad:
+ print("Unable to get sink pad of gvaclassify")
+ return False
+ print("Got sink pad of pre_view_classify")
+
+ # and add probe/callback
+ pre_view_classify_pad.add_probe(Gst.PadProbeType.BUFFER,
+ self.object_detector_callback, 0)
+ print("All elements found and pipeline created successfully")
+ except GLib.Error as e:
+ print(f"Error creating pipeline: {e}")
+ return False
+ return True
+
+ def object_detector_callback(self, pad, info, u_data):
+ """
+ Callback function for object detection probe on GStreamer pad.
+
+ Processes analytics metadata from the buffer to detect objects and control valve state.
+ When a 'truck' object is detected, the valve is opened; otherwise, it is closed.
+
+ Args:
+ pad: GStreamer pad object that triggered the probe.
+ info: Probe info containing buffer and other data.
+ u_data: User data passed to the callback.
+
+ Returns:
+ Gst.PadProbeReturn.OK: Indicates the probe should continue normal processing.
+ """
+
+ buffer = info.get_buffer()
+ if not buffer:
+ print("object_detector_callback: no buffer. Continueing...")
+ return Gst.PadProbeReturn.OK
+
+        rmeta = GstAnalytics.buffer_get_analytics_relation_meta(buffer)
+        if not rmeta:
+            return Gst.PadProbeReturn.OK
+
+        # Open the valve only if at least one detected object is a truck;
+        # otherwise close it. Scanning all metadata first avoids the last
+        # detection in the buffer overriding an earlier truck detection.
+        truck_detected = False
+        for mtd in rmeta:
+            if isinstance(mtd, GstAnalytics.ODMtd):
+                object_type = GLib.quark_to_string(mtd.get_obj_type())
+                if object_type == "truck":
+                    truck_detected = True
+                    break
+        if truck_detected:
+            self.open_valve()
+        else:
+            self.close_valve()
+        return Gst.PadProbeReturn.OK
+
+
+ def start(self):
+ """
+ Start the GStreamer pipeline and initialize monitoring threads.
+
+ Returns:
+ bool: True if the pipeline started successfully, False otherwise.
+ """
+
+ # Start pipeline
+ if not self.pipeline:
+ print("Controller: No pipeline to start")
+ return False
+
+ # Set pipeline to PLAYING state
+ if self.pipeline.set_state(Gst.State.PLAYING) == Gst.StateChangeReturn.FAILURE:
+ print("Failed to start pipeline")
+ return False
+
+ # Wait a moment for pipeline to initialize
+ time.sleep(1)
+
+ print("Pipeline started successfully")
+ return True
+
+
+ def open_valve(self):
+ """Open valve - enable controlled stream (drop=false)"""
+ if self.valve:
+ self.valve.set_property("drop", False)
+ self.valve_opened = True
+ else:
+ print("Valve not available")
+
+ def close_valve(self):
+ """Close valve - disable controlled stream (drop=true)"""
+ if self.valve:
+ self.valve.set_property("drop", True)
+ self.valve_opened = False
+ else:
+ print("Controller: Valve not available")
+
+
+def display_header():
+ """Display the header information for the Open/Close Valve sample."""
+ print("\n# ====================================== #")
+ print("# Copyright (C) 2025 Intel Corporation #")
+ print("# #")
+ print("# SPDX-License-Identifier: MIT #")
+ print("# ====================================== #")
+ print("# DL Streamer Open/Close Valve Sample #")
+ print("# ====================================== #\n")
+
+def main():
+ """Main function to run the Open/Close Valve sample."""
+ # Display header
+ display_header()
+
+ # Create dual stream controller
+ video_source = "./videos/cars_extended.mp4"
+ controller = DualStreamController(video_source)
+
+ # Create pipeline
+ if not controller.create_pipeline():
+ print("Failed to create pipeline. Exiting...")
+ return 1
+
+ # Get pipeline bus
+ bus = controller.pipeline.get_bus()
+
+ # Start pipeline
+ if not controller.start():
+ print("Failed to start pipeline. Exiting...")
+ return 1
+
+ # Run until interrupted
+ print("Running pipeline. Press Ctrl+C to stop...")
+ terminate = False
+ try:
+ while not terminate:
+ msg = bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE,
+ Gst.MessageType.EOS | Gst.MessageType.ERROR)
+ if msg:
+ if msg.type == Gst.MessageType.ERROR:
+ err, debug_info = msg.parse_error()
+ print(f"Error received from element {msg.src.get_name()}")
+ print(f"Debug info: {debug_info}")
+ terminate = True
+ if msg.type == Gst.MessageType.EOS:
+ print("Pipeline complete.")
+ terminate = True
+    except KeyboardInterrupt:
+        print("Interrupted by user. Stopping pipeline...")
+        terminate = True
+ except GLib.Error as e:
+ print(f"Exception occurred: {e}")
+
+ # Stop pipeline
+ controller.pipeline.set_state(Gst.State.NULL)
+ return 0
+
+if __name__ == "__main__":
+ sys.exit(main())