Skip to content

Commit 36ac5e0

Browse files
committed
refactor: include metric output_batches into BaselineMetrics
1 parent 2d5c101 commit 36ac5e0

File tree

14 files changed

+52
-55
lines changed

14 files changed

+52
-55
lines changed

datafusion/physical-plan/src/joins/cross_join.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,6 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
650650
self.left_index += 1;
651651
}
652652

653-
self.join_metrics.output_batches.add(1);
654653
return Ok(StatefulStreamResult::Ready(Some(batch)));
655654
}
656655
}

datafusion/physical-plan/src/joins/hash_join/stream.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,6 @@ impl HashJoinStream {
494494
&self.column_indices,
495495
self.join_type,
496496
)?;
497-
self.join_metrics.output_batches.add(1);
498497
timer.done();
499498

500499
self.state = HashJoinStreamState::FetchProbeBatch;
@@ -597,7 +596,6 @@ impl HashJoinStream {
597596
)?
598597
};
599598

600-
self.join_metrics.output_batches.add(1);
601599
timer.done();
602600

603601
if next_offset.is_none() {
@@ -653,8 +651,6 @@ impl HashJoinStream {
653651
if let Ok(ref batch) = result {
654652
self.join_metrics.input_batches.add(1);
655653
self.join_metrics.input_rows.add(batch.num_rows());
656-
657-
self.join_metrics.output_batches.add(1);
658654
}
659655
timer.done();
660656

datafusion/physical-plan/src/joins/nested_loop_join.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,10 +1453,6 @@ impl NestedLoopJoinStream {
14531453
fn maybe_flush_ready_batch(&mut self) -> Option<Poll<Option<Result<RecordBatch>>>> {
14541454
if self.output_buffer.has_completed_batch() {
14551455
if let Some(batch) = self.output_buffer.next_completed_batch() {
1456-
// HACK: this is not part of `BaselineMetrics` yet, so update it
1457-
// manually
1458-
self.join_metrics.output_batches.add(1);
1459-
14601456
return Some(Poll::Ready(Some(Ok(batch))));
14611457
}
14621458
}

datafusion/physical-plan/src/joins/sort_merge_join/metrics.rs

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@ pub(super) struct SortMergeJoinMetrics {
3131
input_batches: Count,
3232
/// Number of rows consumed by this operator
3333
input_rows: Count,
34-
/// Number of batches produced by this operator
35-
output_batches: Count,
3634
/// Execution metrics
3735
baseline_metrics: BaselineMetrics,
3836
/// Peak memory used for buffered data.
@@ -49,8 +47,6 @@ impl SortMergeJoinMetrics {
4947
let input_batches =
5048
MetricBuilder::new(metrics).counter("input_batches", partition);
5149
let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
52-
let output_batches =
53-
MetricBuilder::new(metrics).counter("output_batches", partition);
5450
let peak_mem_used = MetricBuilder::new(metrics).gauge("peak_mem_used", partition);
5551
let spill_metrics = SpillMetrics::new(metrics, partition);
5652

@@ -60,7 +56,6 @@ impl SortMergeJoinMetrics {
6056
join_time,
6157
input_batches,
6258
input_rows,
63-
output_batches,
6459
baseline_metrics,
6560
peak_mem_used,
6661
spill_metrics,
@@ -82,9 +77,6 @@ impl SortMergeJoinMetrics {
8277
pub fn input_rows(&self) -> Count {
8378
self.input_rows.clone()
8479
}
85-
pub fn output_batches(&self) -> Count {
86-
self.output_batches.clone()
87-
}
8880

8981
pub fn peak_mem_used(&self) -> Gauge {
9082
self.peak_mem_used.clone()

datafusion/physical-plan/src/joins/sort_merge_join/stream.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1462,7 +1462,6 @@ impl SortMergeJoinStream {
14621462
fn output_record_batch_and_reset(&mut self) -> Result<RecordBatch> {
14631463
let record_batch =
14641464
concat_batches(&self.schema, &self.staging_output_record_batches.batches)?;
1465-
self.join_metrics.output_batches().add(1);
14661465
self.join_metrics
14671466
.baseline_metrics()
14681467
.record_output(record_batch.num_rows());

datafusion/physical-plan/src/joins/stream_join_utils.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -682,8 +682,6 @@ pub struct StreamJoinMetrics {
682682
pub(crate) right: StreamJoinSideMetrics,
683683
/// Memory used by sides in bytes
684684
pub(crate) stream_memory_usage: metrics::Gauge,
685-
/// Number of batches produced by this operator
686-
pub(crate) output_batches: metrics::Count,
687685
/// Number of rows produced by this operator
688686
pub(crate) baseline_metrics: BaselineMetrics,
689687
}
@@ -709,13 +707,9 @@ impl StreamJoinMetrics {
709707
let stream_memory_usage =
710708
MetricBuilder::new(metrics).gauge("stream_memory_usage", partition);
711709

712-
let output_batches =
713-
MetricBuilder::new(metrics).counter("output_batches", partition);
714-
715710
Self {
716711
left,
717712
right,
718-
output_batches,
719713
stream_memory_usage,
720714
baseline_metrics: BaselineMetrics::new(metrics, partition),
721715
}

datafusion/physical-plan/src/joins/symmetric_hash_join.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1376,7 +1376,6 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
13761376
}
13771377
}
13781378
Some((batch, _)) => {
1379-
self.metrics.output_batches.add(1);
13801379
return self
13811380
.metrics
13821381
.baseline_metrics

datafusion/physical-plan/src/joins/utils.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1327,8 +1327,6 @@ pub(crate) struct BuildProbeJoinMetrics {
13271327
pub(crate) input_batches: metrics::Count,
13281328
/// Number of rows consumed by probe-side this operator
13291329
pub(crate) input_rows: metrics::Count,
1330-
/// Number of batches produced by this operator
1331-
pub(crate) output_batches: metrics::Count,
13321330
}
13331331

13341332
// This Drop implementation updates the elapsed compute part of the metrics.
@@ -1372,9 +1370,6 @@ impl BuildProbeJoinMetrics {
13721370

13731371
let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
13741372

1375-
let output_batches =
1376-
MetricBuilder::new(metrics).counter("output_batches", partition);
1377-
13781373
Self {
13791374
build_time,
13801375
build_input_batches,
@@ -1383,7 +1378,6 @@ impl BuildProbeJoinMetrics {
13831378
join_time,
13841379
input_batches,
13851380
input_rows,
1386-
output_batches,
13871381
baseline,
13881382
}
13891383
}

datafusion/physical-plan/src/metrics/baseline.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@ pub struct BaselineMetrics {
6363
/// multiple times.
6464
/// Issue: <https://github.com/apache/datafusion/issues/16841>
6565
output_bytes: Count,
66+
67+
/// output batches: the total batch count
68+
output_batches: Count,
6669
// Remember to update `docs/source/user-guide/metrics.md` when updating comments
6770
// or adding new metrics
6871
}
@@ -86,6 +89,9 @@ impl BaselineMetrics {
8689
output_bytes: MetricBuilder::new(metrics)
8790
.with_type(super::MetricType::SUMMARY)
8891
.output_bytes(partition),
92+
output_batches: MetricBuilder::new(metrics)
93+
.with_type(super::MetricType::SUMMARY)
94+
.output_batches(partition),
8995
}
9096
}
9197

@@ -100,6 +106,7 @@ impl BaselineMetrics {
100106
elapsed_compute: self.elapsed_compute.clone(),
101107
output_rows: Default::default(),
102108
output_bytes: Default::default(),
109+
output_batches: Default::default(),
103110
}
104111
}
105112

@@ -113,6 +120,11 @@ impl BaselineMetrics {
113120
&self.output_rows
114121
}
115122

123+
/// return the metric for the total number of output batches produced
124+
pub fn output_batches(&self) -> &Count {
125+
&self.output_batches
126+
}
127+
116128
/// Records the fact that this operator's execution is complete
117129
/// (recording the `end_time` metric).
118130
///
@@ -229,6 +241,7 @@ impl RecordOutput for RecordBatch {
229241
bm.record_output(self.num_rows());
230242
let n_bytes = get_record_batch_memory_size(&self);
231243
bm.output_bytes.add(n_bytes);
244+
bm.output_batches.add(1);
232245
self
233246
}
234247
}
@@ -238,6 +251,7 @@ impl RecordOutput for &RecordBatch {
238251
bm.record_output(self.num_rows());
239252
let n_bytes = get_record_batch_memory_size(self);
240253
bm.output_bytes.add(n_bytes);
254+
bm.output_batches.add(1);
241255
self
242256
}
243257
}

datafusion/physical-plan/src/metrics/builder.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,14 @@ impl<'a> MetricBuilder<'a> {
161161
count
162162
}
163163

164+
/// Consume self and create a new counter for recording total output batches
165+
pub fn output_batches(self, partition: usize) -> Count {
166+
let count = Count::new();
167+
self.with_partition(partition)
168+
.build(MetricValue::OutputBatches(count.clone()));
169+
count
170+
}
171+
164172
/// Consume self and create a new gauge for reporting current memory usage
165173
pub fn mem_used(self, partition: usize) -> Gauge {
166174
let gauge = Gauge::new();

0 commit comments

Comments
 (0)