Skip to content

Commit 61c390f

Browse files
committed
feat(host): introduce modular outbound transport architecture with initial gRPC client support
- Add `OutgoingHandler` trait to enable pluggable transport protocols - Implement `CompositeOutgoingHandler` for automatic protocol routing - Add gRPC client handler with HTTP/2, TLS, and h2c support - Move transport configuration ownership to `HttpServer` - Ensure full backward compatibility for existing HTTP-only hosts Signed-off-by: Aditya <[email protected]>
1 parent 105b903 commit 61c390f

File tree

8 files changed

+185
-70
lines changed

8 files changed

+185
-70
lines changed

crates/wash-runtime/build.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ fn main() {
2525
env::var("OUT_DIR").expect("failed to look up `OUT_DIR` from environment variables"),
2626
);
2727
let workspace_dir = workspace_dir().expect("failed to get workspace dir");
28+
println!("cargo:rerun-if-changed=tests/proto");
29+
println!("cargo:rerun-if-changed=proto");
2830
let top_proto_dir = workspace_dir.join("proto");
2931
let proto_dir = top_proto_dir.join("wasmcloud/runtime/v2");
3032

@@ -58,15 +60,15 @@ fn main() {
5860
}
5961

6062
// Compile test proto files if they exist
61-
let test_proto_file = workspace_dir.join("tests/proto/helloworld.proto");
62-
if test_proto_file.exists() {
63-
println!("cargo:rerun-if-changed={}", test_proto_file.display());
63+
let crate_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").unwrap());
64+
let test_proto_file = crate_dir.join("tests/proto/helloworld.proto");
6465

65-
tonic_prost_build::configure()
66-
.build_client(false)
67-
.build_server(true)
68-
.build_transport(true)
69-
.compile_protos(&[&test_proto_file], &[&workspace_dir.join("tests/proto")])
70-
.expect("failed to compile test protos");
71-
}
66+
println!("cargo:rerun-if-changed={}", test_proto_file.display());
67+
68+
tonic_prost_build::configure()
69+
.build_client(false)
70+
.build_server(true)
71+
.build_transport(true)
72+
.compile_protos(&[&test_proto_file], &[&crate_dir.join("tests/proto")])
73+
.expect("failed to compile test protos");
7274
}

crates/wash-runtime/src/host/http.rs

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::{collections::HashMap, net::SocketAddr, path::Path, sync::Arc};
2222

2323
use crate::engine::ctx::Ctx;
2424
use crate::engine::workload::ResolvedWorkload;
25+
use crate::host::transport::CompositeOutgoingHandler;
2526
use crate::wit::WitInterface;
2627
use anyhow::{Context, ensure};
2728
use hyper::server::conn::http1;
@@ -276,6 +277,7 @@ pub struct HttpServer<T: Router> {
276277
workload_handles: WorkloadHandles,
277278
shutdown_tx: Arc<RwLock<Option<mpsc::Sender<()>>>>,
278279
tls_acceptor: Option<TlsAcceptor>,
280+
outgoing: CompositeOutgoingHandler,
279281
}
280282

281283
impl<T: Router> std::fmt::Debug for HttpServer<T> {
@@ -296,12 +298,22 @@ impl<T: Router> HttpServer<T> {
296298
/// # Returns
297299
/// A new `HttpServer` instance configured for HTTP connections.
298300
pub fn new(router: T, addr: SocketAddr) -> Self {
301+
Self::with_outgoing_handler(router, addr, CompositeOutgoingHandler::default())
302+
}
303+
304+
/// Creates a new HTTP server with custom outgoing handlers
305+
pub fn with_outgoing_handler(
306+
router: T,
307+
addr: SocketAddr,
308+
outgoing: CompositeOutgoingHandler,
309+
) -> Self {
299310
Self {
300311
router: Arc::new(router),
301312
addr,
302313
workload_handles: Arc::default(),
303314
shutdown_tx: Arc::new(RwLock::new(None)),
304315
tls_acceptor: None,
316+
outgoing,
305317
}
306318
}
307319

@@ -325,6 +337,7 @@ impl<T: Router> HttpServer<T> {
325337
cert_path: &Path,
326338
key_path: &Path,
327339
ca_path: Option<&Path>,
340+
outgoing: Option<CompositeOutgoingHandler>,
328341
) -> anyhow::Result<Self> {
329342
let tls_config = load_tls_config(cert_path, key_path, ca_path).await?;
330343
let tls_acceptor = TlsAcceptor::from(Arc::new(tls_config));
@@ -335,6 +348,7 @@ impl<T: Router> HttpServer<T> {
335348
workload_handles: Arc::default(),
336349
shutdown_tx: Arc::new(RwLock::new(None)),
337350
tls_acceptor: Some(tls_acceptor),
351+
outgoing: outgoing.unwrap_or_default(),
338352
})
339353
}
340354
}
@@ -430,32 +444,8 @@ impl<T: Router> HostHandler for HttpServer<T> {
430444
wasmtime_wasi_http::HttpError::trap(anyhow::anyhow!("request not allowed: {}", e))
431445
})?;
432446

433-
// NOTE(lxf): Bring wasi-http code if needed
434-
// Separate HTTP / GRPC handling
435-
let is_grpc = request
436-
.headers()
437-
.get(hyper::header::CONTENT_TYPE)
438-
.and_then(|v| v.to_str().ok())
439-
.map(|v| v.starts_with("application/grpc"))
440-
.unwrap_or(false);
441-
442-
// Route to appropriate client plugin
443-
if is_grpc {
444-
#[cfg(feature = "grpc")]
445-
{
446-
return crate::grpc::send_request(request, config);
447-
}
448-
#[cfg(not(feature = "grpc"))]
449-
{
450-
return Err(wasmtime_wasi_http::HttpError::trap(anyhow::anyhow!(
451-
"gRPC requests are not supported. Please enable the 'grpc' feature."
452-
)));
453-
}
454-
}
455-
456-
Ok(wasmtime_wasi_http::types::default_send_request(
457-
request, config,
458-
))
447+
// Route through composite handler
448+
self.outgoing.send_request(request, config)
459449
}
460450
}
461451

crates/wash-runtime/src/host/mod.rs

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ mod sysinfo;
5959
use sysinfo::SystemMonitor;
6060

6161
pub mod http;
62+
pub mod transport;
6263

6364
/// The API for interacting with a wasmcloud host.
6465
///
@@ -603,8 +604,8 @@ pub struct HostConfig {
603604
}
604605

605606
/// Builder for the [`Host`]
606-
#[derive(Default)]
607607
pub struct HostBuilder {
608+
id: String,
608609
engine: Option<Engine>,
609610
plugins: HashMap<&'static str, Arc<dyn HostPlugin>>,
610611
hostname: Option<String>,
@@ -634,6 +635,10 @@ impl HostBuilder {
634635
Self::default()
635636
}
636637

638+
pub fn id(&self) -> &str {
639+
&self.id
640+
}
641+
637642
pub fn with_engine(mut self, engine: Engine) -> Self {
638643
self.engine = Some(engine);
639644
self
@@ -645,12 +650,6 @@ impl HostBuilder {
645650
self
646651
}
647652

648-
#[cfg(feature = "grpc")]
649-
pub fn with_grpc(mut self, config: HashMap<String, String>) -> Self {
650-
self.grpc_config = Some(config);
651-
self
652-
}
653-
654653
pub fn with_plugin<T: HostPlugin>(mut self, plugin: Arc<T>) -> anyhow::Result<Self> {
655654
let plugin_id = plugin.id();
656655

@@ -662,6 +661,7 @@ impl HostBuilder {
662661
self.plugins.insert(plugin_id, plugin);
663662
Ok(self)
664663
}
664+
665665
/// Sets the hostname for this host.
666666
///
667667
/// # Arguments
@@ -721,11 +721,6 @@ impl HostBuilder {
721721
/// # Errors
722722
/// Returns an error if the default engine cannot be created (when no engine is provided).
723723
pub fn build(self) -> anyhow::Result<Host> {
724-
#[cfg(feature = "grpc")]
725-
if let Some(config) = self.grpc_config {
726-
crate::grpc::init_grpc(&config)?;
727-
}
728-
729724
let engine = if let Some(engine) = self.engine {
730725
engine
731726
} else {
@@ -758,7 +753,7 @@ impl HostBuilder {
758753
engine,
759754
workloads: Arc::default(),
760755
plugins: self.plugins,
761-
id: uuid::Uuid::new_v4().to_string(),
756+
id: self.id,
762757
hostname,
763758
friendly_name,
764759
version: env!("CARGO_PKG_VERSION").to_string(),
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
//! Modular outgoing request handling for HTTP server
2+
3+
use std::collections::HashMap;
4+
use std::sync::Arc;
5+
use wasmtime_wasi_http::{
6+
HttpResult,
7+
body::HyperOutgoingBody,
8+
types::{HostFutureIncomingResponse, OutgoingRequestConfig},
9+
};
10+
11+
/// Trait for handling outgoing requests with specific transport protocols
12+
pub trait OutgoingHandler: Send + Sync + 'static {
13+
/// Check if this handler can handle the given request
14+
fn can_handle(&self, request: &hyper::Request<HyperOutgoingBody>) -> bool;
15+
16+
/// Send the request using this handler's transport
17+
fn send_request(
18+
&self,
19+
request: hyper::Request<HyperOutgoingBody>,
20+
config: OutgoingRequestConfig,
21+
) -> HttpResult<HostFutureIncomingResponse>;
22+
}
23+
24+
/// Default HTTP/1.1 and HTTP/2 handler
25+
#[derive(Clone, Default)]
26+
pub struct DefaultHttpHandler;
27+
28+
impl OutgoingHandler for DefaultHttpHandler {
29+
fn can_handle(&self, _request: &hyper::Request<HyperOutgoingBody>) -> bool {
30+
true // Accepts everything
31+
}
32+
33+
fn send_request(
34+
&self,
35+
request: hyper::Request<HyperOutgoingBody>,
36+
config: OutgoingRequestConfig,
37+
) -> HttpResult<HostFutureIncomingResponse> {
38+
Ok(wasmtime_wasi_http::types::default_send_request(
39+
request, config,
40+
))
41+
}
42+
}
43+
44+
/// gRPC handler
45+
#[cfg(feature = "grpc")]
46+
pub struct GrpcHandler {
47+
_config: HashMap<String, String>,
48+
}
49+
50+
#[cfg(feature = "grpc")]
51+
impl GrpcHandler {
52+
pub fn new(config: HashMap<String, String>) -> anyhow::Result<Self> {
53+
crate::grpc::init_grpc(&config)?;
54+
Ok(Self { _config: config })
55+
}
56+
}
57+
58+
#[cfg(feature = "grpc")]
59+
impl OutgoingHandler for GrpcHandler {
60+
fn can_handle(&self, request: &hyper::Request<HyperOutgoingBody>) -> bool {
61+
request
62+
.headers()
63+
.get(hyper::header::CONTENT_TYPE)
64+
.and_then(|v| v.to_str().ok())
65+
.map(|v| v.starts_with("application/grpc"))
66+
.unwrap_or(false)
67+
}
68+
69+
fn send_request(
70+
&self,
71+
request: hyper::Request<HyperOutgoingBody>,
72+
config: OutgoingRequestConfig,
73+
) -> HttpResult<HostFutureIncomingResponse> {
74+
crate::grpc::send_request(request, config)
75+
}
76+
}
77+
78+
/// Chains multiple outgoing handlers together
79+
pub struct CompositeOutgoingHandler {
80+
handlers: Vec<Arc<dyn OutgoingHandler>>,
81+
default: Arc<dyn OutgoingHandler>,
82+
}
83+
84+
impl CompositeOutgoingHandler {
85+
pub fn new() -> Self {
86+
Self {
87+
handlers: Vec::new(),
88+
default: Arc::new(DefaultHttpHandler),
89+
}
90+
}
91+
92+
pub fn with_handler(mut self, handler: Arc<dyn OutgoingHandler>) -> Self {
93+
self.handlers.push(handler);
94+
self
95+
}
96+
97+
#[cfg(feature = "grpc")]
98+
pub fn with_grpc(self, config: HashMap<String, String>) -> anyhow::Result<Self> {
99+
let grpc = GrpcHandler::new(config)?;
100+
Ok(self.with_handler(Arc::new(grpc)))
101+
}
102+
103+
pub fn send_request(
104+
&self,
105+
request: hyper::Request<HyperOutgoingBody>,
106+
config: OutgoingRequestConfig,
107+
) -> HttpResult<HostFutureIncomingResponse> {
108+
for handler in &self.handlers {
109+
if handler.can_handle(&request) {
110+
tracing::debug!(
111+
uri = %request.uri(),
112+
"routing outgoing request to custom handler"
113+
);
114+
return handler.send_request(request, config);
115+
}
116+
}
117+
118+
tracing::debug!(uri = %request.uri(), "routing to default HTTP handler");
119+
self.default.send_request(request, config)
120+
}
121+
}
122+
123+
impl Default for CompositeOutgoingHandler {
124+
fn default() -> Self {
125+
Self::new()
126+
}
127+
}

crates/wash-runtime/src/washlet/mod.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
#[cfg(feature = "grpc")]
2-
use std::collections::HashMap;
31
use std::env;
42
use std::sync::Arc;
53
use std::time::Duration;
@@ -37,8 +35,6 @@ pub struct ClusterHostBuilder {
3735
host_group: Option<String>,
3836
host_name: Option<String>,
3937
heartbeat_interval: Option<Duration>,
40-
#[cfg(feature = "grpc")]
41-
grpc_config: Option<HashMap<String, String>>,
4238
}
4339

4440
impl ClusterHostBuilder {
@@ -62,12 +58,6 @@ impl ClusterHostBuilder {
6258
self
6359
}
6460

65-
#[cfg(feature = "grpc")]
66-
pub fn with_grpc(mut self, config: HashMap<String, String>) -> Self {
67-
self.grpc_config = Some(config);
68-
self
69-
}
70-
7161
pub fn with_plugin<T: HostPlugin>(mut self, plugin: Arc<T>) -> anyhow::Result<Self> {
7262
self.host_builder = self.host_builder.with_plugin(plugin)?;
7363
Ok(self)

0 commit comments

Comments
 (0)