Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 105 additions & 22 deletions rust/otap-dataflow/crates/pdata/src/otlp/attributes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,16 @@ pub(crate) fn encode_any_value(
) -> Result<()> {
match value_type {
AttributeValueType::Str => {
if let Some(attr_str) = &attr_arrays.attr_str {
if let Some(val) = attr_str.str_at(index) {
result_buf.encode_string(ANY_VALUE_STRING_VALUE, val);
}
}
let val = attr_arrays
.attr_str
.as_ref()
.and_then(|col| col.str_at(index))
.unwrap_or_default();
result_buf.encode_string(ANY_VALUE_STRING_VALUE, val);
}
AttributeValueType::Bool => {
// TODO handle case when bool column is missing we correct the default value handling
// https://github.com/open-telemetry/otel-arrow/issues/1449
if let Some(attr_bool) = &attr_arrays.attr_bool {
if let Some(val) = attr_bool.value_at(index) {
result_buf.encode_field_tag(ANY_VALUE_BOOL_VALUE, wire_types::VARINT);
Expand All @@ -127,27 +130,30 @@ pub(crate) fn encode_any_value(
}
}
AttributeValueType::Int => {
if let Some(attr_int) = &attr_arrays.attr_int {
if let Some(val) = attr_int.value_at(index) {
result_buf.encode_field_tag(ANY_VALUE_INT_VALUE, wire_types::VARINT);
result_buf.encode_varint(val as u64);
}
}
let val = attr_arrays
.attr_int
.as_ref()
.and_then(|col| col.value_at(index))
.unwrap_or_default();
result_buf.encode_field_tag(ANY_VALUE_INT_VALUE, wire_types::VARINT);
result_buf.encode_varint(val as u64);
}
AttributeValueType::Double => {
if let Some(attr_double) = &attr_arrays.attr_double {
if let Some(val) = attr_double.value_at(index) {
result_buf.encode_field_tag(ANY_VALUE_DOUBLE_VALUE, wire_types::FIXED64);
result_buf.extend_from_slice(&val.to_le_bytes());
}
}
let val = attr_arrays
.attr_double
.as_ref()
.and_then(|col| col.value_at(index))
.unwrap_or_default();
result_buf.encode_field_tag(ANY_VALUE_DOUBLE_VALUE, wire_types::FIXED64);
result_buf.extend_from_slice(&val.to_le_bytes());
}
AttributeValueType::Bytes => {
if let Some(attr_bytes) = &attr_arrays.attr_bytes {
if let Some(val) = attr_bytes.slice_at(index) {
result_buf.encode_bytes(ANY_VALUE_BYTES_VALUE, val);
}
}
let val = attr_arrays
.attr_bytes
.as_ref()
.and_then(|col| col.slice_at(index))
.unwrap_or_default();
result_buf.encode_bytes(ANY_VALUE_BYTES_VALUE, val);
}
AttributeValueType::Map | AttributeValueType::Slice => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need to do the same for these types?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect yes.

I would say this follows from https://protobuf.dev/programming-guides/proto3/
because AnyValue is a oneof field

If you set a oneof field to the default value (such as setting an int32 oneof field to 0), the “case” of that oneof field will be set, and the value will be serialized on the wire.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we want to handle this the same way we do for the primitive types.

This attr_ser column is an optional binary column, which OTAP encoding will omit if either all values are null, or they're all zero-length byte arrays.

But for maps and lists, what ends up in the attr_ser array is cbor encoded. Even a default (empty) map or list will still be serialized as a non zero length value with this encoding scheme.

For example:

let log_rec = LogsData {
    resource_logs: vec![ResourceLogs {
        scope_logs: vec![ScopeLogs {
            log_records: vec![
                LogRecord {
                    body: Some(AnyValue::new_array(ArrayValue::default())),
                    ..Default::default()
                },
                LogRecord {
                    body: Some(AnyValue::new_kvlist(KeyValueList::default())),
                    ..Default::default()
                }
            ],
            ..Default::default()
        }],
        ..Default::default()
    }],
    ..Default::default()
};

let otap_batch = encode_logs(&log_rec);
let logs_rb = otap_batch.get(ArrowPayloadType::Logs).unwrap();
arrow::util::pretty::print_batches(&[logs_rb.clone()]).unwrap();

prints:

+----------+---------+---------------------+-------------------------+----------------------+
| resource | scope   | time_unix_nano      | observed_time_unix_nano | body                 |
+----------+---------+---------------------+-------------------------+----------------------+
| {id: 0}  | {id: 0} | 1970-01-01T00:00:00 | 1970-01-01T00:00:00     | {type: 6, ser: 9fff} |
| {id: 0}  | {id: 0} | 1970-01-01T00:00:00 | 1970-01-01T00:00:00     | {type: 5, ser: bfff} |
+----------+---------+---------------------+-------------------------+----------------------+

It would actually be unusual if the attribute type column indicated the the value type should be map/list and then the column wasn't present in the attributes record batch, so I think we should just keep the current logic for these types and let it return null if it happens.

if let Some(ser_bytes) = &attr_arrays.attr_ser {
Expand All @@ -163,3 +169,80 @@ pub(crate) fn encode_any_value(

Ok(())
}

#[cfg(test)]
mod test {
use super::*;

use arrow::array::RecordBatch;
use arrow::datatypes::Schema;
use prost::Message;
use std::sync::Arc;

use crate::encode::record::attributes::AnyValuesRecordsBuilder;
use crate::otlp::ProtoBuffer;
use crate::otlp::common::AnyValueArrays;
use crate::proto::opentelemetry::common::v1::{AnyValue, ArrayValue};

#[test]
fn test_default_anyvalue_encoded_when_column_missing() {
// append a bunch of "default" values
let mut rb_builder = AnyValuesRecordsBuilder::new();
rb_builder.append_str(b"");
rb_builder.append_bytes(b"");
rb_builder.append_double(0.0);
rb_builder.append_int(0);

// TODO include test cases for bool once we've corrected the default value handling:
// https://github.com/open-telemetry/otel-arrow/issues/1449

let mut fields = vec![];
let mut columns = vec![];
rb_builder.finish(&mut columns, &mut fields).unwrap();
let schema = Arc::new(Schema::new(fields));
let rb = RecordBatch::try_new(schema, columns).unwrap();

// assert the columns are not present
assert!(
rb.column_by_name(consts::ATTRIBUTE_BYTES).is_none(),
"Bytes column should be omitted"
);
assert!(
rb.column_by_name(consts::ATTRIBUTE_STR).is_none(),
"Str column should be omitted"
);
assert!(
rb.column_by_name(consts::ATTRIBUTE_INT).is_none(),
"Int column should be omitted"
);
assert!(
rb.column_by_name(consts::ATTRIBUTE_DOUBLE).is_none(),
"Double column should be omitted"
);

let any_val_arrays = AnyValueArrays::try_from(&rb).unwrap();
let mut protobuf = ProtoBuffer::new();

for i in 0..rb.num_rows() {
if let Some(value_type) = any_val_arrays.attr_type.value_at(i) {
if let Ok(value_type) = AttributeValueType::try_from(value_type) {
proto_encode_len_delimited_unknown_size!(
1, // the values field in ArrayValue message
encode_any_value(&any_val_arrays, i, value_type, &mut protobuf).unwrap(),
&mut protobuf
);
}
}
}

let results = ArrayValue::decode(protobuf.as_ref()).unwrap().values;
let expected = vec![
AnyValue::new_string(""),
AnyValue::new_bytes(b""),
AnyValue::new_double(0.0),
AnyValue::new_int(0),
];

assert_eq!(results, expected);
}
}
5 changes: 5 additions & 0 deletions rust/otap-dataflow/crates/pdata/src/otlp/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -969,6 +969,11 @@ mod test {
test_logs_round_trip(logs_with_empty_log_records());
}

#[test]
fn test_logs_with_body_empty_string() {
test_logs_round_trip(logs_with_body_empty_string());
}

//
// Traces Tests
//
Expand Down
14 changes: 14 additions & 0 deletions rust/otap-dataflow/crates/pdata/src/testing/fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,20 @@ pub fn logs_with_empty_log_records() -> LogsData {
)])
}

/// LogRecord whose body is an empty string
#[must_use]
pub fn logs_with_body_empty_string() -> LogsData {
LogsData::new(vec![ResourceLogs::new(
Resource::build().finish(),
vec![ScopeLogs::new(
InstrumentationScope::build()
.name("scope".to_string())
.finish(),
vec![LogRecord::build().body(AnyValue::new_string("")).finish()],
)],
)])
}

/// Multiple log records with no resource
#[must_use]
pub fn logs_multiple_records_no_resource() -> LogsData {
Expand Down
Loading