Skip to content

Commit 45ea0bc

Browse files
committed
fix: OCI Pull timeout
Signed-off-by: Lucas Fontes <[email protected]>
1 parent fcfb30b commit 45ea0bc

File tree

9 files changed

+162
-92
lines changed

9 files changed

+162
-92
lines changed

Cargo.lock

Lines changed: 8 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ dialoguer = { workspace = true, features = ["editor", "password", "zeroize", "fu
4343
etcetera = { workspace = true }
4444
figment = { workspace = true, features = ["json", "env"] }
4545
flate2 = { workspace = true, features = ["rust_backend"] }
46+
humantime = { workspace = true }
4647
indicatif = { workspace = true }
4748
notify = { workspace = true, features = ["macos_fsevent"] }
4849
reqwest = { workspace = true, features = ["json", "rustls-tls"] }
@@ -89,6 +90,7 @@ figment = { version = "0.10.19", default-features = false }
8990
futures = { version = "0.3", default-features = false }
9091
flate2 = { version = "1.0", default-features = false }
9192
hyper = { version = "1.6.0", default-features = false }
93+
humantime = { version = "2.3.0" }
9294
indicatif = { version = "0.18.0", default-features = false }
9395
k8s-openapi = { version = "0.25", default-features = false }
9496
kube = { version = "1", default-features = false }

crates/wash-runtime/src/host/mod.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
use std::collections::{HashMap, HashSet};
4444
use std::future::Future;
4545
use std::sync::Arc;
46+
use std::time::Duration;
4647

4748
use anyhow::{Context, bail};
4849
use names::{Generator, Name};
@@ -597,9 +598,19 @@ impl std::fmt::Debug for Host {
597598
}
598599

599600
/// Config for the [`Host`]
600-
#[derive(Clone, Debug, Default)]
601+
#[derive(Clone, Debug)]
601602
pub struct HostConfig {
602603
pub allow_oci_insecure: bool,
604+
pub oci_pull_timeout: Option<Duration>,
605+
}
606+
607+
impl Default for HostConfig {
608+
fn default() -> Self {
609+
Self {
610+
allow_oci_insecure: false,
611+
oci_pull_timeout: Duration::from_secs(30).into(),
612+
}
613+
}
603614
}
604615

605616
/// Builder for the [`Host`]

crates/wash-runtime/src/washlet/mod.rs

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ pub struct ClusterHostBuilder {
3434
host_group: Option<String>,
3535
host_name: Option<String>,
3636
heartbeat_interval: Option<Duration>,
37+
host_config: Option<HostConfig>,
3738
}
3839

3940
impl ClusterHostBuilder {
@@ -47,6 +48,11 @@ impl ClusterHostBuilder {
4748
self
4849
}
4950

51+
pub fn with_host_config(mut self, host_config: HostConfig) -> Self {
52+
self.host_config = Some(host_config);
53+
self
54+
}
55+
5056
pub fn with_host_builder(mut self, host_builder: crate::host::HostBuilder) -> Self {
5157
self.host_builder = host_builder;
5258
self
@@ -85,6 +91,11 @@ impl ClusterHostBuilder {
8591
if let Some(host_name) = self.host_name {
8692
builder = builder.with_hostname(host_name)
8793
}
94+
95+
if let Some(host_config) = self.host_config {
96+
builder = builder.with_config(host_config);
97+
}
98+
8899
let heartbeat_interval = self.heartbeat_interval.unwrap_or(HEARTBEAT_INTERVAL);
89100
let host = builder.build()?;
90101
Ok(ClusterHost {
@@ -242,12 +253,17 @@ async fn handle_command(
242253

243254
/// Convert ImagePullSecret from protobuf to OciConfig
244255
fn image_pull_secret_to_oci_config(
256+
config: &HostConfig,
245257
pull_secret: &Option<types::v2::ImagePullSecret>,
246258
) -> oci::OciConfig {
247-
match &pull_secret {
259+
let mut oci_config = match &pull_secret {
248260
Some(creds) => oci::OciConfig::new_with_credentials(&creds.username, &creds.password),
249261
None => OciConfig::default(),
250-
}
262+
};
263+
oci_config.insecure = config.allow_oci_insecure;
264+
oci_config.timeout = config.oci_pull_timeout;
265+
266+
oci_config
251267
}
252268

253269
async fn host_heartbeat(host: &impl HostApi) -> anyhow::Result<types::v2::HostHeartbeat> {
@@ -275,8 +291,8 @@ async fn workload_start(
275291
let (components, host_interfaces) = if let Some(wit_world) = wit_world {
276292
let mut pulled_components = Vec::with_capacity(wit_world.components.len());
277293
for component in &wit_world.components {
278-
let mut oci_config = image_pull_secret_to_oci_config(&component.image_pull_secret);
279-
oci_config.insecure = config.allow_oci_insecure;
294+
let mut oci_config =
295+
image_pull_secret_to_oci_config(config, &component.image_pull_secret);
280296
let bytes = match oci::pull_component(&component.image, oci_config).await {
281297
Ok(bytes) => bytes,
282298
Err(e) => {
@@ -316,7 +332,7 @@ async fn workload_start(
316332
};
317333

318334
let service = if let Some(service) = service {
319-
let oci_config = image_pull_secret_to_oci_config(&service.image_pull_secret);
335+
let oci_config = image_pull_secret_to_oci_config(config, &service.image_pull_secret);
320336
let bytes = match oci::pull_component(&service.image, oci_config).await {
321337
Ok(bytes) => bytes,
322338
Err(e) => {
@@ -572,12 +588,18 @@ impl From<crate::types::WorkloadStatus> for types::v2::WorkloadStatus {
572588

573589
#[cfg(test)]
574590
mod tests {
591+
use crate::host;
592+
575593
use super::*;
576594

577595
#[tokio::test]
578596
async fn test_image_pull_secret_to_oci_config_none() {
597+
let host_config = HostConfig {
598+
allow_oci_insecure: false,
599+
oci_pull_timeout: None,
600+
};
579601
let secret: Option<types::v2::ImagePullSecret> = None;
580-
let config = image_pull_secret_to_oci_config(&secret);
602+
let config = image_pull_secret_to_oci_config(&host_config, &secret);
581603
assert!(config.credentials.is_none());
582604
assert!(!config.insecure);
583605
}
@@ -589,7 +611,11 @@ mod tests {
589611
password: "testpass".to_string(),
590612
});
591613

592-
let config = image_pull_secret_to_oci_config(&secret);
614+
let host_config = HostConfig {
615+
allow_oci_insecure: false,
616+
oci_pull_timeout: None,
617+
};
618+
let config = image_pull_secret_to_oci_config(&host_config, &secret);
593619
assert_eq!(
594620
config.credentials,
595621
Some(("testuser".to_string(), "testpass".to_string()))

crates/wash/src/cli/host.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use std::{net::SocketAddr, sync::Arc};
1+
use std::{net::SocketAddr, sync::Arc, time::Duration};
22

33
use anyhow::Context as _;
44
use clap::Args;
@@ -34,6 +34,14 @@ pub struct HostCommand {
3434
#[cfg(not(target_os = "windows"))]
3535
#[clap(long = "wasi-webgpu", default_value_t = false)]
3636
pub wasi_webgpu: bool,
37+
38+
/// Allow insecure OCI Registries
39+
#[clap(long = "allow-insecure-registries", default_value_t = false)]
40+
pub allow_insecure_registries: bool,
41+
42+
/// Timeout for pulling artifacts from OCI registries
43+
#[clap(long = "registry-pull-timeout", value_parser = humantime::parse_duration, default_value = "30s")]
44+
pub registry_pull_timeout: Duration,
3745
}
3846

3947
impl CliCommand for HostCommand {
@@ -49,7 +57,13 @@ impl CliCommand for HostCommand {
4957
.context("failed to connect to NATS")?;
5058
let data_nats_client = Arc::new(data_nats_client);
5159

60+
let host_config = wash_runtime::host::HostConfig {
61+
allow_oci_insecure: self.allow_insecure_registries,
62+
oci_pull_timeout: Some(self.registry_pull_timeout),
63+
};
64+
5265
let mut cluster_host_builder = wash_runtime::washlet::ClusterHostBuilder::default()
66+
.with_host_config(host_config)
5367
.with_nats_client(Arc::new(scheduler_nats_client))
5468
.with_host_group(self.host_group.clone())
5569
.with_plugin(Arc::new(
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package runtime
2+
3+
import (
4+
"context"
5+
"strings"
6+
"time"
7+
8+
runtimev2 "go.wasmcloud.dev/runtime-operator/pkg/rpc/wasmcloud/runtime/v2"
9+
"go.wasmcloud.dev/runtime-operator/pkg/wasmbus"
10+
"google.golang.org/protobuf/encoding/protojson"
11+
"google.golang.org/protobuf/proto"
12+
"google.golang.org/protobuf/types/known/emptypb"
13+
)
14+
15+
const HostRoundtripTimeout = 1 * time.Minute
16+
17+
func NewWashHostClient(bus wasmbus.Bus, hostID string) *WashHostClient {
18+
return &WashHostClient{
19+
Bus: bus,
20+
HostID: hostID,
21+
}
22+
}
23+
24+
type WashHostClient struct {
25+
Bus wasmbus.Bus
26+
HostID string
27+
}
28+
29+
func (w *WashHostClient) subject(parts ...string) string {
30+
return strings.Join(append([]string{
31+
"runtime",
32+
"host",
33+
w.HostID,
34+
}, parts...), ".")
35+
}
36+
37+
func (w *WashHostClient) Heartbeat(ctx context.Context) (*runtimev2.HostHeartbeat, error) {
38+
var resp runtimev2.HostHeartbeat
39+
if err := RoundTrip(ctx, w.Bus, w.subject("heartbeat"), &emptypb.Empty{}, &resp); err != nil {
40+
return nil, err
41+
}
42+
return &resp, nil
43+
}
44+
45+
func (w *WashHostClient) Start(ctx context.Context, req *runtimev2.WorkloadStartRequest) (*runtimev2.WorkloadStartResponse, error) {
46+
var resp runtimev2.WorkloadStartResponse
47+
if err := RoundTrip(ctx, w.Bus, w.subject("workload.start"), req, &resp); err != nil {
48+
return nil, err
49+
}
50+
return &resp, nil
51+
}
52+
53+
func (w *WashHostClient) Status(ctx context.Context, req *runtimev2.WorkloadStatusRequest) (*runtimev2.WorkloadStatusResponse, error) {
54+
var resp runtimev2.WorkloadStatusResponse
55+
if err := RoundTrip(ctx, w.Bus, w.subject("workload.status"), req, &resp); err != nil {
56+
return nil, err
57+
}
58+
return &resp, nil
59+
}
60+
61+
func (w *WashHostClient) Stop(ctx context.Context, req *runtimev2.WorkloadStopRequest) (*runtimev2.WorkloadStopResponse, error) {
62+
var resp runtimev2.WorkloadStopResponse
63+
if err := RoundTrip(ctx, w.Bus, w.subject("workload.stop"), req, &resp); err != nil {
64+
return nil, err
65+
}
66+
return &resp, nil
67+
}
68+
69+
// RoundTrip sends a request and waits for a response.
70+
func RoundTrip[Req proto.Message, Resp proto.Message](ctx context.Context, bus wasmbus.Bus, subject string, req Req, resp Resp) error {
71+
ctx, cancel := context.WithTimeout(ctx, HostRoundtripTimeout)
72+
defer cancel()
73+
74+
json, err := protojson.Marshal(req)
75+
if err != nil {
76+
return err
77+
}
78+
79+
msg := wasmbus.NewMessage(subject)
80+
msg.Data = json
81+
82+
reply, err := bus.Request(ctx, msg)
83+
if err != nil {
84+
return err
85+
}
86+
87+
return protojson.Unmarshal(reply.Data, resp)
88+
}

runtime-operator/internal/controller/runtime/host_controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (r *HostReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
4141
}
4242

4343
func (r *HostReconciler) reconcileReporting(ctx context.Context, host *runtimev1alpha1.Host) error {
44-
client := NewWorkloadClient(r.Bus, host.HostID)
44+
client := NewWashHostClient(r.Bus, host.HostID)
4545

4646
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
4747
defer cancel()

0 commit comments

Comments
 (0)