Skip to content

Commit 29735b1

Browse files
Add filtering benchmarks (#1499)
Adds benchmarks for the filtering code used in the filter processor and for the columnar query engine, with similar filtering criteria. My primary goal was to confirm that we get similar performance using both approaches. The filter processor code uses arrow's compute kernels, whereas the columnar query engine uses a hybrid approach of datafusion's physical expression and compute kernels. **filter pattern** | **batch_size** | **pdata filter code (µs)** | **col. query engine (µs)** :--- | :---: | :---: | :---: `severity_text == "WARN"` | 32 | 8.5488 | 7.6645 " | 1024 | 37.295 | 22.341 " | 8192 | 233.7 | 153.21 `attrs["code.namespace"] == "main"` | 32 | 9.9075 | 10.202 " | 1024 | 52.496 | 43.148 " | 8192 | 408.4 | 331.72 `attrs["code.namespace"] == "main" or attrs["code.line.number"] == 2` | 32 | 11.666 | 13.387 " | 1024 | 70.035 | 63.779 " | 8192 | 572.73 | 510.23 `severity_text == "WARN" and attrs["code.namespace"] == "main"` | 32 | 11.705 | 10.719 " | 1024 | 48.919 | 35.641 " | 8192 | 377.17 | 274.38 The results lead me to believe the approach the columnar query engine is taking for filtering is on the right track. My secondary goal here was simply to get some benchmarks in place for the columnar query engine's filtering code, so we have a baseline against which to measure the effect of future performance optimizations and to ensure we're not introducing regressions as we add more sophistication to support additional filtering use cases.
1 parent a8534da commit 29735b1

File tree

5 files changed

+271
-5
lines changed

5 files changed

+271
-5
lines changed

rust/experimental/query_engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ datafusion = { version = "51.0.0", default-features = false }
2626
bytes = "1.10.1"
2727
caseless = "0.2.2"
2828
chrono = "0.4.41"
29+
criterion = "0.7.0"
2930
futures-core = "0.3"
3031
hex = "0.4.3"
3132
opentelemetry-proto = "0.31.0"

rust/experimental/query_engine/engine-columnar/Cargo.toml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,20 @@ otap-df-pdata = { path = "../../../otap-dataflow/crates/pdata" }
2121
data_engine_expressions = { path = "../expressions" }
2222
data_engine_kql_parser = { path = "../kql-parser" }
2323

24-
2524
[dev-dependencies]
25+
criterion = { workspace = true, features = ["async_tokio"] }
2626
pretty_assertions = { workspace = true }
2727
prost = { workspace = true }
28-
tokio = { workspace = true }
28+
tokio = { workspace = true }
29+
30+
[[bench]]
31+
name = "filter"
32+
harness = false
33+
34+
[profile.bench]
35+
opt-level = 3
36+
debug = false
37+
incremental = false
38+
lto = "fat"
39+
codegen-units = 1
40+
panic = "abort"
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::time::Instant;
5+
6+
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
7+
use data_engine_columnar::pipeline::Pipeline;
8+
use data_engine_kql_parser::{KqlParser, Parser};
9+
use otap_df_pdata::OtapArrowRecords;
10+
use otap_df_pdata::proto::OtlpProtoMessage;
11+
use otap_df_pdata::testing::fixtures::logs_with_varying_attributes_and_properties;
12+
use otap_df_pdata::testing::round_trip::otlp_to_otap;
13+
use tokio::runtime::Runtime;
14+
15+
fn generate_logs_batch(batch_size: usize) -> OtapArrowRecords {
16+
let logs_data = logs_with_varying_attributes_and_properties(batch_size);
17+
otlp_to_otap(&OtlpProtoMessage::Logs(logs_data))
18+
}
19+
20+
fn bench_log_pipeline(
21+
c: &mut Criterion,
22+
rt: &Runtime,
23+
batch_sizes: &[usize],
24+
bench_group_name: &str,
25+
bench_pipeline_kql: &str,
26+
) {
27+
let mut group = c.benchmark_group(bench_group_name);
28+
for batch_size in batch_sizes {
29+
let benchmark_id = BenchmarkId::new("batch_size", batch_size);
30+
let _ = group.bench_with_input(benchmark_id, &batch_size, |b, batch_size| {
31+
b.iter_custom(|iters| {
32+
let batch = generate_logs_batch(**batch_size);
33+
let query = KqlParser::parse(bench_pipeline_kql).expect("can parse pipeline");
34+
let mut pipeline = Pipeline::new(query);
35+
rt.block_on(async move {
36+
// execute the query once to initiate planning
37+
pipeline.execute(batch.clone()).await.unwrap();
38+
39+
let start = Instant::now();
40+
for _ in 0..iters {
41+
let result = pipeline.execute(batch.clone()).await.unwrap();
42+
std::hint::black_box(result);
43+
}
44+
start.elapsed()
45+
})
46+
});
47+
});
48+
}
49+
group.finish();
50+
}
51+
52+
fn bench_filter_pipelines(c: &mut Criterion) {
53+
let rt = tokio::runtime::Builder::new_current_thread()
54+
.enable_all()
55+
.build()
56+
.expect("can build tokio single threaded runtime");
57+
58+
let batch_sizes = [32, 1024, 8192];
59+
bench_log_pipeline(
60+
c,
61+
&rt,
62+
&batch_sizes,
63+
"simple_field_filter",
64+
"logs | where severity_text == \"WARN\"",
65+
);
66+
bench_log_pipeline(
67+
c,
68+
&rt,
69+
&batch_sizes,
70+
"simple_attr_filter",
71+
"logs | where attributes[\"code.namespace\"] == \"main\"",
72+
);
73+
bench_log_pipeline(
74+
c,
75+
&rt,
76+
&batch_sizes,
77+
"attr_or_filter",
78+
"logs | where attributes[\"code.namespace\"] == \"main\" or attributes[\"code.line.number\"] == 2",
79+
);
80+
bench_log_pipeline(
81+
c,
82+
&rt,
83+
&batch_sizes,
84+
"attr_and_prop_filter",
85+
"logs | where attributes[\"code.namespace\"] == \"main\" and severity_text == \"WARN\"",
86+
);
87+
}
88+
89+
#[allow(missing_docs)]
mod benches {
    use super::*;

    // Criterion entry point: registers the filter pipeline benchmarks with the
    // default criterion configuration.
    criterion_group!(
        name = benches;
        config = Criterion::default();
        targets = bench_filter_pipelines
    );
}

criterion_main!(benches::benches);

rust/otap-dataflow/benchmarks/benches/pdata_filter/main.rs

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,115 @@ use std::hint::black_box;
77
use std::sync::Arc;
88

99
use arrow::array::Array;
10-
use criterion::{Criterion, criterion_group, criterion_main};
11-
use otap_df_pdata::otap::filter::build_uint16_id_filter;
10+
use criterion::{BatchSize, BenchmarkId, Criterion, criterion_group, criterion_main};
11+
use otap_df_pdata::OtapArrowRecords;
12+
use otap_df_pdata::otap::filter::logs::{LogFilter, LogMatchProperties};
13+
use otap_df_pdata::otap::filter::{self, MatchType, build_uint16_id_filter};
1214
use otap_df_pdata::proto::OtlpProtoMessage;
1315
use otap_df_pdata::proto::opentelemetry::arrow::v1::ArrowPayloadType;
1416
use otap_df_pdata::proto::opentelemetry::common::v1::{AnyValue, KeyValue};
1517
use otap_df_pdata::proto::opentelemetry::logs::v1::{LogRecord, LogsData, ResourceLogs, ScopeLogs};
18+
use otap_df_pdata::testing::fixtures::logs_with_varying_attributes_and_properties;
1619
use otap_df_pdata::testing::round_trip::otlp_to_otap;
1720
use roaring::RoaringBitmap;
1821

22+
fn generate_logs_batch(batch_size: usize) -> OtapArrowRecords {
23+
let logs_data = logs_with_varying_attributes_and_properties(batch_size);
24+
otlp_to_otap(&OtlpProtoMessage::Logs(logs_data))
25+
}
26+
27+
fn bench_log_filter(
28+
c: &mut Criterion,
29+
batch_sizes: &[usize],
30+
bench_group_name: &str,
31+
include: Option<LogMatchProperties>,
32+
exclude: Option<LogMatchProperties>,
33+
) {
34+
let mut group = c.benchmark_group(format!("log_filter/{bench_group_name}"));
35+
for batch_size in batch_sizes {
36+
let benchmark_id = BenchmarkId::new("batch_size", batch_size);
37+
38+
let filter = LogFilter::new(include.clone(), exclude.clone(), Vec::new());
39+
let batch = generate_logs_batch(*batch_size);
40+
_ = group.bench_with_input(benchmark_id, &(batch, filter), |b, input| {
41+
b.iter_batched(
42+
|| input,
43+
|input| {
44+
let (batch, filter) = &input;
45+
let (result, _, _) = filter.filter(batch.clone()).expect("shouldn't fail");
46+
black_box(result)
47+
},
48+
BatchSize::SmallInput,
49+
);
50+
})
51+
}
52+
53+
group.finish();
54+
}
55+
56+
fn bench_filter(c: &mut Criterion) {
57+
let batch_sizes = [32, 1024, 8092];
58+
59+
let include = LogMatchProperties::new(
60+
MatchType::Strict,
61+
Vec::new(), // no resource attr filter,
62+
Vec::new(), // no record attrs filter,
63+
vec!["WARN".into()], // severity_text = "WARN",
64+
None, // no severity number filter,
65+
Vec::new(), // no bodies filter
66+
);
67+
bench_log_filter(c, &batch_sizes, "simple_field_filter", Some(include), None);
68+
69+
let include = LogMatchProperties::new(
70+
MatchType::Strict,
71+
Vec::new(), // no resource attr filter,
72+
vec![
73+
// attrs["code.namespace"] == "main"
74+
filter::KeyValue::new(
75+
"code.namespace".into(),
76+
filter::AnyValue::String("main".into()),
77+
),
78+
],
79+
Vec::new(), // no severity text filter
80+
None, // no severity number filter,
81+
Vec::new(), // no bodies filter
82+
);
83+
bench_log_filter(c, &batch_sizes, "simple_attrs_filter", Some(include), None);
84+
85+
let include = LogMatchProperties::new(
86+
MatchType::Strict,
87+
Vec::new(),
88+
vec![
89+
// attrs["code.namespace"] == "main" or attrs["code.line.number"] == 2
90+
filter::KeyValue::new(
91+
"code.namespace".into(),
92+
filter::AnyValue::String("main".into()),
93+
),
94+
filter::KeyValue::new("code.line.number".into(), filter::AnyValue::Int(2)),
95+
],
96+
Vec::new(), // no severity text filter
97+
None, // no severity number filter,
98+
Vec::new(), // no bodies filter
99+
);
100+
bench_log_filter(c, &batch_sizes, "attrs_or_filter", Some(include), None);
101+
102+
let include = LogMatchProperties::new(
103+
MatchType::Strict,
104+
Vec::new(),
105+
vec![
106+
// attrs["code.namespace"] == "main"
107+
filter::KeyValue::new(
108+
"code.namespace".into(),
109+
filter::AnyValue::String("main".into()),
110+
),
111+
],
112+
vec!["WARN".into()], // severity_text == "WARN"
113+
None, // no severity number filter,
114+
Vec::new(), // no bodies filter
115+
);
116+
bench_log_filter(c, &batch_sizes, "attr_and_prop_filter", Some(include), None);
117+
}
118+
19119
/// Benchmark for [`build_uint16_id_filter`]
20120
///
21121
/// # Motivation:
@@ -155,7 +255,7 @@ mod benches {
155255
criterion_group!(
156256
name = benches;
157257
config = Criterion::default();
158-
targets = bench_build_uint16_id_filter
258+
targets = bench_filter, bench_build_uint16_id_filter
159259
);
160260
}
161261

rust/otap-dataflow/crates/pdata/src/testing/fixtures.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,59 @@ pub fn logs_multiple_resources_mixed_content() -> LogsData {
273273
])
274274
}
275275

276+
/// Generate logs with varying attributes and properties that follow some semantic
/// conventions. This can be used to produce a somewhat realistic set of records,
/// at an arbitrary batch size, for testing transformations such as filtering.
#[must_use]
pub fn logs_with_varying_attributes_and_properties(batch_size: usize) -> LogsData {
    let log_records = (0..batch_size)
        .map(|i| {
            // generate some log attributes that somewhat follow semantic conventions
            let attrs = vec![
                KeyValue::new(
                    "code.namespace",
                    AnyValue::new_string(match i % 3 {
                        0 => "main",
                        1 => "otap_dataflow_engine",
                        _ => "arrow::array",
                    }),
                ),
                // line numbers cycle through 0..=4 so attribute filters match a subset
                KeyValue::new("code.line.number", AnyValue::new_int((i % 5) as i64)),
            ];

            // cycle through severity numbers: (i % 4) * 4 + 1 yields 1, 5, 9, 13,
            // i.e. 1 = TRACE, 5 = DEBUG, 9 = INFO, 13 = WARN.
            // NOTE(review): ERROR (17) is never produced; if DEBUG..=ERROR was the
            // intent, the formula would need to be (i % 4 + 1) * 4 + 1 — confirm.
            let severity_number =
                SeverityNumber::try_from(((i % 4) * 4 + 1) as i32).expect("valid severity_number");
            let severity_text = severity_number
                .as_str_name()
                .split("_") // Note: this splits something like SEVERITY_NUMBER_INFO; nth(2) keeps "INFO"
                .nth(2)
                .expect("can parse severity_text");
            let event_name = format!("event {}", i);
            let time_unix_nano = i as u64;

            LogRecord::build()
                .attributes(attrs)
                .event_name(event_name)
                .severity_number(severity_number)
                .severity_text(severity_text)
                .time_unix_nano(time_unix_nano)
                .finish()
        })
        .collect::<Vec<_>>();

    // single resource / single scope wrapping all generated records
    LogsData {
        resource_logs: vec![ResourceLogs {
            scope_logs: vec![ScopeLogs {
                log_records,
                ..Default::default()
            }],
            ..Default::default()
        }],
    }
}
328+
276329
//
277330
// Traces Fixtures
278331
//

0 commit comments

Comments
 (0)