Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
cb085cb
wip
jmacd Nov 6, 2025
3ebee67
Merge branch 'main' of github.com:open-telemetry/otel-arrow into jmac…
jmacd Nov 6, 2025
2981314
failing
jmacd Nov 6, 2025
15edb06
bugfixes
jmacd Nov 6, 2025
339cedc
rename
jmacd Nov 6, 2025
91f9e30
cleanup
jmacd Nov 6, 2025
ae456bc
cleanup
jmacd Nov 6, 2025
e86cb94
tidy
jmacd Nov 6, 2025
08e148b
better
jmacd Nov 7, 2025
13b1397
simplify
jmacd Nov 7, 2025
dff86a2
wip with a new autotest
jmacd Nov 7, 2025
14bbd75
Merge branch 'main' of github.com:open-telemetry/otel-arrow into jmac…
jmacd Nov 11, 2025
61be827
handle dictionary
jmacd Nov 11, 2025
a52d143
tested
jmacd Nov 11, 2025
629f0de
wip
jmacd Nov 11, 2025
f8c80dc
wip
jmacd Nov 12, 2025
7fc5b69
Merge branch 'main' of github.com:open-telemetry/otel-arrow into jmac…
jmacd Nov 12, 2025
35a34b6
restore orig tests
jmacd Nov 12, 2025
7d85571
separate tests
jmacd Nov 12, 2025
536b006
ckpt
jmacd Nov 12, 2025
8a29f3e
cleanup
jmacd Nov 12, 2025
37c1a3f
Merge branch 'main' of github.com:open-telemetry/otel-arrow into jmac…
jmacd Nov 12, 2025
77c4f99
wip
jmacd Nov 12, 2025
a4b039c
Merge branch 'main' of github.com:open-telemetry/otel-arrow into jmac…
jmacd Nov 12, 2025
9304282
traces
jmacd Nov 12, 2025
1d0a672
cleanup
jmacd Nov 12, 2025
feb3864
simplified
jmacd Nov 13, 2025
9b08a53
simplify
jmacd Nov 13, 2025
fd52d35
reduce
jmacd Nov 13, 2025
fcb809f
tidy
jmacd Nov 13, 2025
5c3546e
corner case avoidance
jmacd Nov 13, 2025
57b30b8
wip
jmacd Nov 13, 2025
9b6d76c
revert groups.rs logic upstream
jmacd Nov 13, 2025
329112f
minimal fix
jmacd Nov 13, 2025
ab315a8
fmt
jmacd Nov 13, 2025
c69dca5
Merge branch 'main' of github.com:open-telemetry/otel-arrow into jmac…
jmacd Nov 14, 2025
ea963ad
Merge commit '4825091864b9fccfe482c6edf8fa51b4a74c280c' of github.com…
jmacd Nov 17, 2025
849ac68
Merge branch 'main' of github.com:open-telemetry/otel-arrow into jmac…
jmacd Nov 17, 2025
20f9281
add assert helpers
jmacd Nov 17, 2025
d4a3a3e
correct reindex_record_batch (not tested)
jmacd Nov 17, 2025
0cda42c
move batching_tests fixtures into crate::testing
jmacd Nov 18, 2025
62952ca
comments
jmacd Nov 18, 2025
a0879a6
restructure and comment split_metric_points
jmacd Nov 20, 2025
f9cb23d
Merge branch 'main' of github.com:open-telemetry/otel-arrow into jmac…
jmacd Nov 20, 2025
546f1fb
revert upstream component (for later)
jmacd Nov 20, 2025
cf9d7e6
comments
jmacd Nov 20, 2025
cfdbd7a
comment/correction in sort_record_batch
jmacd Nov 20, 2025
ab6ea78
more comment
jmacd Nov 20, 2025
c26a038
new test fail
jmacd Nov 20, 2025
ad8eb36
narrowed
jmacd Nov 21, 2025
f9df254
ckpt
jmacd Nov 21, 2025
fc208c8
scale back
jmacd Nov 21, 2025
0902454
ignore this test
jmacd Nov 21, 2025
84baf69
issue num
jmacd Nov 21, 2025
d072748
clippy
jmacd Nov 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 94 additions & 100 deletions rust/otap-dataflow/crates/otap/src/batch_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,14 +364,15 @@ impl OtapBatchProcessor {
let max = requested_split_max(&input, max_val);
if let Some(max_nz) = max {
self.metrics.split_requests.inc();
let mut output_batches = match make_output_batches(Some(max_nz), input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_LOGS, &e).await;
Vec::new()
}
};
let mut output_batches =
match make_output_batches(SignalType::Logs, Some(max_nz), input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_LOGS, &e).await;
Vec::new()
}
};

// If size-triggered and we requested splitting (max is Some), re-buffer the last partial
// output if it is smaller than the configured max. Timer/Shutdown flush everything.
Expand Down Expand Up @@ -407,14 +408,15 @@ impl OtapBatchProcessor {
// No split requested (safe path)
if input.len() > 1 {
// Coalesce upstream only when there are multiple records to merge
let output_batches = match make_output_batches(None, input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_LOGS, &e).await;
Vec::new()
}
};
let output_batches =
match make_output_batches(SignalType::Logs, None, input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_LOGS, &e).await;
Vec::new()
}
};
for records in output_batches {
let pdata = OtapPdata::new_todo_context(records.into());
effect.send_message(pdata).await?;
Expand Down Expand Up @@ -462,14 +464,15 @@ impl OtapBatchProcessor {
let max = requested_split_max(&input, max_val);
if let Some(max_nz) = max {
self.metrics.split_requests.inc();
let mut output_batches = match make_output_batches(Some(max_nz), input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_METRICS, &e).await;
Vec::new()
}
};
let mut output_batches =
match make_output_batches(SignalType::Metrics, Some(max_nz), input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_METRICS, &e).await;
Vec::new()
}
};

let mut rebuffered = false;
if reason == FlushReason::Size && !output_batches.is_empty() {
Expand Down Expand Up @@ -502,14 +505,15 @@ impl OtapBatchProcessor {
// No split requested (safe path)
if input.len() > 1 {
// Coalesce upstream only when there are multiple records to merge
let output_batches = match make_output_batches(None, input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_METRICS, &e).await;
Vec::new()
}
};
let output_batches =
match make_output_batches(SignalType::Metrics, None, input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_METRICS, &e).await;
Vec::new()
}
};
for records in output_batches {
let pdata = OtapPdata::new_todo_context(records.into());
effect.send_message(pdata).await?;
Expand Down Expand Up @@ -557,14 +561,15 @@ impl OtapBatchProcessor {
let max = requested_split_max(&input, max_val);
if let Some(max_nz) = max {
self.metrics.split_requests.inc();
let mut output_batches = match make_output_batches(Some(max_nz), input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_TRACES, &e).await;
Vec::new()
}
};
let mut output_batches =
match make_output_batches(SignalType::Traces, Some(max_nz), input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_TRACES, &e).await;
Vec::new()
}
};

let mut rebuffered = false;
if reason == FlushReason::Size && !output_batches.is_empty() {
Expand Down Expand Up @@ -597,14 +602,15 @@ impl OtapBatchProcessor {
// No split requested (safe path)
if input.len() > 1 {
// Coalesce upstream only when there are multiple records to merge
let output_batches = match make_output_batches(None, input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_TRACES, &e).await;
Vec::new()
}
};
let output_batches =
match make_output_batches(SignalType::Traces, None, input) {
Ok(v) => v,
Err(e) => {
self.metrics.batching_errors.inc();
log_batching_failed(effect, SIG_TRACES, &e).await;
Vec::new()
}
};
for records in output_batches {
let pdata = OtapPdata::new_todo_context(records.into());
effect.send_message(pdata).await?;
Expand Down Expand Up @@ -971,7 +977,6 @@ mod tests {
use otap_df_engine::testing::test_node;
use otap_df_pdata::OtlpProtoBytes;
use otap_df_pdata::otap::OtapArrowRecords;
use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType;
use otap_df_telemetry::registry::MetricsRegistryHandle;
use serde_json::json;
use std::collections::HashMap;
Expand Down Expand Up @@ -1512,57 +1517,6 @@ mod tests {
validation.validate(|_vctx| async move {});
}

// Test constants for batching smoke tests
#[test]
#[ignore = "upstream-batching-bug"]
// NOTE: Validates cross-signal partitioning and expected row totals.
fn test_make_output_batches_partitions_and_splits() {
// Build mixed input: 3 traces (1 row each), 2 metrics (1 dp each), interleaved
let input = vec![
one_trace_record(),
one_metric_record(),
one_trace_record(),
one_metric_record(),
one_trace_record(),
];

// Request no split to validate partitioning only
let outputs = make_output_batches(None, input).expect("batching ok");

// Expect 2 outputs: one metrics (2 rows), one traces (3 rows)
let mut metrics_batches = 0usize;
let mut traces_batches = 0usize;
let mut total_metrics_rows = 0usize;
let mut total_traces_rows = 0usize;

for out in &outputs {
match out {
OtapArrowRecords::Metrics(_) => {
metrics_batches += 1;
let rb = out
.get(ArrowPayloadType::UnivariateMetrics)
.expect("metrics rb");
assert!(rb.num_rows() > 0, "metrics batch must be non-empty");
total_metrics_rows += rb.num_rows();
}
OtapArrowRecords::Traces(_) => {
traces_batches += 1;
let rb = out.get(ArrowPayloadType::Spans).expect("spans rb");
assert!(rb.num_rows() > 0, "traces batch must be non-empty");
total_traces_rows += rb.num_rows();
}
OtapArrowRecords::Logs(_) => {
panic!("unexpected logs batch in outputs");
}
}
}

assert_eq!(metrics_batches, 1, "expected one metrics output");
assert_eq!(traces_batches, 1, "expected one traces output");
assert_eq!(total_metrics_rows, 2, "expected two metric rows total");
assert_eq!(total_traces_rows, 3, "expected three trace rows total");
}

// TODO: Add a positive-path test that simulates a size-triggered split leaving
// a small remainder in the buffer, and assert that a subsequent TimerTick
// flushes that remainder (dirty flag remains set). Crafting this requires
Expand Down Expand Up @@ -1621,6 +1575,46 @@ mod tests {
validation.validate(|_vctx| async move {});
}

#[test]
fn test_metrics_batching_with_split() {
// Test that metrics can be properly batched and split
let cfg = json!({
"send_batch_size": 2,
"send_batch_max_size": 2,
"timeout": "10ms"
});
let processor_config = ProcessorConfig::new("otap_batch_metrics_test");
let test_rt = TestRuntime::new();
let node = test_node(processor_config.name.clone());
let proc = from_config(node, &cfg, &processor_config).expect("proc from config");

let phase = test_rt.set_processor(proc);

let validation = phase.run_test(|mut ctx| async move {
// Process two metric records to trigger flush
let pdata1 = OtapPdata::new_default(one_metric_record().into());
ctx.process(Message::PData(pdata1))
.await
.expect("process metric 1");

let pdata2 = OtapPdata::new_default(one_metric_record().into());
ctx.process(Message::PData(pdata2))
.await
.expect("process metric 2");

let emitted = ctx.drain_pdata().await;
assert_eq!(emitted.len(), 1, "metrics should flush at threshold");

// Verify the batched output is metrics
let first = emitted.into_iter().next().unwrap().payload();
let first_rec: OtapArrowRecords = first.try_into().unwrap();
assert!(matches!(first_rec, OtapArrowRecords::Metrics(_)));
assert_eq!(2, first_rec.batch_length());
});

validation.validate(|_vctx| async move {});
}

#[test]
fn test_batch_with_out_port() {
let id = PipelineId::from("batch-with-out-port".to_string());
Expand Down
2 changes: 1 addition & 1 deletion rust/otap-dataflow/crates/otap/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ pub mod otlp_receiver;
/// Implementation of OTLP exporter that implements the exporter trait
pub mod otlp_exporter;

// OTAP batch processor
/// Batch processor
pub mod batch_processor;

// Retry processor that is aware of the OTAP PData/context.
Expand Down
3 changes: 3 additions & 0 deletions rust/otap-dataflow/crates/pdata/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ pub enum Error {
)]
InvalidId { expected: usize, given: usize },

#[error("Mixed signals")]
MixedSignals,

#[error("Encoding error: {}", error)]
Encoding {
#[from]
Expand Down
4 changes: 4 additions & 0 deletions rust/otap-dataflow/crates/pdata/src/otap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ use crate::{
};

pub mod batching;

#[cfg(test)]
pub mod batching_tests;

/// filter support for the filter processor
pub mod filter;
pub mod groups;
Expand Down
45 changes: 18 additions & 27 deletions rust/otap-dataflow/crates/pdata/src/otap/batching.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,31 @@
//!

use super::{OtapArrowRecords, error::Result, groups::RecordsGroup};
use otap_df_config::SignalType;
use std::num::NonZeroU64;

/// merge and combine batches to the appropriate size
/// Rebatch records to the appropriate size in a single pass.
/// Returns error if not the same signal type.
pub fn make_output_batches(
signal: SignalType,
max_output_batch: Option<NonZeroU64>,
records: Vec<OtapArrowRecords>,
) -> Result<Vec<OtapArrowRecords>> {
// We have to deal with three complications here:
// * batches that are too small
// * batches that are too big
// * cases where we have different types (logs/metrics/traces) intermingled

// We deal with the last issue first, by splitting the input into three lists of the appropriate
// types.
let [mut logs, mut metrics, mut traces] = RecordsGroup::split_by_type(records);

if let Some(max_output_batch) = max_output_batch {
logs = logs.split(max_output_batch)?;
metrics = metrics.split(max_output_batch)?;
traces = traces.split(max_output_batch)?;
// Separate by signal type.
let mut records = match signal {
SignalType::Logs => RecordsGroup::separate_logs(records),
SignalType::Metrics => RecordsGroup::separate_metrics(records),
SignalType::Traces => RecordsGroup::separate_traces(records),
}?;

// Split large batches so they can be reassembled into
// limited-size batches.
if let Some(limit) = max_output_batch {
records = records.split(limit)?;
}
logs = logs.concatenate(max_output_batch)?;
metrics = metrics.concatenate(max_output_batch)?;
traces = traces.concatenate(max_output_batch)?;

let mut result = Vec::new();
result.extend(logs.into_otap_arrow_records());
result.extend(metrics.into_otap_arrow_records());
result.extend(traces.into_otap_arrow_records());

// By splitting into 3 different lists, we've probably scrambled the ordering. We can't really
// fix that problem in a general sense because each `OtapArrowRecords` will contain many rows at
// different times, but we can improve matters slightly by sorting on the smallest record time.
// Join batches in sequence.
records = records.concatenate(max_output_batch)?;

// FIXME: sort here
Ok(result)
Ok(records.into_otap_arrow_records())
}
Loading
Loading