Skip to content

Commit b646543

Browse files
committed
Allow to configure timeout settings in OpenDAL
1 parent d70b7a0 commit b646543

File tree

2 files changed

+121
-11
lines changed

2 files changed

+121
-11
lines changed

crates/iceberg/src/io/file_io.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -629,4 +629,76 @@ mod tests {
629629
let file_io = builder.build();
630630
assert!(file_io.is_ok());
631631
}
632+
633+
#[tokio::test]
634+
async fn test_invalid_timeout_config_returns_error() {
635+
let file_io = FileIOBuilder::new("memory")
636+
.with_prop(IO_TIMEOUT_SECONDS, "not_a_number")
637+
.build()
638+
.unwrap();
639+
640+
let path = "memory://test/file.txt";
641+
let result = file_io.new_input(path);
642+
643+
assert!(result.is_err());
644+
let err = result.unwrap_err();
645+
assert_eq!(err.kind(), crate::ErrorKind::DataInvalid);
646+
assert!(err
647+
.to_string()
648+
.contains("cannot be parsed as a positive integer"));
649+
}
650+
651+
#[tokio::test]
652+
async fn test_invalid_max_retries_config_returns_error() {
653+
let file_io = FileIOBuilder::new("memory")
654+
.with_prop(IO_MAX_RETRIES, "invalid")
655+
.build()
656+
.unwrap();
657+
658+
let path = "memory://test/file.txt";
659+
let result = file_io.new_input(path);
660+
661+
assert!(result.is_err());
662+
let err = result.unwrap_err();
663+
assert_eq!(err.kind(), crate::ErrorKind::DataInvalid);
664+
assert!(err
665+
.to_string()
666+
.contains("cannot be parsed as a positive integer"));
667+
}
668+
669+
#[tokio::test]
670+
async fn test_invalid_retry_min_delay_config_returns_error() {
671+
let file_io = FileIOBuilder::new("memory")
672+
.with_prop(IO_RETRY_MIN_DELAY_MS, "-100")
673+
.build()
674+
.unwrap();
675+
676+
let path = "memory://test/file.txt";
677+
let result = file_io.new_input(path);
678+
679+
assert!(result.is_err());
680+
let err = result.unwrap_err();
681+
assert_eq!(err.kind(), crate::ErrorKind::DataInvalid);
682+
assert!(err
683+
.to_string()
684+
.contains("cannot be parsed as a positive integer"));
685+
}
686+
687+
#[tokio::test]
688+
async fn test_invalid_retry_max_delay_config_returns_error() {
689+
let file_io = FileIOBuilder::new("memory")
690+
.with_prop(IO_RETRY_MAX_DELAY_MS, "abc123")
691+
.build()
692+
.unwrap();
693+
694+
let path = "memory://test/file.txt";
695+
let result = file_io.new_input(path);
696+
697+
assert!(result.is_err());
698+
let err = result.unwrap_err();
699+
assert_eq!(err.kind(), crate::ErrorKind::DataInvalid);
700+
assert!(err
701+
.to_string()
702+
.contains("cannot be parsed as a positive integer"));
703+
}
632704
}

crates/iceberg/src/io/storage.rs

Lines changed: 49 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -246,11 +246,18 @@ impl Storage {
246246

247247
// Configure timeout layer
248248
let operator = if let Some(timeout_str) = config.get(IO_TIMEOUT_SECONDS) {
249-
if let Ok(timeout_secs) = timeout_str.parse::<u64>() {
250-
operator
251-
.layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(timeout_secs)))
252-
} else {
253-
operator.layer(TimeoutLayer::new())
249+
match timeout_str.parse::<u64>() {
250+
Ok(timeout_secs) => operator
251+
.layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(timeout_secs))),
252+
Err(_) => {
253+
return Err(Error::new(
254+
ErrorKind::DataInvalid,
255+
format!(
256+
"Invalid {}: '{}' cannot be parsed as a positive integer",
257+
IO_TIMEOUT_SECONDS, timeout_str
258+
),
259+
))
260+
}
254261
}
255262
} else {
256263
operator.layer(TimeoutLayer::new())
@@ -260,20 +267,51 @@ impl Storage {
260267
let mut retry_layer = RetryLayer::new();
261268

262269
if let Some(max_retries_str) = config.get(IO_MAX_RETRIES) {
263-
if let Ok(max_retries) = max_retries_str.parse::<usize>() {
264-
retry_layer = retry_layer.with_max_times(max_retries);
270+
match max_retries_str.parse::<usize>() {
271+
Ok(max_retries) => retry_layer = retry_layer.with_max_times(max_retries),
272+
Err(_) => {
273+
return Err(Error::new(
274+
ErrorKind::DataInvalid,
275+
format!(
276+
"Invalid {}: '{}' cannot be parsed as a positive integer",
277+
IO_MAX_RETRIES, max_retries_str
278+
),
279+
))
280+
}
265281
}
266282
}
267283

268284
if let Some(min_delay_str) = config.get(IO_RETRY_MIN_DELAY_MS) {
269-
if let Ok(min_delay_ms) = min_delay_str.parse::<u64>() {
270-
retry_layer = retry_layer.with_min_delay(Duration::from_millis(min_delay_ms));
285+
match min_delay_str.parse::<u64>() {
286+
Ok(min_delay_ms) => {
287+
retry_layer = retry_layer.with_min_delay(Duration::from_millis(min_delay_ms))
288+
}
289+
Err(_) => {
290+
return Err(Error::new(
291+
ErrorKind::DataInvalid,
292+
format!(
293+
"Invalid {}: '{}' cannot be parsed as a positive integer",
294+
IO_RETRY_MIN_DELAY_MS, min_delay_str
295+
),
296+
))
297+
}
271298
}
272299
}
273300

274301
if let Some(max_delay_str) = config.get(IO_RETRY_MAX_DELAY_MS) {
275-
if let Ok(max_delay_ms) = max_delay_str.parse::<u64>() {
276-
retry_layer = retry_layer.with_max_delay(Duration::from_millis(max_delay_ms));
302+
match max_delay_str.parse::<u64>() {
303+
Ok(max_delay_ms) => {
304+
retry_layer = retry_layer.with_max_delay(Duration::from_millis(max_delay_ms))
305+
}
306+
Err(_) => {
307+
return Err(Error::new(
308+
ErrorKind::DataInvalid,
309+
format!(
310+
"Invalid {}: '{}' cannot be parsed as a positive integer",
311+
IO_RETRY_MAX_DELAY_MS, max_delay_str
312+
),
313+
))
314+
}
277315
}
278316
}
279317

0 commit comments

Comments
 (0)