Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 35 additions & 9 deletions crates/wash-runtime/src/washlet/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use crate::host::{Host, HostApi, HostConfig};
use crate::oci::{self, OciConfig};
use crate::plugin::HostPlugin;
use anyhow::Context as _;
use futures::StreamExt as _;
use futures::{FutureExt, StreamExt as _};
use opentelemetry::KeyValue;
use opentelemetry_sdk::resource::{Resource, ResourceBuilder};
use opentelemetry_semantic_conventions::resource;
Expand Down Expand Up @@ -118,9 +119,31 @@ impl ClusterHost {
}
}

pub struct RunningHost {
host: Arc<Host>,
run_loop: Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send + 'static>>,
}

impl RunningHost {
pub fn host(&self) -> &Host {
&self.host
}
}

impl std::future::Future for RunningHost {
type Output = anyhow::Result<()>;

fn poll(
mut self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
self.as_mut().get_mut().run_loop.poll_unpin(cx)
}
}

pub async fn run_cluster_host(
cluster_host: ClusterHost,
) -> anyhow::Result<impl Future<Output = anyhow::Result<()>>, anyhow::Error> {
) -> anyhow::Result<RunningHost, anyhow::Error> {
let (one_shot_tx, mut one_shot_rx) = oneshot::channel();
let nats_client = cluster_host.nats_client.clone();
let host = cluster_host
Expand All @@ -131,7 +154,7 @@ pub async fn run_cluster_host(

let heartbeat_interval = cluster_host.heartbeat_interval;
let host_id = host.id().to_string();
let host = host.clone();
let host_clone = host.clone();

let task = tokio::task::spawn(async move {
let host_subject = host_subject(host_id.as_ref());
Expand All @@ -149,18 +172,18 @@ pub async fn run_cluster_host(
// Shutdown signal
_ = &mut one_shot_rx => {
api_subscription.unsubscribe().await.context("failed to unsubscribe from API requests")?;
return host.stop().await.context("failed to stop host");
return host_clone.stop().await.context("failed to stop host");
}
// Send heartbeat
_ = heartbeat_timer.tick() => {
let heartbeat = host_heartbeat(&host).await?;
let heartbeat = host_heartbeat(&host_clone).await?;
let heartbeat_bytes = serde_json::to_vec(&heartbeat)
.context("failed to serialize heartbeat")?;
nats_client.publish(heartbeat_subject.clone(), heartbeat_bytes.into()).await.context("failed to publish heartbeat")?;
}
// Handle API requests
Some(msg) = api_subscription.next() => {
let response = handle_command(host.as_ref(), &msg, host.config()).await;
let response = handle_command(host_clone.as_ref(), &msg, host_clone.config()).await;
match response {
Ok(resp_bytes) => {
if let Some(reply_to) = msg.reply {
Expand All @@ -176,9 +199,12 @@ pub async fn run_cluster_host(
}
});

Ok(async move {
let _ = one_shot_tx.send(());
task.await?
Ok(RunningHost {
host,
run_loop: Box::pin(async move {
let _ = one_shot_tx.send(());
task.await?
}),
})
}

Expand Down