Skip to content

Commit 306c2f6

Browse files
authored
Merge branch 'main' into cijothomas/perf-k8s
2 parents f07e645 + ba4102a commit 306c2f6

File tree

19 files changed

+698
-282
lines changed

19 files changed

+698
-282
lines changed

rust/otel-arrow-rust/src/arrays.rs

Lines changed: 139 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@ use crate::error::{self, InvalidListArraySnafu};
1414
use arrow::array::{
1515
Array, ArrayRef, ArrowPrimitiveType, BinaryArray, BooleanArray, DictionaryArray,
1616
FixedSizeBinaryArray, Float32Array, Float64Array, Int8Array, Int16Array, Int32Array,
17-
Int64Array, PrimitiveArray, RecordBatch, StringArray, TimestampNanosecondArray, UInt8Array,
18-
UInt16Array, UInt32Array, UInt64Array,
17+
Int64Array, PrimitiveArray, RecordBatch, StringArray, StructArray, TimestampNanosecondArray,
18+
UInt8Array, UInt16Array, UInt32Array, UInt64Array,
19+
};
20+
use arrow::datatypes::{
21+
ArrowDictionaryKeyType, ArrowNativeType, DataType, TimeUnit, UInt8Type, UInt16Type,
1922
};
20-
use arrow::datatypes::{ArrowDictionaryKeyType, TimeUnit};
21-
use arrow::datatypes::{ArrowNativeType, DataType, UInt8Type, UInt16Type};
2223
use paste::paste;
2324
use snafu::{OptionExt, ensure};
2425

@@ -139,9 +140,7 @@ macro_rules! impl_downcast {
139140

140141
pub fn [<get_ $suffix _array> ]<'a>(rb: &'a RecordBatch, name: &str) -> error::Result<&'a $array_type> {
141142
use arrow::datatypes::DataType::*;
142-
let arr = rb.column_by_name(name).context(error::ColumnNotFoundSnafu {
143-
name,
144-
})?;
143+
let arr = get_required_array(rb, name)?;
145144

146145
arr.as_any()
147146
.downcast_ref::<$array_type>()
@@ -177,6 +176,17 @@ impl_downcast!(
177176
TimestampNanosecondArray
178177
);
179178

179+
/// Get reference to array that the caller requires to be in the record batch.
180+
/// If the column is not in the record batch, returns `ColumnNotFound` error
181+
pub fn get_required_array<'a>(
182+
record_batch: &'a RecordBatch,
183+
column_name: &str,
184+
) -> error::Result<&'a ArrayRef> {
185+
record_batch
186+
.column_by_name(column_name)
187+
.context(error::ColumnNotFoundSnafu { name: column_name })
188+
}
189+
180190
trait NullableInt64ArrayAccessor {
181191
fn i64_at(&self, idx: usize) -> error::Result<Option<i64>>;
182192
}
@@ -243,6 +253,13 @@ pub enum ByteArrayAccessor<'a> {
243253
}
244254

245255
impl<'a> ByteArrayAccessor<'a> {
256+
pub fn try_new_for_column(
257+
record_batch: &'a RecordBatch,
258+
column_name: &str,
259+
) -> error::Result<Self> {
260+
Self::try_new(get_required_array(record_batch, column_name)?)
261+
}
262+
246263
pub fn try_new(arr: &'a ArrayRef) -> error::Result<Self> {
247264
match arr.data_type() {
248265
DataType::Binary => {
@@ -339,7 +356,11 @@ where
339356
fn try_new_with_datatype(data_type: DataType, arr: &'a ArrayRef) -> error::Result<Self> {
340357
// if the type isn't a dictionary, we treat it as an unencoded array
341358
if *arr.data_type() == data_type {
342-
return Ok(Self::Native(arr.as_any().downcast_ref::<T>().unwrap()));
359+
return Ok(Self::Native(
360+
arr.as_any()
361+
.downcast_ref::<T>()
362+
.expect("array can be downcast to it's native datatype"),
363+
));
343364
}
344365

345366
// determine if the type is a dictionary where the value is the desired datatype
@@ -356,13 +377,13 @@ where
356377
DataType::UInt8 => Self::Dictionary8(DictionaryArrayAccessor::new(
357378
arr.as_any()
358379
.downcast_ref::<DictionaryArray<UInt8Type>>()
359-
.unwrap(),
360-
)),
380+
.expect("array can be downcast to DictionaryArray<UInt8Type"),
381+
)?),
361382
DataType::UInt16 => Self::Dictionary16(DictionaryArrayAccessor::new(
362383
arr.as_any()
363384
.downcast_ref::<DictionaryArray<UInt16Type>>()
364-
.unwrap(),
365-
)),
385+
.expect("array can be downcast to DictionaryArray<UInt16Type>"),
386+
)?),
366387
_ => {
367388
return error::UnsupportedDictionaryKeyTypeSnafu {
368389
expect_oneof: vec![DataType::UInt8, DataType::UInt16],
@@ -394,6 +415,13 @@ where
394415
pub fn try_new(arr: &'a ArrayRef) -> error::Result<Self> {
395416
Self::try_new_with_datatype(V::DATA_TYPE, arr)
396417
}
418+
419+
pub fn try_new_for_column(
420+
record_batch: &'a RecordBatch,
421+
column_name: &str,
422+
) -> error::Result<Self> {
423+
Self::try_new(get_required_array(record_batch, column_name)?)
424+
}
397425
}
398426

399427
impl<'a> MaybeDictArrayAccessor<'a, BinaryArray> {
@@ -412,6 +440,13 @@ impl<'a> MaybeDictArrayAccessor<'a, StringArray> {
412440
pub fn try_new(arr: &'a ArrayRef) -> error::Result<Self> {
413441
Self::try_new_with_datatype(StringArray::DATA_TYPE, arr)
414442
}
443+
444+
pub fn try_new_for_column(
445+
record_batch: &'a RecordBatch,
446+
column_name: &str,
447+
) -> error::Result<Self> {
448+
Self::try_new(get_required_array(record_batch, column_name)?)
449+
}
415450
}
416451

417452
pub type Int32ArrayAccessor<'a> = MaybeDictArrayAccessor<'a, Int32Array>;
@@ -431,22 +466,109 @@ where
431466
K: ArrowDictionaryKeyType,
432467
V: Array + NullableArrayAccessor + 'static,
433468
{
434-
pub fn new(a: &'a DictionaryArray<K>) -> Self {
435-
let dict = a.as_any().downcast_ref::<DictionaryArray<K>>().unwrap();
436-
let value = dict.values().as_any().downcast_ref::<V>().unwrap();
437-
Self { inner: dict, value }
469+
pub fn new(dict: &'a DictionaryArray<K>) -> error::Result<Self> {
470+
let value = dict
471+
.values()
472+
.as_any()
473+
.downcast_ref::<V>()
474+
.with_context(|| error::InvalidListArraySnafu {
475+
expect_oneof: Vec::new(),
476+
actual: dict.values().data_type().clone(),
477+
})?;
478+
Ok(Self { inner: dict, value })
438479
}
439480

440481
pub fn value_at(&self, idx: usize) -> Option<V::Native> {
441482
if self.inner.is_valid(idx) {
442-
let offset = self.inner.key(idx).unwrap();
483+
let offset = self
484+
.inner
485+
.key(idx)
486+
.expect("dictionary should be valid at index");
443487
self.value.value_at(offset)
444488
} else {
445489
None
446490
}
447491
}
448492
}
449493

494+
/// Helper for accessing columns of a struct array
495+
///
496+
/// Methods return various errors into this crate's Error type if
497+
/// if callers requirments for the struct columns are not met (for
498+
/// example `ColumnDataTypeMismatchSnafu`)
499+
pub struct StructColumnAccessor<'a> {
500+
inner: &'a StructArray,
501+
}
502+
503+
impl<'a> StructColumnAccessor<'a> {
504+
pub fn new(arr: &'a StructArray) -> Self {
505+
Self { inner: arr }
506+
}
507+
508+
pub fn primitive_column<T: ArrowPrimitiveType + 'static>(
509+
&self,
510+
column_name: &str,
511+
) -> error::Result<&'a PrimitiveArray<T>> {
512+
self.primitive_column_op(column_name)?
513+
.with_context(|| error::ColumnNotFoundSnafu {
514+
name: column_name.to_string(),
515+
})
516+
}
517+
518+
pub fn primitive_column_op<T: ArrowPrimitiveType + 'static>(
519+
&self,
520+
column_name: &str,
521+
) -> error::Result<Option<&'a PrimitiveArray<T>>> {
522+
self.inner
523+
.column_by_name(column_name)
524+
.map(|arr| {
525+
arr.as_any()
526+
.downcast_ref::<PrimitiveArray<T>>()
527+
.with_context(|| error::ColumnDataTypeMismatchSnafu {
528+
name: column_name.to_string(),
529+
expect: T::DATA_TYPE,
530+
actual: arr.data_type().clone(),
531+
})
532+
})
533+
.transpose()
534+
}
535+
536+
pub fn bool_column_op(&self, column_name: &str) -> error::Result<Option<&'a BooleanArray>> {
537+
self.inner
538+
.column_by_name(column_name)
539+
.map(|arr| {
540+
arr.as_any()
541+
.downcast_ref()
542+
.with_context(|| error::ColumnDataTypeMismatchSnafu {
543+
name: column_name.to_string(),
544+
expect: DataType::Boolean,
545+
actual: arr.data_type().clone(),
546+
})
547+
})
548+
.transpose()
549+
}
550+
551+
pub fn string_column_op(
552+
&self,
553+
column_name: &str,
554+
) -> error::Result<Option<StringArrayAccessor<'a>>> {
555+
self.inner
556+
.column_by_name(column_name)
557+
.map(StringArrayAccessor::try_new)
558+
.transpose()
559+
}
560+
561+
pub fn byte_array_column_op(
562+
&self,
563+
column_name: &str,
564+
) -> error::Result<Option<ByteArrayAccessor<'a>>> {
565+
self.inner
566+
.column_by_name(column_name)
567+
.map(ByteArrayAccessor::try_new)
568+
.transpose()
569+
}
570+
}
571+
450572
#[cfg(test)]
451573
mod tests {
452574
use crate::arrays::{NullableArrayAccessor, StringArrayAccessor};

rust/otel-arrow-rust/src/decode/decoder.rs

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212

1313
use crate::decode::record_message::RecordMessage;
1414
use crate::error;
15-
use crate::otlp::metric::metrics_from;
16-
use crate::otlp::related_data::RelatedData;
15+
use crate::otlp::logs::logs_from;
16+
use crate::otlp::logs::related_data::RelatedData as LogsRelatedData;
17+
use crate::otlp::metrics::{metrics_from, related_data::RelatedData as MetricsRelatedData};
1718
use crate::proto::opentelemetry::arrow::v1::{ArrowPayload, ArrowPayloadType, BatchArrowRecords};
19+
use crate::proto::opentelemetry::collector::logs::v1::ExportLogsServiceRequest;
1820
use crate::proto::opentelemetry::collector::metrics::v1::ExportMetricsServiceRequest;
1921
use arrow::array::RecordBatch;
2022
use arrow::error::ArrowError;
@@ -107,40 +109,55 @@ impl Consumer {
107109
&mut self,
108110
records: &mut BatchArrowRecords,
109111
) -> error::Result<ExportMetricsServiceRequest> {
110-
ensure!(!records.arrow_payloads.is_empty(), error::EmptyBatchSnafu);
111-
112-
let main_record_type = records.arrow_payloads[0].r#type;
113-
let payload_type = ArrowPayloadType::try_from(main_record_type).map_err(|_| {
114-
error::UnsupportedPayloadTypeSnafu {
115-
actual: main_record_type,
116-
}
117-
.build()
118-
})?;
119-
match payload_type {
112+
match get_main_payload_type(records)? {
120113
ArrowPayloadType::UnivariateMetrics => {
121114
let record_message = self.consume_bar(records)?;
122115
let (mut related_data, metric_record) =
123-
RelatedData::from_record_messages(&record_message)?;
116+
MetricsRelatedData::from_record_messages(&record_message)?;
124117
let metric_rec_idx = metric_record.context(error::MetricRecordNotFoundSnafu)?;
125118
metrics_from(&record_message[metric_rec_idx].record, &mut related_data)
126119
}
127-
128-
ArrowPayloadType::Logs => error::UnsupportedPayloadTypeSnafu {
120+
main_record_type => error::UnsupportedPayloadTypeSnafu {
129121
actual: main_record_type,
130122
}
131123
.fail(),
132-
ArrowPayloadType::Spans => error::UnsupportedPayloadTypeSnafu {
133-
actual: main_record_type,
124+
}
125+
}
126+
127+
pub fn consume_logs_batches(
128+
&mut self,
129+
records: &mut BatchArrowRecords,
130+
) -> error::Result<ExportLogsServiceRequest> {
131+
match get_main_payload_type(records)? {
132+
ArrowPayloadType::Logs => {
133+
let record_message = self.consume_bar(records)?;
134+
let (mut related_data, log_record) =
135+
LogsRelatedData::from_record_messages(&record_message)?;
136+
let log_rec_idx = log_record.context(error::LogRecordNotFoundSnafu)?;
137+
logs_from(&record_message[log_rec_idx].record, &mut related_data)
134138
}
135-
.fail(),
136-
_ => error::UnsupportedPayloadTypeSnafu {
139+
main_record_type => error::UnsupportedPayloadTypeSnafu {
137140
actual: main_record_type,
138141
}
139142
.fail(),
140143
}
141144
}
142145
}
143146

147+
/// Get the main logs, metrics, or traces from a received BatchArrowRecords message.
148+
fn get_main_payload_type(records: &BatchArrowRecords) -> error::Result<ArrowPayloadType> {
149+
ensure!(!records.arrow_payloads.is_empty(), error::EmptyBatchSnafu);
150+
151+
// Per the specification, the main record type is the first payload
152+
let main_record_type = records.arrow_payloads[0].r#type;
153+
ArrowPayloadType::try_from(main_record_type).map_err(|_| {
154+
error::UnsupportedPayloadTypeSnafu {
155+
actual: main_record_type,
156+
}
157+
.build()
158+
})
159+
}
160+
144161
#[cfg(test)]
145162
mod tests {
146163
use crate::test_util::{create_record_batch, create_test_schema};

rust/otel-arrow-rust/src/error.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
// limitations under the License.
1212

1313
use crate::otlp::attributes::store::AttributeValueType;
14-
use crate::otlp::metric::MetricType;
14+
use crate::otlp::metrics::MetricType;
1515
use arrow::datatypes::DataType;
1616
use arrow::error::ArrowError;
1717
use num_enum::TryFromPrimitiveError;
@@ -137,6 +137,12 @@ pub enum Error {
137137
location: Location,
138138
},
139139

140+
#[snafu(display("Log record not found"))]
141+
LogRecordNotFound {
142+
#[snafu(implicit)]
143+
location: Location,
144+
},
145+
140146
#[snafu(display("Metric record not found"))]
141147
MetricRecordNotFound {
142148
#[snafu(implicit)]

rust/otel-arrow-rust/src/otlp.rs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,8 @@
1010
// See the License for the specific language governing permissions and
1111
// limitations under the License.
1212

13-
mod exemplar;
14-
pub mod metric;
15-
1613
pub mod attributes;
17-
pub mod data_points;
18-
pub mod related_data;
14+
pub mod logs;
15+
pub mod metrics;
16+
17+
mod common;

rust/otel-arrow-rust/src/otlp/attributes/store.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@
1111
// limitations under the License.
1212

1313
use crate::arrays::{
14-
Int64ArrayAccessor, MaybeDictArrayAccessor, NullableArrayAccessor, StringArrayAccessor,
15-
get_binary_array_opt, get_bool_array_opt, get_f64_array_opt, get_u8_array,
14+
ByteArrayAccessor, Int64ArrayAccessor, MaybeDictArrayAccessor, NullableArrayAccessor,
15+
StringArrayAccessor, get_bool_array_opt, get_f64_array_opt, get_u8_array,
1616
};
1717
use crate::error;
1818
use crate::otlp::attributes::parent_id::ParentId;
@@ -80,22 +80,11 @@ where
8080
.transpose()?;
8181
let value_type_arr = get_u8_array(rb, consts::ATTRIBUTE_TYPE)?;
8282

83-
let value_str_arr = StringArrayAccessor::try_new(
84-
rb.column_by_name(consts::ATTRIBUTE_STR)
85-
.context(error::ColumnNotFoundSnafu {
86-
name: consts::ATTRIBUTE_STR,
87-
})?,
88-
)?;
89-
90-
let value_int_arr = Int64ArrayAccessor::try_new(
91-
rb.column_by_name(consts::ATTRIBUTE_INT)
92-
.context(error::ColumnNotFoundSnafu {
93-
name: consts::ATTRIBUTE_STR,
94-
})?,
95-
)?;
83+
let value_str_arr = StringArrayAccessor::try_new_for_column(rb, consts::ATTRIBUTE_STR)?;
84+
let value_int_arr = Int64ArrayAccessor::try_new_for_column(rb, consts::ATTRIBUTE_INT)?;
9685
let value_double_arr = get_f64_array_opt(rb, consts::ATTRIBUTE_DOUBLE)?;
9786
let value_bool_arr = get_bool_array_opt(rb, consts::ATTRIBUTE_BOOL)?;
98-
let value_bytes_arr = get_binary_array_opt(rb, consts::ATTRIBUTE_BYTES)?;
87+
let value_bytes_arr = ByteArrayAccessor::try_new_for_column(rb, consts::ATTRIBUTE_BYTES)?;
9988

10089
for idx in 0..rb.num_rows() {
10190
let key = key_arr.value_at_or_default(idx);

0 commit comments

Comments
 (0)