Skip to content

Commit 38bfa01

Browse files
committed
tested
1 parent d9742f7 commit 38bfa01

File tree

1 file changed

+129
-7
lines changed

1 file changed

+129
-7
lines changed

rust/otap-dataflow/crates/otap/src/batch_processor.rs

Lines changed: 129 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,21 @@ impl Config {
117117
}
118118
}
119119

120+
if self.timeout.is_none() {
121+
if let Some(batch_size) = self.send_batch_size {
122+
return Err(ConfigError::InvalidUserConfig {
123+
error: format!("send_batch_size ({}) requires a timeout", batch_size),
124+
});
125+
}
126+
if self.send_batch_max_size.is_none() {
127+
return Err(ConfigError::InvalidUserConfig {
128+
error: format!(
129+
"send_batch_max_size required for split-only (no timeout) configuration"
130+
),
131+
});
132+
}
133+
}
134+
120135
Ok(())
121136
}
122137
}
@@ -396,8 +411,9 @@ impl BatchProcessor {
396411
}
397412

398413
let count = input.len();
399-
let limit = self.config.send_batch_max_size;
400-
let mut output_batches = match make_output_batches(signal, nzu_to_nz64(limit), input) {
414+
let upper_limit = self.config.send_batch_max_size;
415+
let mut output_batches = match make_output_batches(signal, nzu_to_nz64(upper_limit), input)
416+
{
401417
Ok(v) => v,
402418
Err(e) => {
403419
self.metrics.batching_errors.add(count as u64);
@@ -408,19 +424,21 @@ impl BatchProcessor {
408424
}
409425
};
410426

411-
// If size-triggered and we requested splitting (limit is Some), re-buffer the last partial
427+
// If size-triggered and we requested splitting (upper_limit is Some), re-buffer the last partial
412428
// output if it is smaller than the configured lower_limit. Timer/Shutdown flush everything.
413-
if reason == FlushReason::Size && limit.is_some() && !output_batches.is_empty() {
429+
if let Some(timeout) = self.config.timeout
430+
&& reason == FlushReason::Size
431+
&& upper_limit.is_some()
432+
&& !output_batches.is_empty()
433+
{
414434
if let Some(last_items) = output_batches.last().map(|last| last.batch_length()) {
415435
if last_items < self.lower_limit.get() {
416436
let remainder = output_batches.pop().expect("last exists");
417437

418438
// We use the latest arrival time as the new arrival for timeout purposes.
419439
buffer.items = last_items;
420440
buffer.pending.push(remainder);
421-
if let Some(timeout) = self.config.timeout {
422-
buffer.set_arrival(signal, now, timeout, effect).await?;
423-
}
441+
buffer.set_arrival(signal, now, timeout, effect).await?;
424442
}
425443
}
426444
}
@@ -1145,4 +1163,108 @@ mod tests {
11451163
let mut datagen = DataGenerator::new(1);
11461164
test_concatenate((0..4).map(|_| datagen.generate_logs().into()));
11471165
}
1166+
1167+
/// Generic test helper for split-only mode (no timeout, no send_batch_size)
1168+
fn test_split_only(inputs_otlp: impl Iterator<Item = OtlpProtoMessage>) {
1169+
let cfg = json!({
1170+
"send_batch_size": null,
1171+
"send_batch_max_size": 2,
1172+
"timeout": null,
1173+
});
1174+
1175+
let (metrics_registry, metrics_reporter, phase) = setup_test_runtime(cfg);
1176+
1177+
let inputs_otlp: Vec<_> = inputs_otlp.collect();
1178+
let signal = inputs_otlp[0].signal_type();
1179+
let input_item_count: usize = inputs_otlp.iter().map(|m| m.batch_length()).sum();
1180+
1181+
phase
1182+
.run_test(move |mut ctx| async move {
1183+
let (pipeline_tx, mut pipeline_rx) = pipeline_ctrl_msg_channel(10);
1184+
ctx.set_pipeline_ctrl_sender(pipeline_tx);
1185+
1186+
let controller_task = tokio::spawn(async move {
1187+
while let Ok(msg) = pipeline_rx.recv().await {
1188+
if let PipelineControlMsg::DelayData { .. } = msg {}
1189+
}
1190+
});
1191+
1192+
for input_otlp in &inputs_otlp {
1193+
let otap_records = match input_otlp {
1194+
OtlpProtoMessage::Traces(t) => encode_spans_otap_batch(t).expect("encode"),
1195+
OtlpProtoMessage::Logs(l) => encode_logs_otap_batch(l).expect("encode"),
1196+
OtlpProtoMessage::Metrics(_) => panic!("metrics not supported"),
1197+
};
1198+
let pdata = OtapPdata::new_default(otap_records.into());
1199+
ctx.process(Message::PData(pdata)).await.expect("process");
1200+
}
1201+
1202+
// With no send_batch_size and no timeout, flushes immediately on every input
1203+
// With send_batch_max_size=2, each 3-item input splits to [2, 1]
1204+
let emitted = ctx.drain_pdata().await;
1205+
assert_eq!(emitted.len(), 4, "should emit 4 batches: 2, 1, 2, 1");
1206+
1207+
let batch_sizes: Vec<usize> = emitted
1208+
.iter()
1209+
.map(|p| {
1210+
let rec: OtapArrowRecords = p.clone().payload().try_into().unwrap();
1211+
rec.batch_length()
1212+
})
1213+
.collect();
1214+
assert_eq!(
1215+
batch_sizes,
1216+
vec![2, 1, 2, 1],
1217+
"split pattern without rebuffering"
1218+
);
1219+
1220+
// Shutdown should have nothing remaining (all flushed)
1221+
ctx.process(Message::Control(NodeControlMsg::Shutdown {
1222+
deadline: Instant::now() + Duration::from_millis(100),
1223+
reason: "test".into(),
1224+
}))
1225+
.await
1226+
.expect("shutdown");
1227+
1228+
let final_emitted = ctx.drain_pdata().await;
1229+
assert_eq!(final_emitted.len(), 0, "no remaining data");
1230+
1231+
// Verify equivalence
1232+
let all_outputs: Vec<OtlpProtoMessage> = emitted
1233+
.into_iter()
1234+
.map(|p| {
1235+
let rec: OtapArrowRecords = p.payload().try_into().unwrap();
1236+
otap_to_otlp(&rec)
1237+
})
1238+
.collect();
1239+
assert_equivalent(&inputs_otlp, &all_outputs);
1240+
1241+
// Trigger telemetry collection
1242+
ctx.process(Message::Control(NodeControlMsg::CollectTelemetry {
1243+
metrics_reporter,
1244+
}))
1245+
.await
1246+
.expect("collect telemetry");
1247+
1248+
controller_task.abort();
1249+
})
1250+
.validate(move |_| async move {
1251+
// Allow metrics to be reported
1252+
tokio::time::sleep(Duration::from_millis(50)).await;
1253+
1254+
// Verify metrics
1255+
verify_item_metrics(&metrics_registry, signal, input_item_count);
1256+
});
1257+
}
1258+
1259+
#[test]
1260+
fn test_split_only_traces() {
1261+
let mut datagen = DataGenerator::new(1);
1262+
test_split_only((0..2).map(|_| datagen.generate_traces().into()));
1263+
}
1264+
1265+
#[test]
1266+
fn test_split_only_logs() {
1267+
let mut datagen = DataGenerator::new(1);
1268+
test_split_only((0..2).map(|_| datagen.generate_logs().into()));
1269+
}
11481270
}

0 commit comments

Comments
 (0)