Skip to content

Commit 821aed3

Browse files
committed
Shared client and server gRPC settings
1 parent 95f0131 commit 821aed3

File tree

10 files changed

+877
-13
lines changed

10 files changed

+877
-13
lines changed

rust/otap-dataflow/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ weaver_resolved_schema = { git = "https://github.com/open-telemetry/weaver.git",
133133
weaver_resolver = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.17.0"}
134134
weaver_semconv = { git = "https://github.com/open-telemetry/weaver.git", tag = "v0.17.0"}
135135
zip = "=4.2.0"
136+
byte-unit = "5.2.0"
136137

137138
# Azure Monnitor Exporter
138139
azure_identity = "0.30.0"
@@ -230,4 +231,3 @@ inherits = "release"
230231
debug = true # Or 2 for full debug info
231232
strip = "none" # Keep symbols and debug info
232233
panic = "unwind"
233-

rust/otap-dataflow/crates/config/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,4 @@ serde_yaml = { workspace = true }
2020
miette = { workspace = true }
2121
urn = { workspace = true }
2222
schemars = { workspace = true }
23+
byte-unit = { workspace = true }
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Support for byte units like "KB / KiB", "MB / MiB", "GB / GiB" in configuration files.
5+
6+
use byte_unit::Byte;
7+
use serde::de::Error as DeError;
8+
use serde::{Deserialize, Deserializer};
9+
10+
#[derive(Deserialize)]
11+
#[serde(untagged)]
12+
enum Value {
13+
Number(u64),
14+
String(String),
15+
}
16+
17+
/// Deserialize an optional byte size that can be specified either as a number (in bytes)
18+
/// or as a string with units (e.g. "1 KB", "2 MiB").
19+
pub fn deserialize<'de, D>(deserializer: D) -> Result<Option<u32>, D::Error>
20+
where
21+
D: Deserializer<'de>,
22+
{
23+
let value = Option::<Value>::deserialize(deserializer)?;
24+
let Some(value) = value else {
25+
return Ok(None);
26+
};
27+
28+
let (bytes, repr) = match value {
29+
Value::Number(value) => (value as u128, value.to_string()),
30+
Value::String(text) => {
31+
let parsed: Byte = text.parse().map_err(DeError::custom)?;
32+
(parsed.as_u64() as u128, text)
33+
}
34+
};
35+
36+
if bytes > u32::MAX as u128 {
37+
return Err(DeError::custom(format!(
38+
"byte size '{}' ({} bytes) exceeds u32::MAX ({} bytes)",
39+
repr,
40+
bytes,
41+
u32::MAX
42+
)));
43+
}
44+
45+
Ok(Some(bytes as u32))
46+
}
47+
48+
#[cfg(test)]
49+
mod tests {
50+
use super::deserialize;
51+
use serde::Deserialize;
52+
53+
#[derive(Debug, Deserialize)]
54+
struct Holder {
55+
#[serde(default, deserialize_with = "deserialize")]
56+
value: Option<u32>,
57+
}
58+
59+
fn de_yaml(input: &str) -> Result<Holder, serde_yaml::Error> {
60+
serde_yaml::from_str::<Holder>(input)
61+
}
62+
63+
#[test]
64+
fn parses_number_as_bytes() {
65+
let cfg = de_yaml("value: 1024").expect("should parse numeric bytes");
66+
assert_eq!(cfg.value, Some(1024));
67+
}
68+
69+
#[test]
70+
fn parses_string_with_iec_units() {
71+
// 1 KiB == 1024 bytes
72+
let cfg = de_yaml("value: 1 KiB").expect("should parse 1 KiB");
73+
assert_eq!(cfg.value, Some(1024));
74+
75+
// 2 MiB == 2 * 1024 * 1024 bytes
76+
let cfg = de_yaml("value: '2 MiB'").expect("should parse 2 MiB");
77+
assert_eq!(cfg.value, Some(2 * 1024 * 1024));
78+
}
79+
80+
#[test]
81+
fn parses_plain_string_number() {
82+
let cfg = de_yaml("value: '2048'").expect("should parse plain numeric string");
83+
assert_eq!(cfg.value, Some(2048));
84+
}
85+
86+
#[test]
87+
fn missing_value_is_none() {
88+
let cfg = de_yaml("{}").expect("should parse with missing field as None");
89+
assert_eq!(cfg.value, None);
90+
}
91+
92+
#[test]
93+
fn overflow_is_rejected() {
94+
// 4 GiB == 4 * 1024^3 bytes = 4_294_967_296 > u32::MAX (4_294_967_295)
95+
let err = de_yaml("value: 4 GiB").expect_err("should error for overflow");
96+
let msg = err.to_string();
97+
assert!(
98+
msg.contains("exceeds u32::MAX"),
99+
"unexpected error: {}",
100+
msg
101+
);
102+
}
103+
104+
#[test]
105+
fn parses_no_space_decimal_units() {
106+
let cfg = de_yaml("value: 1KB").expect("should parse 1KB without space");
107+
assert_eq!(cfg.value, Some(1000));
108+
109+
let cfg = de_yaml("value: 10MB").expect("should parse 10MB without space");
110+
assert_eq!(cfg.value, Some(10_000_000));
111+
112+
// Lowercase 'b' should still be treated as bytes per crate behavior
113+
let cfg = de_yaml("value: 1kb").expect("should parse 1kb as 1000 bits => 125 bytes");
114+
assert_eq!(cfg.value, Some(125));
115+
}
116+
117+
#[test]
118+
fn parses_fractional_values_and_rounding() {
119+
// Decimal unit with fraction
120+
let cfg = de_yaml("value: '1.5 MB'").expect("should parse 1.5 MB");
121+
assert_eq!(cfg.value, Some(1_500_000));
122+
123+
// Binary unit with fraction (exact)
124+
let cfg = de_yaml("value: '0.5 KiB'").expect("should parse 0.5 KiB to 512 bytes");
125+
assert_eq!(cfg.value, Some(512));
126+
}
127+
}

rust/otap-dataflow/crates/config/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
use serde::{Deserialize, Serialize};
1616
use std::borrow::Cow;
1717

18+
pub mod byte_units;
1819
pub mod engine;
1920
pub mod error;
2021
pub mod health;

rust/otap-dataflow/crates/otap/src/compression.rs

Lines changed: 95 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
//!
5-
//! Defines a compression enum to abstract from tonic and allows the exporter and receiver to get the respective tonic equivalent
6-
//!
4+
//! Defines a compression enum to abstract from tonic and allows the exporter and receiver to get
5+
//! the respective tonic equivalent.
76
8-
use serde::{Deserialize, Serialize};
7+
use serde::{Deserialize, Deserializer, Serialize};
98
use tonic::codec::CompressionEncoding;
109

1110
/// Enum to represent various compression methods
12-
#[derive(Debug, Clone, Serialize, Deserialize)]
11+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
1312
#[serde(rename_all = "snake_case")]
1413
pub enum CompressionMethod {
1514
/// Fastest compression
@@ -26,16 +25,73 @@ impl CompressionMethod {
2625
#[must_use]
2726
pub const fn map_to_compression_encoding(&self) -> CompressionEncoding {
2827
match *self {
29-
CompressionMethod::Gzip => CompressionEncoding::Gzip,
3028
CompressionMethod::Zstd => CompressionEncoding::Zstd,
29+
CompressionMethod::Gzip => CompressionEncoding::Gzip,
3130
CompressionMethod::Deflate => CompressionEncoding::Deflate,
3231
}
3332
}
3433
}
3534

35+
/// Default set of compression methods that are accepted when no configuration is provided.
36+
pub const DEFAULT_COMPRESSION_METHODS: [CompressionMethod; 3] = [
37+
CompressionMethod::Zstd,
38+
CompressionMethod::Gzip,
39+
CompressionMethod::Deflate,
40+
];
41+
42+
#[derive(Deserialize)]
43+
#[serde(untagged)]
44+
enum CompressionConfigValue {
45+
Single(CompressionMethod),
46+
List(Vec<CompressionMethod>),
47+
NoneKeyword(CompressionNone),
48+
}
49+
50+
#[derive(Deserialize)]
51+
#[serde(rename_all = "snake_case")]
52+
enum CompressionNone {
53+
None,
54+
}
55+
56+
/// Deserializer that accepts either a single compression method, a list, or the string `"none"`.
57+
/// Absence of the field keeps the default behaviour (all methods).
58+
pub fn deserialize_compression_methods<'de, D>(
59+
deserializer: D,
60+
) -> Result<Option<Vec<CompressionMethod>>, D::Error>
61+
where
62+
D: Deserializer<'de>,
63+
{
64+
let value = Option::<CompressionConfigValue>::deserialize(deserializer)?;
65+
let Some(value) = value else {
66+
return Ok(None);
67+
};
68+
69+
let methods = match value {
70+
CompressionConfigValue::Single(method) => vec![method],
71+
CompressionConfigValue::List(methods) => methods,
72+
CompressionConfigValue::NoneKeyword(CompressionNone::None) => Vec::new(),
73+
};
74+
75+
let mut deduped = Vec::with_capacity(methods.len());
76+
for method in methods {
77+
if !deduped.contains(&method) {
78+
deduped.push(method);
79+
}
80+
}
81+
82+
Ok(Some(deduped))
83+
}
84+
3685
#[cfg(test)]
3786
mod tests {
3887
use super::*;
88+
use serde::Deserialize;
89+
90+
#[derive(Debug, Deserialize)]
91+
struct ConfWithCompression {
92+
#[serde(default, deserialize_with = "deserialize_compression_methods")]
93+
methods: Option<Vec<CompressionMethod>>,
94+
}
3995

4096
#[test]
4197
fn compression_method_accepts_snake_case_only() {
@@ -52,4 +108,37 @@ mod tests {
52108
assert!(serde_json::from_str::<CompressionMethod>("\"Zstd\"").is_err());
53109
assert!(serde_json::from_str::<CompressionMethod>("\"Deflate\"").is_err());
54110
}
111+
112+
#[test]
113+
fn deserialize_supports_single_value() {
114+
let conf: ConfWithCompression = serde_json::from_str(r#"{ "methods": "gzip" }"#).unwrap();
115+
assert_eq!(conf.methods, Some(vec![CompressionMethod::Gzip]));
116+
}
117+
118+
#[test]
119+
fn deserialize_supports_list() {
120+
let conf: ConfWithCompression =
121+
serde_json::from_str(r#"{ "methods": ["gzip", "zstd", "gzip"] }"#).unwrap();
122+
assert_eq!(
123+
conf.methods,
124+
Some(vec![CompressionMethod::Gzip, CompressionMethod::Zstd])
125+
);
126+
}
127+
128+
#[test]
129+
fn deserialize_supports_none_keyword() {
130+
let conf: ConfWithCompression = serde_json::from_str(r#"{ "methods": "none" }"#).unwrap();
131+
assert_eq!(conf.methods, Some(vec![]));
132+
}
133+
134+
#[test]
135+
fn deserialize_supports_absence() {
136+
#[derive(Debug, Deserialize)]
137+
struct Conf {
138+
#[serde(default, deserialize_with = "deserialize_compression_methods")]
139+
methods: Option<Vec<CompressionMethod>>,
140+
}
141+
let conf: Conf = serde_json::from_str("{}").unwrap();
142+
assert_eq!(conf.methods, None);
143+
}
55144
}

rust/otap-dataflow/crates/otap/src/otap_grpc.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use crate::{
3434

3535
pub mod middleware;
3636
pub mod otlp;
37+
pub mod server_settings;
3738

3839
/// Common settings for OTLP receivers.
3940
#[derive(Clone, Debug)]

0 commit comments

Comments
 (0)