Skip to content

Commit a5c2017

Browse files
authored
Merge branch 'master' into message_bus_final
2 parents 3f5a553 + 41d329b commit a5c2017

File tree

18 files changed

+529
-269
lines changed

18 files changed

+529
-269
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/common/src/configs/mod.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use figment::{
3434
use serde::{Serialize, de::DeserializeOwned};
3535
use std::{env, fmt::Display, future::Future, marker::PhantomData, path::Path};
3636
use toml::{Value as TomlValue, map::Map as TomlMap};
37+
use tracing::{error, info, warn};
3738

3839
const SECRET_MASK: &str = "******";
3940
const ARRAY_SEPARATOR: char = '_';
@@ -162,7 +163,7 @@ impl<T: ConfigurationType> CustomEnvProvider<T> {
162163
value = SECRET_MASK.to_string();
163164
}
164165

165-
println!("{env_key} value changed to: {value} from environment variable");
166+
info!("{env_key} value changed to: {value} from environment variable");
166167
Self::insert_environment_override(&source_dict, &mut target_dict, keys, env_var_value);
167168
}
168169

@@ -752,28 +753,30 @@ fn file_exists<P: AsRef<Path>>(path: P) -> bool {
752753

753754
impl<T: ConfigurationType, P: Provider + Clone> ConfigProvider<T> for FileConfigProvider<P> {
754755
async fn load_config(&self) -> Result<T, ConfigurationError> {
755-
println!("Loading config from path: '{}'...", self.file_path);
756+
info!("Loading config from path: '{}'...", self.file_path);
756757

757758
// Start with the default configuration if provided
758759
let mut config_builder = Figment::new();
759760
let has_default = self.default_config.is_some();
760761
if let Some(default) = &self.default_config {
761762
config_builder = config_builder.merge(default);
762763
} else {
763-
println!("No default configuration provided.");
764+
warn!("No default configuration provided.");
764765
}
765766

766767
// If the config file exists, merge it into the configuration
767768
if file_exists(&self.file_path) {
768-
println!("Found configuration file at path: '{}'.", self.file_path);
769+
info!("Found configuration file at path: '{}'.", self.file_path);
769770
config_builder = config_builder.merge(Toml::file(&self.file_path));
770771
} else {
771-
println!(
772+
warn!(
772773
"Configuration file not found at path: '{}'.",
773774
self.file_path
774775
);
775776
if has_default {
776-
println!("Using default configuration as no config file was found.");
777+
info!(
778+
"Using default configuration embedded into server, as no config file was found."
779+
);
777780
}
778781
}
779782

@@ -785,17 +788,17 @@ impl<T: ConfigurationType, P: Provider + Clone> ConfigProvider<T> for FileConfig
785788

786789
match config_result {
787790
Ok(config) => {
788-
println!("Config loaded successfully.");
791+
info!("Config loaded successfully.");
789792
let display_config = env::var(DISPLAY_CONFIG_ENV)
790793
.map(|val| val == "1" || val.to_lowercase() == "true")
791794
.unwrap_or(self.display_config);
792795
if display_config {
793-
println!("Using Config: {config}");
796+
info!("Using Config: {config}");
794797
}
795798
Ok(config)
796799
}
797800
Err(e) => {
798-
println!("Failed to load config: {e}");
801+
error!("Failed to load config: {e}");
799802
Err(ConfigurationError::CannotLoadConfiguration)
800803
}
801804
}

core/configs/server.toml

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -340,9 +340,16 @@ path = "runtime"
340340
# Path for storing log files.
341341
path = "logs"
342342

343-
# Level of logging detail. Options: "debug", "info", "warn", "error".
343+
# Log filtering directive using the same syntax as the RUST_LOG environment variable.
344+
# Supports simple levels ("trace", "debug", "info", "warn", "error", "off" or "none")
345+
# as well as complex directives like "warn,server=debug,iggy=trace".
346+
# Note: RUST_LOG environment variable always takes precedence over this setting.
344347
level = "info"
345348

349+
# Whether to write logs to file. When false, logs are only written to stdout.
350+
# When enabled, logs are stored in {system.path}/{system.logging.path} (default: local_data/logs).
351+
file_enabled = true
352+
346353
# Maximum size of the log files before rotation.
347354
max_size = "512 MB"
348355

core/connectors/runtime/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ mimalloc = { workspace = true }
4747
once_cell = { workspace = true }
4848
postcard = { workspace = true }
4949
reqwest = { workspace = true }
50+
reqwest-middleware = { workspace = true }
51+
reqwest-retry = { workspace = true }
5052
serde = { workspace = true }
5153
serde_json = { workspace = true }
5254
serde_with = { workspace = true }

core/connectors/runtime/README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,13 @@ timeout = "10s"
6565
[connectors.request_headers]
6666
api-key = "your-api-key"
6767

68+
[connectors.retry]
69+
enabled = true
70+
max_attempts = 3
71+
initial_backoff = "1 s"
72+
max_backoff = "30 s"
73+
backoff_multiplier = 2
74+
6875
[connectors.url_templates]
6976
# Optional: Customize URL templates for specific operations
7077
# If not specified, default RESTful URL patterns are used

core/connectors/runtime/src/configs/connectors.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ pub async fn create_connectors_config_provider(
242242
&config.request_headers,
243243
&config.url_templates,
244244
&config.response,
245+
&config.retry,
245246
)?;
246247
Ok(Box::new(provider))
247248
}

core/connectors/runtime/src/configs/connectors/http_provider.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,12 @@ use crate::configs::connectors::{
2626
ConnectorConfigVersions, ConnectorsConfig, ConnectorsConfigProvider, CreateSinkConfig,
2727
CreateSourceConfig, SinkConfig, SourceConfig,
2828
};
29-
use crate::configs::runtime::ResponseConfig;
29+
use crate::configs::runtime::{ResponseConfig, RetryConfig};
3030
use crate::error::RuntimeError;
3131
use async_trait::async_trait;
3232
use reqwest;
33+
use reqwest_middleware::{ClientBuilder, ClientWithMiddleware};
34+
use reqwest_retry::{Jitter, RetryTransientMiddleware, policies::ExponentialBackoff};
3335
use serde::{Deserialize, Serialize};
3436
use std::collections::HashMap;
3537
use std::str::FromStr;
@@ -43,7 +45,7 @@ struct SetActiveVersionRequest {
4345
pub struct HttpConnectorsConfigProvider {
4446
url_builder: UrlBuilder,
4547
response_extractor: ResponseExtractor,
46-
client: reqwest::Client,
48+
client: ClientWithMiddleware,
4749
}
4850

4951
impl HttpConnectorsConfigProvider {
@@ -53,6 +55,7 @@ impl HttpConnectorsConfigProvider {
5355
request_headers: &HashMap<String, String>,
5456
url_templates: &HashMap<String, String>,
5557
response_config: &ResponseConfig,
58+
retry_config: &RetryConfig,
5659
) -> Result<Self, RuntimeError> {
5760
let mut headers = reqwest::header::HeaderMap::new();
5861
for (key, value) in request_headers {
@@ -75,13 +78,35 @@ impl HttpConnectorsConfigProvider {
7578
RuntimeError::InvalidConfiguration(format!("Failed to build HTTP client: {err}"))
7679
})?;
7780

81+
let mut client_with_middleware = ClientBuilder::new(client);
82+
83+
if retry_config.enabled {
84+
tracing::trace!("Apply retry config: {:?}", retry_config);
85+
86+
let retry_policy = ExponentialBackoff::builder()
87+
.retry_bounds(
88+
retry_config.initial_backoff.get_duration(),
89+
retry_config.max_backoff.get_duration(),
90+
)
91+
.base(retry_config.backoff_multiplier)
92+
.jitter(Jitter::Bounded)
93+
.build_with_max_retries(retry_config.max_attempts);
94+
95+
let retry_transient_middleware =
96+
RetryTransientMiddleware::new_with_policy(retry_policy);
97+
98+
client_with_middleware = client_with_middleware.with(retry_transient_middleware);
99+
}
100+
101+
let final_client = client_with_middleware.build();
102+
78103
let url_builder = UrlBuilder::new(base_url, url_templates);
79104
let response_extractor = ResponseExtractor::new(response_config);
80105

81106
Ok(Self {
82107
url_builder,
83108
response_extractor,
84-
client,
109+
client: final_client,
85110
})
86111
}
87112

core/connectors/runtime/src/configs/runtime.rs

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,44 @@ pub struct IggyTlsConfig {
5353
pub domain: Option<String>,
5454
}
5555

56+
#[serde_as]
57+
#[derive(Debug, Clone, Serialize, Deserialize)]
58+
pub struct RetryConfig {
59+
pub enabled: bool,
60+
pub max_attempts: u32,
61+
#[serde_as(as = "DisplayFromStr")]
62+
pub initial_backoff: IggyDuration,
63+
#[serde_as(as = "DisplayFromStr")]
64+
pub max_backoff: IggyDuration,
65+
pub backoff_multiplier: u32,
66+
}
67+
68+
impl Default for RetryConfig {
69+
fn default() -> Self {
70+
Self {
71+
enabled: true,
72+
max_attempts: 3,
73+
initial_backoff: IggyDuration::new_from_secs(1),
74+
max_backoff: IggyDuration::new_from_secs(30),
75+
backoff_multiplier: 2,
76+
}
77+
}
78+
}
79+
80+
impl Display for RetryConfig {
81+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
82+
write!(
83+
f,
84+
"{{ enabled: {}, max_attempts: {}, initial_backoff: {}, max_backoff: {}, backoff_multiplier: {} }}",
85+
self.enabled,
86+
self.max_attempts,
87+
self.initial_backoff,
88+
self.max_backoff,
89+
self.backoff_multiplier
90+
)
91+
}
92+
}
93+
5694
#[derive(Debug, Default, Clone, Deserialize, Serialize)]
5795
#[serde(default)]
5896
pub struct LocalConnectorsConfig {
@@ -72,18 +110,21 @@ pub struct HttpConnectorsConfig {
72110
pub url_templates: HashMap<String, String>,
73111
#[serde(default)]
74112
pub response: ResponseConfig,
113+
#[serde(default)]
114+
pub retry: RetryConfig,
75115
}
76116

77117
impl Display for HttpConnectorsConfig {
78118
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
79119
write!(
80120
f,
81-
"{{ type: \"http\", base_url: {:?}, request_headers: {:?}, timeout: {}, url_templates: {:?}, response: {:?} }}",
121+
"{{ type: \"http\", base_url: {:?}, request_headers: {:?}, timeout: {}, url_templates: {:?}, response: {:?}, retry: {} }}",
82122
self.base_url,
83123
self.request_headers.keys(),
84124
self.timeout,
85125
self.url_templates,
86-
self.response
126+
self.response,
127+
self.retry
87128
)
88129
}
89130
}
@@ -99,6 +140,7 @@ pub struct ResponseConfig {
99140
pub error_path: Option<String>,
100141
}
101142

143+
#[allow(clippy::large_enum_variant)]
102144
#[derive(Debug, Clone, Deserialize, Serialize)]
103145
#[serde(tag = "config_type", rename_all = "lowercase")]
104146
pub enum ConnectorsConfig {

core/server/src/configs/defaults.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,7 @@ impl Default for LoggingConfig {
399399
LoggingConfig {
400400
path: SERVER_CONFIG.system.logging.path.parse().unwrap(),
401401
level: SERVER_CONFIG.system.logging.level.parse().unwrap(),
402+
file_enabled: SERVER_CONFIG.system.logging.file_enabled,
402403
max_size: SERVER_CONFIG.system.logging.max_size.parse().unwrap(),
403404
retention: SERVER_CONFIG.system.logging.retention.parse().unwrap(),
404405
sysinfo_print_interval: SERVER_CONFIG

core/server/src/configs/displays.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,10 @@ impl Display for LoggingConfig {
248248
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
249249
write!(
250250
f,
251-
"{{ path: {}, level: {}, max_size: {}, retention: {} }}",
251+
"{{ path: {}, level: {}, file_enabled: {}, max_size: {}, retention: {} }}",
252252
self.path,
253253
self.level,
254+
self.file_enabled,
254255
self.max_size.as_human_string_with_zero_as_unlimited(),
255256
self.retention
256257
)

0 commit comments

Comments
 (0)