Skip to content

Commit c015cdb

Browse files
authored
Merge pull request #4 from golemcloud/dynamic-subscribe
Support for dynamically overridden pollables
2 parents 0748876 + edff924 commit c015cdb

3 files changed

Lines changed: 102 additions & 13 deletions

File tree

crates/wasi/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ pub use self::ctx::{WasiCtx, WasiCtxBuilder, WasiImpl, WasiView};
207207
pub use self::error::{I32Exit, TrappableError};
208208
pub use self::filesystem::{DirPerms, FileInputStream, FilePerms, FsError, FsResult, ReaddirIterator};
209209
pub use self::network::{Network, SocketAddrUse, SocketError, SocketResult};
210-
pub use self::poll::{subscribe, ClosureFuture, MakeFuture, Pollable, PollableFuture, Subscribe};
210+
pub use self::poll::{dynamic_subscribe, subscribe, ClosureFuture, DynamicSubscribe, OverrideSelf, MakeFuture, Pollable, PollableFuture, Subscribe};
211211
pub use self::random::{thread_rng, Deterministic};
212212
pub use self::stdio::{
213213
stderr, stdin, stdout, AsyncStdinStream, AsyncStdoutStream, IsATTY, OutputFile, Stderr, Stdin,

crates/wasi/src/poll.rs

Lines changed: 95 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,15 @@ use std::collections::HashMap;
55
use std::future::Future;
66
use std::pin::Pin;
77
use std::task::{Context, Poll};
8-
use wasmtime::component::{Resource, ResourceTable};
98
use std::time::Instant;
9+
use wasmtime::component::{Resource, ResourceTable};
1010

11-
pub type PollableFuture<'a> = Pin<Box<dyn Future<Output=()> + Send + 'a>>;
11+
pub type PollableFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
1212
pub type MakeFuture = for<'a> fn(&'a mut dyn Any) -> PollableFuture<'a>;
1313
pub type ClosureFuture = Box<dyn Fn() -> PollableFuture<'static> + Send + 'static>;
1414

15+
pub type OverrideSelf = fn(&dyn Any) -> Option<u32>;
16+
1517
/// A host representation of the `wasi:io/poll.pollable` resource.
1618
///
1719
/// A pollable is not the same thing as a Rust Future: the same pollable may be used to
@@ -20,6 +22,7 @@ pub type ClosureFuture = Box<dyn Fn() -> PollableFuture<'static> + Send + 'stati
2022
/// Pollable contains a way to create a Future in each call to `poll`.
2123
pub struct Pollable {
2224
index: u32,
25+
override_self: Option<OverrideSelf>,
2326
make_future: MakeFuture,
2427
remove_index_on_delete: Option<fn(&mut ResourceTable, u32) -> Result<()>>,
2528
pub supports_suspend: Option<Instant>,
@@ -52,7 +55,7 @@ pub struct Pollable {
5255
/// let end = Instant::now() + dur;
5356
/// let sleep = MySleep { end };
5457
/// let sleep_resource = cx.table().push(sleep)?;
55-
/// subscribe(cx.table(), sleep_resource)
58+
/// subscribe(cx.table(), sleep_resource, None)
5659
/// }
5760
///
5861
/// struct MySleep {
@@ -95,18 +98,71 @@ pub fn subscribe<T>(
9598
resource: Resource<T>,
9699
supports_suspend: Option<Instant>,
97100
) -> Result<Resource<Pollable>>
101+
where
102+
T: Subscribe,
103+
{
104+
fn make_future<'a, T>(stream: &'a mut dyn Any) -> PollableFuture<'a>
98105
where
99106
T: Subscribe,
107+
{
108+
stream.downcast_mut::<T>().unwrap().ready()
109+
}
110+
111+
let pollable = Pollable {
112+
index: resource.rep(),
113+
override_self: None,
114+
remove_index_on_delete: if resource.owned() {
115+
Some(|table, idx| {
116+
let resource = Resource::<T>::new_own(idx);
117+
table.delete(resource)?;
118+
Ok(())
119+
})
120+
} else {
121+
None
122+
},
123+
make_future: make_future::<T>,
124+
supports_suspend,
125+
};
126+
127+
Ok(table.push_child(pollable, &resource)?)
128+
}
129+
130+
/// An advanced version of Subscribe supporting dynamically switching the underlying table entry
131+
///
132+
/// This can be used to implement "lazy initialized" pollables.
133+
#[async_trait::async_trait]
134+
pub trait DynamicSubscribe: Subscribe {
135+
/// Returns the table index to another `Pollable` entry in case the override should happen
136+
fn override_index(&self) -> Option<u32>;
137+
}
138+
139+
/// Creates a `pollable` resource from a `DynamicSubscribe` implementation
140+
pub fn dynamic_subscribe<T>(
141+
table: &mut ResourceTable,
142+
resource: Resource<T>,
143+
supports_suspend: Option<Instant>,
144+
) -> Result<Resource<Pollable>>
145+
where
146+
T: DynamicSubscribe,
100147
{
101148
fn make_future<'a, T>(stream: &'a mut dyn Any) -> PollableFuture<'a>
102-
where
103-
T: Subscribe,
149+
where
150+
T: DynamicSubscribe,
104151
{
105152
stream.downcast_mut::<T>().unwrap().ready()
106153
}
107154

155+
fn override_self<'a, T>(entry: &'a dyn Any) -> Option<u32>
156+
where
157+
T: DynamicSubscribe,
158+
{
159+
let entry = entry.downcast_ref::<T>().unwrap();
160+
entry.override_index()
161+
}
162+
108163
let pollable = Pollable {
109164
index: resource.rep(),
165+
override_self: Some(override_self::<T>),
110166
remove_index_on_delete: if resource.owned() {
111167
Some(|table, idx| {
112168
let resource = Resource::<T>::new_own(idx);
@@ -134,14 +190,16 @@ where
134190
if pollables.is_empty() {
135191
return Err(anyhow!("empty poll list"));
136192
}
137-
193+
138194
let mut table_futures: HashMap<u32, (MakeFuture, Vec<ReadylistIndex>)> = HashMap::new();
139195
let mut all_supports_suspend = Some(None);
140196

141197
for (ix, p) in pollables.iter().enumerate() {
142198
let ix: u32 = ix.try_into()?;
143199

144-
let pollable = self.table().get(p)?;
200+
let table = self.table();
201+
let pollable = get_pollable_following_overrides(table, p)?;
202+
145203
let (_, list) = table_futures
146204
.entry(pollable.index)
147205
.or_insert((pollable.make_future, Vec::new()));
@@ -166,10 +224,9 @@ where
166224
return Err((self.ctx().suspend_signal)(duration));
167225
}
168226
}
227+
169228
let mut futures: Vec<(PollableFuture<'_>, Vec<ReadylistIndex>)> = Vec::new();
170-
for (entry, (make_future, readylist_indices)) in
171-
self.table().iter_entries(table_futures)
172-
{
229+
for (entry, (make_future, readylist_indices)) in self.table().iter_entries(table_futures) {
173230
let entry = entry?;
174231
futures.push((make_future(entry), readylist_indices));
175232
}
@@ -211,14 +268,16 @@ where
211268
{
212269
async fn block(&mut self, pollable: Resource<Pollable>) -> Result<()> {
213270
let table = self.table();
214-
let pollable = table.get(&pollable)?;
271+
let pollable = get_pollable_following_overrides(table, &pollable)?;
272+
215273
let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?);
216274
ready.await;
217275
Ok(())
218276
}
219277
async fn ready(&mut self, pollable: Resource<Pollable>) -> Result<bool> {
220278
let table = self.table();
221-
let pollable = table.get(&pollable)?;
279+
let pollable = get_pollable_following_overrides(table, &pollable)?;
280+
222281
let ready = (pollable.make_future)(table.get_any_mut(pollable.index)?);
223282
futures::pin_mut!(ready);
224283
Ok(matches!(
@@ -235,6 +294,30 @@ where
235294
}
236295
}
237296

297+
fn get_pollable_following_overrides<'a>(
298+
table: &'a ResourceTable,
299+
pollable: &Resource<Pollable>,
300+
) -> Result<&'a Pollable> {
301+
let mut pollable = table.get(&pollable)?;
302+
loop {
303+
if let Some(override_self) = &pollable.override_self {
304+
let entry = table.get_any(pollable.index)?;
305+
let pollable_override = override_self(entry);
306+
if let Some(overridden_idx) = pollable_override {
307+
pollable = table
308+
.get_any(overridden_idx)?
309+
.downcast_ref()
310+
.ok_or_else(|| anyhow!("Pollable override does not point to a Pollable"))?;
311+
} else {
312+
break;
313+
}
314+
} else {
315+
break;
316+
}
317+
}
318+
Ok(pollable)
319+
}
320+
238321
pub mod sync {
239322
use crate::{
240323
bindings::io::poll as async_poll,

crates/wasmtime/src/runtime/component/resource_table.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,12 @@ impl ResourceTable {
249249
.ok_or(ResourceTableError::WrongType)
250250
}
251251

252+
/// Returns the raw `Any` at the `key` index provided.
253+
pub fn get_any(&self, key: u32) -> Result<&dyn Any, ResourceTableError> {
254+
let r = self.occupied(key)?;
255+
Ok(&*r.entry)
256+
}
257+
252258
/// Returns the raw `Any` at the `key` index provided.
253259
pub fn get_any_mut(&mut self, key: u32) -> Result<&mut dyn Any, ResourceTableError> {
254260
let r = self.occupied_mut(key)?;

0 commit comments

Comments
 (0)