Skip to content

Commit 3081202

Browse files
Support protobuf encoding default AnyValues from missing optional columns (#1447)
fixes: #1445 We have a handful of columns that that represent `AnyValue`'s value that are optional columns. If the column is full of default values, we omit these columns. However there was a bug in the encoding of these fields into OTLP protobuf, we treated the missing column as a null value which is not correct if the value_type column is not `ValueType::Empty`. These values must be encoded in OTLP, despite their default value, because they are protobuf `oneof` fields.
1 parent 0969a1a commit 3081202

File tree

3 files changed

+124
-22
lines changed

3 files changed

+124
-22
lines changed

rust/otap-dataflow/crates/pdata/src/otlp/attributes.rs

Lines changed: 105 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,16 @@ pub(crate) fn encode_any_value(
112112
) -> Result<()> {
113113
match value_type {
114114
AttributeValueType::Str => {
115-
if let Some(attr_str) = &attr_arrays.attr_str {
116-
if let Some(val) = attr_str.str_at(index) {
117-
result_buf.encode_string(ANY_VALUE_STRING_VALUE, val);
118-
}
119-
}
115+
let val = attr_arrays
116+
.attr_str
117+
.as_ref()
118+
.and_then(|col| col.str_at(index))
119+
.unwrap_or_default();
120+
result_buf.encode_string(ANY_VALUE_STRING_VALUE, val);
120121
}
121122
AttributeValueType::Bool => {
123+
// TODO handle case when bool column is missing we correct the default value handling
124+
// https://github.com/open-telemetry/otel-arrow/issues/1449
122125
if let Some(attr_bool) = &attr_arrays.attr_bool {
123126
if let Some(val) = attr_bool.value_at(index) {
124127
result_buf.encode_field_tag(ANY_VALUE_BOOL_VALUE, wire_types::VARINT);
@@ -127,27 +130,30 @@ pub(crate) fn encode_any_value(
127130
}
128131
}
129132
AttributeValueType::Int => {
130-
if let Some(attr_int) = &attr_arrays.attr_int {
131-
if let Some(val) = attr_int.value_at(index) {
132-
result_buf.encode_field_tag(ANY_VALUE_INT_VALUE, wire_types::VARINT);
133-
result_buf.encode_varint(val as u64);
134-
}
135-
}
133+
let val = attr_arrays
134+
.attr_int
135+
.as_ref()
136+
.and_then(|col| col.value_at(index))
137+
.unwrap_or_default();
138+
result_buf.encode_field_tag(ANY_VALUE_INT_VALUE, wire_types::VARINT);
139+
result_buf.encode_varint(val as u64);
136140
}
137141
AttributeValueType::Double => {
138-
if let Some(attr_double) = &attr_arrays.attr_double {
139-
if let Some(val) = attr_double.value_at(index) {
140-
result_buf.encode_field_tag(ANY_VALUE_DOUBLE_VALUE, wire_types::FIXED64);
141-
result_buf.extend_from_slice(&val.to_le_bytes());
142-
}
143-
}
142+
let val = attr_arrays
143+
.attr_double
144+
.as_ref()
145+
.and_then(|col| col.value_at(index))
146+
.unwrap_or_default();
147+
result_buf.encode_field_tag(ANY_VALUE_DOUBLE_VALUE, wire_types::FIXED64);
148+
result_buf.extend_from_slice(&val.to_le_bytes());
144149
}
145150
AttributeValueType::Bytes => {
146-
if let Some(attr_bytes) = &attr_arrays.attr_bytes {
147-
if let Some(val) = attr_bytes.slice_at(index) {
148-
result_buf.encode_bytes(ANY_VALUE_BYTES_VALUE, val);
149-
}
150-
}
151+
let val = attr_arrays
152+
.attr_bytes
153+
.as_ref()
154+
.and_then(|col| col.slice_at(index))
155+
.unwrap_or_default();
156+
result_buf.encode_bytes(ANY_VALUE_BYTES_VALUE, val);
151157
}
152158
AttributeValueType::Map | AttributeValueType::Slice => {
153159
if let Some(ser_bytes) = &attr_arrays.attr_ser {
@@ -163,3 +169,80 @@ pub(crate) fn encode_any_value(
163169

164170
Ok(())
165171
}
172+
173+
#[cfg(test)]
174+
mod test {
175+
use super::*;
176+
177+
use arrow::array::RecordBatch;
178+
use arrow::datatypes::Schema;
179+
use prost::Message;
180+
use std::sync::Arc;
181+
182+
use crate::encode::record::attributes::AnyValuesRecordsBuilder;
183+
use crate::otlp::ProtoBuffer;
184+
use crate::otlp::common::AnyValueArrays;
185+
use crate::proto::opentelemetry::common::v1::{AnyValue, ArrayValue};
186+
187+
#[test]
188+
fn test_default_anyvalue_encoded_when_column_missing() {
189+
// append a bunch of "default" values
190+
let mut rb_builder = AnyValuesRecordsBuilder::new();
191+
rb_builder.append_str(b"");
192+
rb_builder.append_bytes(b"");
193+
rb_builder.append_double(0.0);
194+
rb_builder.append_int(0);
195+
196+
// TODO include test cases for bool once we've corrected the default value handling:
197+
// https://github.com/open-telemetry/otel-arrow/issues/1449
198+
199+
let mut fields = vec![];
200+
let mut columns = vec![];
201+
rb_builder.finish(&mut columns, &mut fields).unwrap();
202+
let schema = Arc::new(Schema::new(fields));
203+
let rb = RecordBatch::try_new(schema, columns).unwrap();
204+
205+
// assert the columns are not present
206+
assert!(
207+
rb.column_by_name(consts::ATTRIBUTE_BYTES).is_none(),
208+
"Bytes column should be omitted"
209+
);
210+
assert!(
211+
rb.column_by_name(consts::ATTRIBUTE_STR).is_none(),
212+
"Str column should be omitted"
213+
);
214+
assert!(
215+
rb.column_by_name(consts::ATTRIBUTE_INT).is_none(),
216+
"Int column should be omitted"
217+
);
218+
assert!(
219+
rb.column_by_name(consts::ATTRIBUTE_DOUBLE).is_none(),
220+
"Double column should be omitted"
221+
);
222+
223+
let any_val_arrays = AnyValueArrays::try_from(&rb).unwrap();
224+
let mut protobuf = ProtoBuffer::new();
225+
226+
for i in 0..rb.num_rows() {
227+
if let Some(value_type) = any_val_arrays.attr_type.value_at(i) {
228+
if let Ok(value_type) = AttributeValueType::try_from(value_type) {
229+
proto_encode_len_delimited_unknown_size!(
230+
1, // the values field in ArrayValue message
231+
encode_any_value(&any_val_arrays, i, value_type, &mut protobuf).unwrap(),
232+
&mut protobuf
233+
);
234+
}
235+
}
236+
}
237+
238+
let results = ArrayValue::decode(protobuf.as_ref()).unwrap().values;
239+
let expected = vec![
240+
AnyValue::new_string(""),
241+
AnyValue::new_bytes(b""),
242+
AnyValue::new_double(0.0),
243+
AnyValue::new_int(0),
244+
];
245+
246+
assert_eq!(results, expected);
247+
}
248+
}

rust/otap-dataflow/crates/pdata/src/otlp/common.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -969,6 +969,11 @@ mod test {
969969
test_logs_round_trip(logs_with_empty_log_records());
970970
}
971971

972+
#[test]
973+
fn test_logs_with_body_empty_string() {
974+
test_logs_round_trip(logs_with_body_empty_string());
975+
}
976+
972977
//
973978
// Traces Tests
974979
//

rust/otap-dataflow/crates/pdata/src/testing/fixtures.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,20 @@ pub fn logs_with_empty_log_records() -> LogsData {
160160
)])
161161
}
162162

163+
/// LogRecord whose body is an empty string
164+
#[must_use]
165+
pub fn logs_with_body_empty_string() -> LogsData {
166+
LogsData::new(vec![ResourceLogs::new(
167+
Resource::build().finish(),
168+
vec![ScopeLogs::new(
169+
InstrumentationScope::build()
170+
.name("scope".to_string())
171+
.finish(),
172+
vec![LogRecord::build().body(AnyValue::new_string("")).finish()],
173+
)],
174+
)])
175+
}
176+
163177
/// Multiple log records with no resource
164178
#[must_use]
165179
pub fn logs_multiple_records_no_resource() -> LogsData {

0 commit comments

Comments
 (0)