Skip to content

Commit 4e959c8

Browse files
lquerel and jmacd authored
OTLP receiver improvements/optimizations (#1480)
Key changes: - Use the new GrpcServerSettings config - Tune gRPC Tonic settings - New AckRegistry - New global concurrency limiter - Reduce copy/allocation on the hot path - Clean up the control loop Notes: - In order to keep the OTAP Receiver unchanged, I had to duplicate some data structures (e.g. Settings vs NewSettings). These duplications will be removed in the next PR. - Perf improvements are not visible with the current continuous benchmark because the level of pressure is too low. However on high-load, the number of memory allocations is significantly smaller leading to better performance. Manual pipeline perf test results: https://github.com/open-telemetry/otel-arrow/actions/runs/19776412483/job/56669638225 No regression observed. --------- Co-authored-by: Joshua MacDonald <[email protected]>
1 parent db52405 commit 4e959c8

File tree

14 files changed

+1238
-239
lines changed

14 files changed

+1238
-239
lines changed

rust/otap-dataflow/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ num_enum = "0.7"
8585
object_store = "0.12.3"
8686
once_cell = "1.20.2"
8787
opentelemetry-proto = { version = "0.31", default-features = false, features = ["gen-tonic-messages", "logs"]} #TODO - use it from submodule instead of crate(?)
88-
parking_lot = "0.12.4"
88+
parking_lot = "0.12.5"
8989
paste = "1"
9090
parquet = { version = "57.0", default-features = false, features = ["arrow", "async", "object_store"]}
9191
portpicker = "0.1.1"
@@ -96,10 +96,10 @@ quote = "1.0"
9696
rand = "0.9.2"
9797
roaring = "0.11.2"
9898
schemars = { version = "1.0.0" }
99-
serde = { version = "1.0.219", features = ["derive", "rc"] }
99+
serde = { version = "1.0.228", features = ["derive", "rc"] }
100100
serde_cbor = "0.11.2"
101-
serde_json = { version = "1.0.142" }
102-
serde_with = { version = "3.14.1", features = ["std", "macros", "json"] }
101+
serde_json = { version = "1.0.145" }
102+
serde_with = { version = "3.16.0", features = ["std", "macros", "json"] }
103103
serde_yaml = "0.9.34-deprecated" # Deprecated, but no good alternative yet
104104
replace_with = "0.1.8"
105105
simdutf8 = "0.1.5"

rust/otap-dataflow/crates/engine/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ mod attributes;
3939
pub mod config;
4040
pub mod context;
4141
pub mod control;
42-
mod effect_handler;
42+
pub mod effect_handler;
4343
pub mod local;
4444
pub mod node;
4545
pub mod pipeline_ctrl;

rust/otap-dataflow/crates/otap/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ prost = { workspace = true }
4040
smallvec = { workspace = true }
4141
bitflags = { workspace = true }
4242
bytes = { workspace = true }
43+
parking_lot = { workspace = true }
4344

4445
otap-df-engine = { path = "../engine" }
4546
otap-df-engine-macros = { path = "../engine-macros" }
@@ -61,6 +62,7 @@ weaver_resolved_schema.workspace = true
6162
weaver_resolver.workspace = true
6263
rand.workspace = true
6364
zip.workspace = true
65+
tower = { workspace = true }
6466

6567
# Geneva exporter dependencies
6668
geneva-uploader = { version = "0.3.0", optional = true }

rust/otap-dataflow/crates/otap/src/accessory/slots.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,10 @@ impl<UData> State<UData> {
4949
#[must_use]
5050
pub fn new(max_size: usize) -> Self {
5151
Self {
52-
slots: SlotMap::with_key(),
52+
// Pre-size the slot map so we do not allocate on the hot path for
53+
// the common case where concurrency stays within the configured
54+
// limit.
55+
slots: SlotMap::with_capacity_and_key(max_size),
5356
max_size,
5457
}
5558
}
@@ -131,7 +134,8 @@ mod tests {
131134
fn test_take_current() {
132135
let mut state = create_test_state();
133136

134-
assert_eq!(state.slots.capacity(), 0);
137+
// Capacity is pre-allocated to max_size at construction
138+
assert_eq!(state.slots.capacity(), 3);
135139

136140
let (key, rx) = state.allocate(|| oneshot::channel()).unwrap();
137141
assert_eq!(state.slots.len(), 1);

rust/otap-dataflow/crates/otap/src/otap_grpc.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4+
//! Provides a set of structs and enums that interact with the gRPC Server with BiDirectional
5+
//! streaming.
46
//!
5-
//! Provides a set of structs and enums that interact with the gRPC Server with BiDirectional streaming
6-
//!
7-
//! Implements the necessary service traits for OTLP data
7+
//! Implements the necessary service traits for OTLP data.
88
//!
99
//! ToDo: Modify OTAPData -> Optimize message transport
1010
//! ToDo: Handle Ack and Nack, return proper batch status
1111
//! ToDo: Change how channel sizes are handled? Currently defined when creating otap_receiver -> passing channel size to the ServiceImpl
12-
//!
1312
1413
use otap_df_engine::{Interests, ProducerEffectHandlerExtension, shared::receiver as shared};
1514
use otap_df_pdata::{
@@ -27,16 +26,27 @@ use tokio_stream::Stream;
2726
use tokio_stream::wrappers::ReceiverStream;
2827
use tonic::{Request, Response, Status};
2928

30-
use crate::{
31-
otap_grpc::otlp::server::{SharedState, SlotGuard},
32-
pdata::{Context, OtapPdata},
33-
};
29+
use crate::pdata::{Context, OtapPdata};
3430

3531
pub mod client_settings;
32+
pub mod common;
3633
pub mod middleware;
3734
pub mod otlp;
3835
pub mod server_settings;
3936

37+
use crate::otap_grpc::otlp::server::SharedState;
38+
pub use client_settings::GrpcClientSettings;
39+
pub use server_settings::GrpcServerSettings;
40+
41+
/// Common settings for OTLP receivers.
42+
#[derive(Clone, Debug)]
43+
pub struct NewSettings {
44+
/// Maximum concurrent requests per receiver instance (per core).
45+
pub max_concurrent_requests: usize,
46+
/// Whether the receiver should wait for an ACK/NACK result before responding to the caller.
47+
pub wait_for_result: bool,
48+
}
49+
4050
/// Common settings for OTLP receivers.
4151
#[derive(Clone, Debug)]
4252
pub struct Settings {
@@ -326,7 +336,7 @@ where
326336
key.into(),
327337
&mut otap_pdata,
328338
);
329-
Some((SlotGuard { key, state }, rx))
339+
Some((otlp::server::SlotGuard { key, state }, rx))
330340
} else {
331341
None
332342
};
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Shared helpers for gRPC receivers.
5+
//!
6+
//! Request lifecycle (see `otlp::server` for the per-signal services):
7+
//! - decode: gRPC body stays as OTLP bytes, wrapped into `OtapPdata`
8+
//! - subscribe: when `wait_for_result` is enabled, a slot is allocated and calldata recorded
9+
//! - send: payload is forwarded into the pipeline
10+
//! - wait (optional): task awaits an ACK or NACK routed by the subscription maps
11+
//! - respond: ACK/NACK is mapped back to the waiting gRPC request
12+
13+
use crate::otap_grpc::otlp::server_new::{AckSlot, RouteResponse};
14+
use crate::otap_grpc::server_settings::GrpcServerSettings;
15+
use crate::pdata::OtapPdata;
16+
use otap_df_config::SignalType;
17+
use otap_df_engine::control::{AckMsg, NackMsg};
18+
use tonic::transport::Server;
19+
20+
/// Aggregates the per-signal ACK subscription maps that let us route responses back to callers.
21+
#[derive(Clone, Default)]
22+
pub struct AckRegistry {
23+
/// Subscription map for log acknowledgements.
24+
pub logs: Option<AckSlot>,
25+
/// Subscription map for metric acknowledgements.
26+
pub metrics: Option<AckSlot>,
27+
/// Subscription map for trace acknowledgements.
28+
pub traces: Option<AckSlot>,
29+
}
30+
31+
impl AckRegistry {
32+
/// Creates a new bundle of optional subscription maps.
33+
#[must_use]
34+
pub fn new(logs: Option<AckSlot>, metrics: Option<AckSlot>, traces: Option<AckSlot>) -> Self {
35+
Self {
36+
logs,
37+
metrics,
38+
traces,
39+
}
40+
}
41+
}
42+
43+
/// Routes an Ack message to the appropriate signal's subscription map.
44+
#[must_use]
45+
pub fn route_ack_response(states: &AckRegistry, ack: AckMsg<OtapPdata>) -> RouteResponse {
46+
let calldata = ack.calldata;
47+
let resp = Ok(());
48+
let state = match ack.accepted.signal_type() {
49+
SignalType::Logs => states.logs.as_ref(),
50+
SignalType::Metrics => states.metrics.as_ref(),
51+
SignalType::Traces => states.traces.as_ref(),
52+
};
53+
54+
state
55+
.map(|s| s.route_response(calldata, resp))
56+
.unwrap_or(RouteResponse::None)
57+
}
58+
59+
/// Routes a Nack message to the appropriate signal's subscription map.
60+
#[must_use]
61+
pub fn route_nack_response(states: &AckRegistry, mut nack: NackMsg<OtapPdata>) -> RouteResponse {
62+
let calldata = std::mem::take(&mut nack.calldata);
63+
let signal_type = nack.refused.signal_type();
64+
let resp = Err(nack);
65+
let state = match signal_type {
66+
SignalType::Logs => states.logs.as_ref(),
67+
SignalType::Metrics => states.metrics.as_ref(),
68+
SignalType::Traces => states.traces.as_ref(),
69+
};
70+
71+
state
72+
.map(|s| s.route_response(calldata, resp))
73+
.unwrap_or(RouteResponse::None)
74+
}
75+
76+
/// Handles the outcome from routing an Ack/Nack response.
77+
pub fn handle_route_response<T, F, G>(
78+
resp: RouteResponse,
79+
state: &mut T,
80+
mut on_sent: F,
81+
mut on_expired_or_invalid: G,
82+
) where
83+
F: FnMut(&mut T),
84+
G: FnMut(&mut T),
85+
{
86+
match resp {
87+
RouteResponse::Sent => on_sent(state),
88+
RouteResponse::Expired | RouteResponse::Invalid => on_expired_or_invalid(state),
89+
RouteResponse::None => {}
90+
}
91+
}
92+
93+
/// Tunes the maximum concurrent requests relative to the downstream capacity (channel connecting
94+
/// the receiver to the rest of the pipeline).
95+
pub fn tune_max_concurrent_requests(config: &mut GrpcServerSettings, downstream_capacity: usize) {
96+
// Fall back to the downstream channel capacity when it is tighter than the user setting.
97+
let safe_capacity = downstream_capacity.max(1);
98+
if config.max_concurrent_requests == 0 || config.max_concurrent_requests > safe_capacity {
99+
config.max_concurrent_requests = safe_capacity;
100+
}
101+
}
102+
103+
/// Applies the shared server tuning options to a tonic server builder.
104+
pub fn apply_server_tuning<L>(builder: Server<L>, config: &GrpcServerSettings) -> Server<L> {
105+
let transport_limit = config
106+
.transport_concurrency_limit
107+
.and_then(|limit| if limit == 0 { None } else { Some(limit) })
108+
.unwrap_or(config.max_concurrent_requests)
109+
.max(1);
110+
111+
let fallback_streams = config.max_concurrent_requests.min(u32::MAX as usize) as u32;
112+
113+
let mut builder = builder
114+
.concurrency_limit_per_connection(transport_limit)
115+
.load_shed(config.load_shed)
116+
.initial_stream_window_size(config.initial_stream_window_size)
117+
.initial_connection_window_size(config.initial_connection_window_size)
118+
.max_frame_size(config.max_frame_size)
119+
.http2_adaptive_window(Some(config.http2_adaptive_window))
120+
.http2_keepalive_interval(config.http2_keepalive_interval)
121+
.http2_keepalive_timeout(config.http2_keepalive_timeout);
122+
123+
let mut max_concurrent_streams = config
124+
.max_concurrent_streams
125+
.map(|value| if value == 0 { fallback_streams } else { value })
126+
.unwrap_or(fallback_streams);
127+
if max_concurrent_streams == 0 {
128+
max_concurrent_streams = 1;
129+
}
130+
builder = builder.max_concurrent_streams(Some(max_concurrent_streams));
131+
132+
// Apply timeout if configured
133+
if let Some(timeout) = config.timeout {
134+
builder = builder.timeout(timeout);
135+
}
136+
137+
builder
138+
}

rust/otap-dataflow/crates/otap/src/otap_grpc/otlp.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
pub mod client;
88

99
pub mod server;
10+
pub mod server_new;
1011

1112
const LOGS_SERVICE_NAME: &str = "opentelemetry.proto.collector.logs.v1.LogsService";
1213
const LOGS_SERVICE_EXPORT_PATH: &str = "/opentelemetry.proto.collector.logs.v1.LogsService/Export";

0 commit comments

Comments
 (0)