Skip to content

Commit 07ff380

Browse files
authored
Azure support for parquet exporter and cloud_auth module (#1517)
The primary motivation for this PR is adding support for azure backends for the parquet exporter. This work seems potentially re-usable for anything that interacts with object storage and/or anything that needs an azure token credential, so I also did a bit of module shuffling.

Major changes:
- Added a `cloud_auth` module with support for an initial set of azure token credential types
- Moved the `object_store` module up and added support for azure token credential types
- Added a new `azure` feature flag that gates all things azure across both `cloud_auth` and `object_store`
- Fixed up tests where appropriate

Something that I did not do at this stage was explore moving all of this auth stuff into some kind of `identity` extension. That might be the next step and I think much of the code here could be easily folded into such an extension.

Related to: #501
1 parent c738bf8 commit 07ff380

File tree

11 files changed

+826
-177
lines changed

11 files changed

+826
-177
lines changed

rust/otap-dataflow/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ wiremock = "0.6.5"
159159

160160
[features]
161161
default = []
162+
azure = ["otap-df-otap/azure"]
162163
unsafe-optimizations = ["unchecked-index", "unchecked-arithmetic"]
163164
unchecked-index = []
164165
unchecked-arithmetic = []

rust/otap-dataflow/configs/fake-parquet.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,10 @@ nodes:
2222
kind: exporter
2323
plugin_urn: "urn:otel:otap:parquet:exporter"
2424
config:
25-
base_uri: /tmp
25+
storage:
26+
file:
27+
base_uri: /tmp
2628
partitioning_strategies:
2729
- schema_metadata: ["_part_id"]
2830
writer_options:
2931
flush_when_older_than: 10s
30-

rust/otap-dataflow/crates/otap/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ azure-monitor-exporter = [
9696
"dep:reqwest",
9797
"dep:azure_core"
9898
]
99+
azure = ["object_store/azure", "object_store/cloud", "dep:azure_identity", "dep:azure_core"]
99100

100101
[dev-dependencies]
101102
flume.workspace = true
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
/// Azure auth utilities
5+
#[cfg(feature = "azure")]
6+
pub mod azure;
Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use std::{path::PathBuf, sync::Arc};
5+
6+
use azure_core::credentials::TokenCredential;
7+
use azure_identity::AzureCliCredential;
8+
use serde::{Deserialize, Serialize};
9+
10+
/// Azure authentication methods. This can be leveraged in component
11+
/// configuration objects for a consistent way to specify Azure auth information.
12+
/// The next step here may be to add an equivalent to the Go collector's auth
13+
/// extensions rather than borrow this across component configs.
14+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
15+
#[serde(tag = "type", rename_all = "snake_case")]
16+
pub enum AuthMethod {
17+
/// See [azure_identity::AzureCliCredential].
18+
AzureCli {
19+
/// Additional tenants that the credential should be allowed to
20+
/// authenticate in.
21+
#[serde(default)]
22+
additionally_allowed_tenants: Vec<String>,
23+
24+
/// Set this to specify a subscription other than the currently active
25+
/// one in the Azure cli.
26+
subscription: Option<String>,
27+
28+
/// Tenant ID to authenticate in. Defaults to the value of the azure
29+
/// cli's default tenant.
30+
tenant_id: Option<String>,
31+
},
32+
/// See [azure_identity::ManagedIdentityCredential].
33+
ManagedIdentity {
34+
/// User assigned identity to use when authenticating, otherwise the
35+
/// system assigned identity will be used if available.
36+
user_assigned_id: Option<UserAssignedId>,
37+
},
38+
/// See [azure_identity::WorkloadIdentityCredential].
39+
WorkloadIdentity {
40+
/// Client ID of the Entra identity. Defaults to the value of the
41+
/// `AZURE_CLIENT_ID` environment variable.
42+
client_id: Option<String>,
43+
44+
/// Tenant ID of the Entra identity. Defaults to the value of the
45+
/// `AZURE_TENANT_ID` environment variable.
46+
tenant_id: Option<String>,
47+
48+
/// Path to the token file to read the assertion from. Defaults to the
49+
/// value of the `AZURE_FEDERATED_TOKEN_FILE` environment variable.
50+
token_file_path: Option<PathBuf>,
51+
},
52+
}
53+
54+
/// Equivalent of [azure_identity::UserAssignedId]
55+
#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize)]
56+
#[serde(rename_all = "snake_case")]
57+
pub enum UserAssignedId {
58+
/// The client ID of a user-assigned identity
59+
ClientId(String),
60+
/// The object or principal ID of a user-assigned identity
61+
ObjectId(String),
62+
/// The Azure resource ID of a user-assigned identity
63+
ResourceId(String),
64+
}
65+
66+
impl From<UserAssignedId> for azure_identity::UserAssignedId {
67+
fn from(value: UserAssignedId) -> Self {
68+
match value {
69+
UserAssignedId::ClientId(id) => azure_identity::UserAssignedId::ClientId(id),
70+
UserAssignedId::ObjectId(id) => azure_identity::UserAssignedId::ObjectId(id),
71+
UserAssignedId::ResourceId(id) => azure_identity::UserAssignedId::ResourceId(id),
72+
}
73+
}
74+
}
75+
76+
/// Create a [TokenCredential] from the given [AuthMethod].
77+
pub fn from_auth_method(value: AuthMethod) -> Result<Arc<dyn TokenCredential>, azure_core::Error> {
78+
match value {
79+
AuthMethod::AzureCli {
80+
additionally_allowed_tenants,
81+
subscription,
82+
tenant_id,
83+
} => {
84+
let options = Some(azure_identity::AzureCliCredentialOptions {
85+
additionally_allowed_tenants,
86+
subscription,
87+
tenant_id,
88+
..Default::default()
89+
});
90+
Ok(AzureCliCredential::new(options)?)
91+
}
92+
AuthMethod::ManagedIdentity { user_assigned_id } => {
93+
let options = azure_identity::ManagedIdentityCredentialOptions {
94+
user_assigned_id: user_assigned_id.map(|u| u.into()),
95+
..Default::default()
96+
};
97+
Ok(azure_identity::ManagedIdentityCredential::new(Some(
98+
options,
99+
))?)
100+
}
101+
AuthMethod::WorkloadIdentity {
102+
client_id,
103+
tenant_id,
104+
token_file_path,
105+
} => {
106+
let options = azure_identity::WorkloadIdentityCredentialOptions {
107+
client_id,
108+
tenant_id,
109+
token_file_path,
110+
..Default::default()
111+
};
112+
Ok(azure_identity::WorkloadIdentityCredential::new(Some(
113+
options,
114+
))?)
115+
}
116+
}
117+
}
118+
119+
#[cfg(test)]
120+
mod test {
121+
use super::*;
122+
use serde_json::json;
123+
124+
#[test]
125+
fn test_user_assigned_id() {
126+
let json = json!({
127+
"type": "managed_identity",
128+
"user_assigned_id": {
129+
"client_id": "foo"
130+
}
131+
})
132+
.to_string();
133+
134+
let expected = AuthMethod::ManagedIdentity {
135+
user_assigned_id: Some(UserAssignedId::ClientId("foo".to_string())),
136+
};
137+
test_auth_method(&json, expected);
138+
}
139+
140+
#[test]
141+
fn test_workload_identity() {
142+
let json = json!({
143+
"type": "workload_identity"
144+
})
145+
.to_string();
146+
let expected = AuthMethod::WorkloadIdentity {
147+
client_id: None,
148+
tenant_id: None,
149+
token_file_path: None,
150+
};
151+
152+
// Workload identity requires some env vars to be present to create the
153+
// credential and the test methods to override that are not exposed
154+
// outside of `azure_identity`.
155+
let method: AuthMethod =
156+
serde_json::from_str(&json).expect("Failed to deserialize AuthMethod");
157+
assert_eq!(method, expected);
158+
}
159+
160+
#[test]
161+
fn test_managed_identity() {
162+
let json = json!({
163+
"type": "managed_identity"
164+
})
165+
.to_string();
166+
let expected = AuthMethod::ManagedIdentity {
167+
user_assigned_id: None,
168+
};
169+
170+
test_auth_method(&json, expected);
171+
}
172+
173+
#[test]
174+
fn test_azure_cli() {
175+
let json = json!({
176+
"type": "azure_cli"
177+
})
178+
.to_string();
179+
let expected = AuthMethod::AzureCli {
180+
additionally_allowed_tenants: vec![],
181+
subscription: None,
182+
tenant_id: None,
183+
};
184+
185+
test_auth_method(&json, expected);
186+
}
187+
188+
fn test_auth_method(json: &str, expected: AuthMethod) {
189+
let method: AuthMethod =
190+
serde_json::from_str(json).expect("Failed to deserialize AuthMethod");
191+
assert_eq!(method, expected);
192+
193+
assert!(from_auth_method(method).is_ok());
194+
}
195+
}

rust/otap-dataflow/crates/otap/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ mod metrics;
7979
/// gRPC service implementation
8080
pub mod otlp_grpc;
8181

82+
/// Cloud specific auth utilities
83+
pub mod cloud_auth;
84+
85+
/// Object storage utilities including integrations for different cloud
86+
/// providers
87+
pub mod object_store;
8288
/// TLS utilities
8389
#[cfg(feature = "experimental-tls")]
8490
pub mod tls_utils;

0 commit comments

Comments
 (0)