diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs index 26b71b5496f2..e56d4e6d8b04 100644 --- a/datafusion/core/tests/sql/explain_analyze.rs +++ b/datafusion/core/tests/sql/explain_analyze.rs @@ -61,12 +61,9 @@ async fn explain_analyze_baseline_metrics() { assert_metrics!( &formatted, "AggregateExec: mode=Partial, gby=[]", - "metrics=[output_rows=3, elapsed_compute=" - ); - assert_metrics!( - &formatted, - "AggregateExec: mode=Partial, gby=[]", - "output_bytes=" + "metrics=[output_rows=3, elapsed_compute=", + "output_bytes=", + "output_batches=3" ); assert_metrics!( @@ -75,59 +72,76 @@ async fn explain_analyze_baseline_metrics() { "reduction_factor=5.1% (5/99)" ); - assert_metrics!( - &formatted, - "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]", - "metrics=[output_rows=5, elapsed_compute=" - ); - assert_metrics!( - &formatted, - "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]", - "output_bytes=" - ); - assert_metrics!( - &formatted, - "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434", - "metrics=[output_rows=99, elapsed_compute=" - ); + { + let expected_batch_count_after_repartition = + if cfg!(not(feature = "force_hash_collisions")) { + "output_batches=3" + } else { + "output_batches=1" + }; + + assert_metrics!( + &formatted, + "AggregateExec: mode=FinalPartitioned, gby=[c1@0 as c1]", + "metrics=[output_rows=5, elapsed_compute=", + "output_bytes=", + expected_batch_count_after_repartition + ); + + assert_metrics!( + &formatted, + "RepartitionExec: partitioning=Hash([c1@0], 3), input_partitions=3", + "metrics=[output_rows=5, elapsed_compute=", + "output_bytes=", + expected_batch_count_after_repartition + ); + + assert_metrics!( + &formatted, + "ProjectionExec: expr=[]", + "metrics=[output_rows=5, elapsed_compute=", + "output_bytes=", + expected_batch_count_after_repartition + ); + + assert_metrics!( + &formatted, + "CoalesceBatchesExec: target_batch_size=4096", + "metrics=[output_rows=5, elapsed_compute", + "output_bytes=", + expected_batch_count_after_repartition + ); + } + assert_metrics!( &formatted, "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434", - "output_bytes=" + "metrics=[output_rows=99, elapsed_compute=", + "output_bytes=", + "output_batches=1" ); + assert_metrics!( &formatted, "FilterExec: c13@1 != C2GT5KVyOPZpgKVl110TyZO0NcJ434", "selectivity=99% (99/100)" ); - assert_metrics!( - &formatted, - "ProjectionExec: expr=[]", - "metrics=[output_rows=5, elapsed_compute=" - ); - assert_metrics!(&formatted, "ProjectionExec: expr=[]", "output_bytes="); - assert_metrics!( - &formatted, - "CoalesceBatchesExec: target_batch_size=4096", - "metrics=[output_rows=5, elapsed_compute" - ); - assert_metrics!( - &formatted, - "CoalesceBatchesExec: target_batch_size=4096", - "output_bytes=" - ); + assert_metrics!( &formatted, "UnionExec", - "metrics=[output_rows=3, elapsed_compute=" + "metrics=[output_rows=3, elapsed_compute=", + "output_bytes=", + "output_batches=3" ); - assert_metrics!(&formatted, "UnionExec", "output_bytes="); + assert_metrics!( &formatted, "WindowAggExec", - "metrics=[output_rows=1, elapsed_compute=" + "metrics=[output_rows=1, elapsed_compute=", + "output_bytes=", + "output_batches=1" ); - assert_metrics!(&formatted, "WindowAggExec", "output_bytes="); fn expected_to_have_metrics(plan: &dyn ExecutionPlan) -> bool { use datafusion::physical_plan; @@ -228,9 +242,13 @@ async fn explain_analyze_level() { for (level, needle, should_contain) in [ (ExplainAnalyzeLevel::Summary, "spill_count", false), + (ExplainAnalyzeLevel::Summary, "output_batches", false), (ExplainAnalyzeLevel::Summary, "output_rows", true), + (ExplainAnalyzeLevel::Summary, "output_bytes", true), (ExplainAnalyzeLevel::Dev, "spill_count", true), (ExplainAnalyzeLevel::Dev, "output_rows", true), + (ExplainAnalyzeLevel::Dev, "output_bytes", true), + (ExplainAnalyzeLevel::Dev, "output_batches", true), ] { let plan = collect_plan(sql, level).await; assert_eq!( diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 743c8750b521..426ec213b324 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -40,18 +40,24 @@ use std::io::Write; use std::path::PathBuf; use tempfile::TempDir; -/// A macro to assert that some particular line contains two substrings +/// A macro to assert that some particular line contains the given substrings /// -/// Usage: `assert_metrics!(actual, operator_name, metrics)` +/// Usage: `assert_metrics!(actual, operator_name, metrics_1, metrics_2, ...)` macro_rules! assert_metrics { - ($ACTUAL: expr, $OPERATOR_NAME: expr, $METRICS: expr) => { + ($ACTUAL: expr, $OPERATOR_NAME: expr, $($METRICS: expr),+) => { let found = $ACTUAL .lines() - .any(|line| line.contains($OPERATOR_NAME) && line.contains($METRICS)); + .any(|line| line.contains($OPERATOR_NAME) $( && line.contains($METRICS))+); + + let mut metrics = String::new(); + $(metrics.push_str(format!(" '{}',", $METRICS).as_str());)+ + // remove the last `,` from the string + metrics.pop(); + assert!( found, - "Can not find a line with both '{}' and '{}' in\n\n{}", - $OPERATOR_NAME, $METRICS, $ACTUAL + "Cannot find a line with operator name '{}' and metrics containing values {} in :\n\n{}", + $OPERATOR_NAME, metrics, $ACTUAL ); }; } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index fc32bb6fc94c..2c531786c9c2 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -650,7 +650,6 @@ impl CrossJoinStream { self.left_index += 1; } - self.join_metrics.output_batches.add(1); return Ok(StatefulStreamResult::Ready(Some(batch))); } } diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index bb3465365ec9..1f4aeecb2972 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -494,7 +494,6 @@ impl HashJoinStream { &self.column_indices, self.join_type, )?; - self.join_metrics.output_batches.add(1); timer.done(); self.state = HashJoinStreamState::FetchProbeBatch; @@ -597,7 +596,6 @@ impl HashJoinStream { )? }; - self.join_metrics.output_batches.add(1); timer.done(); if next_offset.is_none() { @@ -653,8 +651,6 @@ impl HashJoinStream { if let Ok(ref batch) = result { self.join_metrics.input_batches.add(1); self.join_metrics.input_rows.add(batch.num_rows()); - - self.join_metrics.output_batches.add(1); } timer.done(); diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index 1f0cdf391c1f..9377ace33a1b 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1483,10 +1483,6 @@ impl NestedLoopJoinStream { fn maybe_flush_ready_batch(&mut self) -> Option>>> { if self.output_buffer.has_completed_batch() { if let Some(batch) = self.output_buffer.next_completed_batch() { - // HACK: this is not part of `BaselineMetrics` yet, so update it - // manually - self.metrics.join_metrics.output_batches.add(1); - // Update output rows for selectivity metric let output_rows = batch.num_rows(); self.metrics.selectivity.add_part(output_rows); diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs b/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs index 5920cd663a77..ac476853d5d7 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs @@ -31,8 +31,6 @@ pub(super) struct SortMergeJoinMetrics { input_batches: Count, /// Number of rows consumed by this operator input_rows: Count, - /// Number of batches produced by this operator - output_batches: Count, /// Execution metrics baseline_metrics: BaselineMetrics, /// Peak memory used for buffered data. @@ -49,8 +47,6 @@ impl SortMergeJoinMetrics { let input_batches = MetricBuilder::new(metrics).counter("input_batches", partition); let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); - let output_batches = - MetricBuilder::new(metrics).counter("output_batches", partition); let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition); let spill_metrics = SpillMetrics::new(metrics, partition); @@ -60,7 +56,6 @@ impl SortMergeJoinMetrics { join_time, input_batches, input_rows, - output_batches, baseline_metrics, peak_mem_used, spill_metrics, @@ -82,9 +77,6 @@ impl SortMergeJoinMetrics { pub fn input_rows(&self) -> Count { self.input_rows.clone() } - pub fn output_batches(&self) -> Count { - self.output_batches.clone() - } pub fn peak_mem_used(&self) -> Gauge { self.peak_mem_used.clone() diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs index 1185866b9f46..28020450c427 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs @@ -35,6 +35,7 @@ use std::task::{Context, Poll}; use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics; use crate::joins::utils::{compare_join_arrays, JoinFilter}; +use crate::metrics::RecordOutput; use crate::spill::spill_manager::SpillManager; use crate::{PhysicalExpr, RecordBatchStream, SendableRecordBatchStream}; @@ -1462,10 +1463,7 @@ impl SortMergeJoinStream { fn output_record_batch_and_reset(&mut self) -> Result { let record_batch = concat_batches(&self.schema, &self.staging_output_record_batches.batches)?; - self.join_metrics.output_batches().add(1); - self.join_metrics - .baseline_metrics() - .record_output(record_batch.num_rows()); + (&record_batch).record_output(&self.join_metrics.baseline_metrics()); // If join filter exists, `self.output_size` is not accurate as we don't know the exact // number of rows in the output record batch. If streamed row joined with buffered rows, // once join filter is applied, the number of output rows may be more than 1. diff --git a/datafusion/physical-plan/src/joins/stream_join_utils.rs b/datafusion/physical-plan/src/joins/stream_join_utils.rs index 80221a77992c..f4a3cd92f16d 100644 --- a/datafusion/physical-plan/src/joins/stream_join_utils.rs +++ b/datafusion/physical-plan/src/joins/stream_join_utils.rs @@ -682,8 +682,6 @@ pub struct StreamJoinMetrics { pub(crate) right: StreamJoinSideMetrics, /// Memory used by sides in bytes pub(crate) stream_memory_usage: metrics::Gauge, - /// Number of batches produced by this operator - pub(crate) output_batches: metrics::Count, /// Number of rows produced by this operator pub(crate) baseline_metrics: BaselineMetrics, } @@ -709,13 +707,9 @@ impl StreamJoinMetrics { let stream_memory_usage = MetricBuilder::new(metrics).gauge("stream_memory_usage", partition); - let output_batches = - MetricBuilder::new(metrics).counter("output_batches", partition); - Self { left, right, - output_batches, stream_memory_usage, baseline_metrics: BaselineMetrics::new(metrics, partition), } diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index be4646e88bd7..a9a2bbff42c6 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -1376,7 +1376,6 @@ impl SymmetricHashJoinStream { } } Some((batch, _)) => { - self.metrics.output_batches.add(1); return self .metrics .baseline_metrics diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs index 9b589b674cc5..6ff829815451 100644 --- a/datafusion/physical-plan/src/joins/utils.rs +++ b/datafusion/physical-plan/src/joins/utils.rs @@ -1327,8 +1327,6 @@ pub(crate) struct BuildProbeJoinMetrics { pub(crate) input_batches: metrics::Count, /// Number of rows consumed by probe-side this operator pub(crate) input_rows: metrics::Count, - /// Number of batches produced by this operator - pub(crate) output_batches: metrics::Count, } // This Drop implementation updates the elapsed compute part of the metrics. @@ -1372,9 +1370,6 @@ impl BuildProbeJoinMetrics { let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); - let output_batches = - MetricBuilder::new(metrics).counter("output_batches", partition); - Self { build_time, build_input_batches, @@ -1383,7 +1378,6 @@ impl BuildProbeJoinMetrics { join_time, input_batches, input_rows, - output_batches, baseline, } } diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs index 858773b94664..8dc2f30d9f79 100644 --- a/datafusion/physical-plan/src/metrics/baseline.rs +++ b/datafusion/physical-plan/src/metrics/baseline.rs @@ -63,6 +63,9 @@ pub struct BaselineMetrics { /// multiple times. /// Issue: output_bytes: Count, + + /// output batches: the total output batch count + output_batches: Count, // Remember to update `docs/source/user-guide/metrics.md` when updating comments // or adding new metrics } @@ -86,6 +89,9 @@ impl BaselineMetrics { output_bytes: MetricBuilder::new(metrics) .with_type(super::MetricType::SUMMARY) .output_bytes(partition), + output_batches: MetricBuilder::new(metrics) + .with_type(super::MetricType::DEV) + .output_batches(partition), } } @@ -100,6 +106,7 @@ impl BaselineMetrics { elapsed_compute: self.elapsed_compute.clone(), output_rows: Default::default(), output_bytes: Default::default(), + output_batches: Default::default(), } } @@ -113,6 +120,11 @@ impl BaselineMetrics { &self.output_rows } + /// return the metric for the total number of output batches produced + pub fn output_batches(&self) -> &Count { + &self.output_batches + } + /// Records the fact that this operator's execution is complete /// (recording the `end_time` metric). /// @@ -229,6 +241,7 @@ impl RecordOutput for RecordBatch { bm.record_output(self.num_rows()); let n_bytes = get_record_batch_memory_size(&self); bm.output_bytes.add(n_bytes); + bm.output_batches.add(1); self } } @@ -238,6 +251,7 @@ impl RecordOutput for &RecordBatch { bm.record_output(self.num_rows()); let n_bytes = get_record_batch_memory_size(self); bm.output_bytes.add(n_bytes); + bm.output_batches.add(1); self } } diff --git a/datafusion/physical-plan/src/metrics/builder.rs b/datafusion/physical-plan/src/metrics/builder.rs index 6ea947b6d21b..91b2440122f0 100644 --- a/datafusion/physical-plan/src/metrics/builder.rs +++ b/datafusion/physical-plan/src/metrics/builder.rs @@ -161,6 +161,14 @@ impl<'a> MetricBuilder<'a> { count } + /// Consume self and create a new counter for recording total output batches + pub fn output_batches(self, partition: usize) -> Count { + let count = Count::new(); + self.with_partition(partition) + .build(MetricValue::OutputBatches(count.clone())); + count + } + /// Consume self and create a new gauge for reporting current memory usage pub fn mem_used(self, partition: usize) -> Gauge { let gauge = Gauge::new(); diff --git a/datafusion/physical-plan/src/metrics/mod.rs b/datafusion/physical-plan/src/metrics/mod.rs index 4e98af722d4e..613c031808cb 100644 --- a/datafusion/physical-plan/src/metrics/mod.rs +++ b/datafusion/physical-plan/src/metrics/mod.rs @@ -299,6 +299,7 @@ impl MetricsSet { MetricValue::SpillCount(_) => false, MetricValue::SpilledBytes(_) => false, MetricValue::OutputBytes(_) => false, + MetricValue::OutputBatches(_) => false, MetricValue::SpilledRows(_) => false, MetricValue::CurrentMemoryUsage(_) => false, MetricValue::Gauge { name, .. } => name == metric_name, diff --git a/datafusion/physical-plan/src/metrics/value.rs b/datafusion/physical-plan/src/metrics/value.rs index 298d63e5e216..7f31f757944d 100644 --- a/datafusion/physical-plan/src/metrics/value.rs +++ b/datafusion/physical-plan/src/metrics/value.rs @@ -551,6 +551,8 @@ pub enum MetricValue { SpilledBytes(Count), /// Total size of output bytes produced: "output_bytes" metric OutputBytes(Count), + /// Total number of output batches produced: "output_batches" metric + OutputBatches(Count), /// Total size of spilled rows produced: "spilled_rows" metric SpilledRows(Count), /// Current memory used @@ -618,6 +620,9 @@ impl PartialEq for MetricValue { (MetricValue::OutputBytes(count), MetricValue::OutputBytes(other)) => { count == other } + (MetricValue::OutputBatches(count), MetricValue::OutputBatches(other)) => { + count == other + } (MetricValue::SpilledRows(count), MetricValue::SpilledRows(other)) => { count == other } @@ -699,6 +704,7 @@ impl MetricValue { Self::SpillCount(_) => "spill_count", Self::SpilledBytes(_) => "spilled_bytes", Self::OutputBytes(_) => "output_bytes", + Self::OutputBatches(_) => "output_batches", Self::SpilledRows(_) => "spilled_rows", Self::CurrentMemoryUsage(_) => "mem_used", Self::ElapsedCompute(_) => "elapsed_compute", @@ -721,6 +727,7 @@ impl MetricValue { Self::SpillCount(count) => count.value(), Self::SpilledBytes(bytes) => bytes.value(), Self::OutputBytes(bytes) => bytes.value(), + Self::OutputBatches(count) => count.value(), Self::SpilledRows(count) => count.value(), Self::CurrentMemoryUsage(used) => used.value(), Self::ElapsedCompute(time) => time.value(), @@ -755,6 +762,7 @@ impl MetricValue { Self::SpillCount(_) => Self::SpillCount(Count::new()), Self::SpilledBytes(_) => Self::SpilledBytes(Count::new()), Self::OutputBytes(_) => Self::OutputBytes(Count::new()), + Self::OutputBatches(_) => Self::OutputBatches(Count::new()), Self::SpilledRows(_) => Self::SpilledRows(Count::new()), Self::CurrentMemoryUsage(_) => Self::CurrentMemoryUsage(Gauge::new()), Self::ElapsedCompute(_) => Self::ElapsedCompute(Time::new()), @@ -802,6 +810,7 @@ impl MetricValue { | (Self::SpillCount(count), Self::SpillCount(other_count)) | (Self::SpilledBytes(count), Self::SpilledBytes(other_count)) | (Self::OutputBytes(count), Self::OutputBytes(other_count)) + | (Self::OutputBatches(count), Self::OutputBatches(other_count)) | (Self::SpilledRows(count), Self::SpilledRows(other_count)) | ( Self::Count { count, .. }, @@ -879,6 +888,7 @@ impl MetricValue { Self::OutputRows(_) => 0, Self::ElapsedCompute(_) => 1, Self::OutputBytes(_) => 2, + Self::OutputBatches(_) => 3, // Other metrics Self::PruningMetrics { name, .. } => match name.as_ref() { // The following metrics belong to `DataSourceExec` with a Parquet data source. @@ -888,23 +898,23 @@ impl MetricValue { // You may update these metrics as long as their relative order remains unchanged. // // Reference PR: - "files_ranges_pruned_statistics" => 3, - "row_groups_pruned_statistics" => 4, - "row_groups_pruned_bloom_filter" => 5, - "page_index_rows_pruned" => 6, - _ => 7, + "files_ranges_pruned_statistics" => 4, + "row_groups_pruned_statistics" => 5, + "row_groups_pruned_bloom_filter" => 6, + "page_index_rows_pruned" => 7, + _ => 8, }, - Self::SpillCount(_) => 8, - Self::SpilledBytes(_) => 9, - Self::SpilledRows(_) => 10, - Self::CurrentMemoryUsage(_) => 11, - Self::Count { .. } => 12, - Self::Gauge { .. } => 13, - Self::Time { .. } => 14, - Self::Ratio { .. } => 15, - Self::StartTimestamp(_) => 16, // show timestamps last - Self::EndTimestamp(_) => 17, - Self::Custom { .. } => 18, + Self::SpillCount(_) => 9, + Self::SpilledBytes(_) => 10, + Self::SpilledRows(_) => 11, + Self::CurrentMemoryUsage(_) => 12, + Self::Count { .. } => 13, + Self::Gauge { .. } => 14, + Self::Time { .. } => 15, + Self::Ratio { .. } => 16, + Self::StartTimestamp(_) => 17, // show timestamps last + Self::EndTimestamp(_) => 18, + Self::Custom { .. } => 19, } } @@ -919,6 +929,7 @@ impl Display for MetricValue { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { Self::OutputRows(count) + | Self::OutputBatches(count) | Self::SpillCount(count) | Self::SpilledRows(count) | Self::Count { count, .. } => { diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index 74cf79889599..8f73fe86cfef 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -915,6 +915,7 @@ impl ExecutionPlan for RepartitionExec { Arc::clone(&reservation), spill_stream, 1, // Each receiver handles one input partition + BaselineMetrics::new(&metrics, partition), )) as SendableRecordBatchStream }) .collect::>(); @@ -952,6 +953,7 @@ impl ExecutionPlan for RepartitionExec { reservation, spill_stream, num_input_partitions, + BaselineMetrics::new(&metrics, partition), )) as SendableRecordBatchStream) } }) @@ -1402,6 +1404,9 @@ struct PerPartitionStream { /// In non-preserve-order mode, multiple input partitions send to the same channel, /// each sending None when complete. We must wait for all of them. remaining_partitions: usize, + + /// Execution metrics + baseline_metrics: BaselineMetrics, } impl PerPartitionStream { @@ -1412,6 +1417,7 @@ impl PerPartitionStream { reservation: SharedMemoryReservation, spill_stream: SendableRecordBatchStream, num_input_partitions: usize, + baseline_metrics: BaselineMetrics, ) -> Self { Self { schema, @@ -1421,18 +1427,17 @@ impl PerPartitionStream { spill_stream, state: StreamState::ReadingMemory, remaining_partitions: num_input_partitions, + baseline_metrics, } } -} - -impl Stream for PerPartitionStream { - type Item = Result; - fn poll_next( - mut self: Pin<&mut Self>, + fn poll_next_inner( + self: &mut Pin<&mut Self>, cx: &mut Context<'_>, - ) -> Poll> { + ) -> Poll>> { use futures::StreamExt; + let cloned_time = self.baseline_metrics.elapsed_compute().clone(); + let _timer = cloned_time.timer(); loop { match self.state { @@ -1508,6 +1513,18 @@ impl Stream for PerPartitionStream { } } +impl Stream for PerPartitionStream { + type Item = Result; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + let poll = self.poll_next_inner(cx); + self.baseline_metrics.record_poll(poll) + } +} + impl RecordBatchStream for PerPartitionStream { /// Get the schema fn schema(&self) -> SchemaRef { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index a95fad19f614..2b31ff3da9f0 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -34,7 +34,8 @@ use crate::filter_pushdown::{ }; use crate::limit::LimitStream; use crate::metrics::{ - BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics, SplitMetrics, + BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RecordOutput, SpillMetrics, + SplitMetrics, }; use crate::projection::{make_with_child, update_ordering, ProjectionExec}; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; @@ -738,7 +739,7 @@ impl ExternalSorter { let sorted = sort_batch(&batch, &expressions, None)?; - metrics.record_output(sorted.num_rows()); + (&sorted).record_output(&metrics); drop(batch); drop(reservation); Ok(sorted) diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs index 9435de1cc448..0b5ab784df67 100644 --- a/datafusion/physical-plan/src/topk/mod.rs +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -26,7 +26,9 @@ use datafusion_expr::{ColumnarValue, Operator}; use std::mem::size_of; use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc}; -use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder}; +use super::metrics::{ + BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, RecordOutput, +}; use crate::spill::get_record_batch_memory_size; use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; @@ -596,7 +598,7 @@ impl TopK { // break into record batches as needed let mut batches = vec![]; if let Some(mut batch) = heap.emit()? { - metrics.baseline.output_rows().add(batch.num_rows()); + (&batch).record_output(&metrics.baseline); loop { if batch.num_rows() <= batch_size { diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index 7212c764130e..22132f2f8639 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -277,8 +277,6 @@ struct UnnestMetrics { input_batches: metrics::Count, /// Number of rows consumed input_rows: metrics::Count, - /// Number of batches produced - output_batches: metrics::Count, } impl UnnestMetrics { @@ -288,14 +286,10 @@ impl UnnestMetrics { let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition); - let output_batches = - MetricBuilder::new(metrics).counter("output_batches", partition); - Self { baseline_metrics: BaselineMetrics::new(metrics, partition), input_batches, input_rows, - output_batches, } } } @@ -361,7 +355,6 @@ impl UnnestStream { let Some(result_batch) = result else { continue; }; - self.metrics.output_batches.add(1); (&result_batch).record_output(&self.metrics.baseline_metrics); // Empty record batches should not be emitted. @@ -375,7 +368,7 @@ impl UnnestStream { produced {} output batches containing {} rows in {}", self.metrics.input_batches, self.metrics.input_rows, - self.metrics.output_batches, + self.metrics.baseline_metrics.output_batches(), self.metrics.baseline_metrics.output_rows(), self.metrics.baseline_metrics.elapsed_compute(), ); diff --git a/docs/source/user-guide/metrics.md b/docs/source/user-guide/metrics.md index 1fb2f4a5c770..43bfcd2afec2 100644 --- a/docs/source/user-guide/metrics.md +++ b/docs/source/user-guide/metrics.md @@ -32,6 +32,7 @@ DataFusion operators expose runtime metrics so you can understand where time is | elapsed_compute | CPU time the operator actively spends processing work. | | output_rows | Total number of rows the operator produces. | | output_bytes | Memory usage of all output batches. Note: This value may be overestimated. If multiple output `RecordBatch` instances share underlying memory buffers, their sizes will be counted multiple times. | +| output_batches | Total number of output batches the operator produces. | ## Operator-specific Metrics