Skip to content

Commit 36245d1

Browse files
authored
Register tasks in DurableBuilder rather than Durable client (#77)
This enforces that no new tasks can be registered once a Durable client has been constructed. As a result, we can avoid using an RwLock around the TaskRegistry
1 parent 2d69424 commit 36245d1

21 files changed

+835
-470
lines changed

benches/checkpoint.rs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ fn bench_step_cache_miss(c: &mut Criterion) {
3030
let mut total_time = Duration::ZERO;
3131

3232
for _ in 0..iters {
33-
let ctx = BenchContext::new().await;
34-
ctx.client.register::<MultiStepBenchTask>().await.unwrap();
33+
let ctx = BenchContext::with_builder(|b| {
34+
b.register::<MultiStepBenchTask>().unwrap()
35+
})
36+
.await;
3537

3638
ctx.client
3739
.spawn::<MultiStepBenchTask>(MultiStepParams { num_steps })
@@ -82,8 +84,9 @@ fn bench_step_cache_hit(c: &mut Criterion) {
8284
let mut total_time = Duration::ZERO;
8385

8486
for _ in 0..iters {
85-
let ctx = BenchContext::new().await;
86-
ctx.client.register::<MultiStepBenchTask>().await.unwrap();
87+
let ctx = BenchContext::with_builder(|b| {
88+
b.register::<MultiStepBenchTask>().unwrap()
89+
}).await;
8790

8891
// First run to populate checkpoints
8992
let spawn_result = ctx
@@ -170,11 +173,10 @@ fn bench_large_payload_checkpoint(c: &mut Criterion) {
170173
let mut total_time = Duration::ZERO;
171174

172175
for _ in 0..iters {
173-
let ctx = BenchContext::new().await;
174-
ctx.client
175-
.register::<LargePayloadBenchTask>()
176-
.await
177-
.unwrap();
176+
let ctx = BenchContext::with_builder(|b| {
177+
b.register::<LargePayloadBenchTask>().unwrap()
178+
})
179+
.await;
178180

179181
ctx.client
180182
.spawn::<LargePayloadBenchTask>(LargePayloadParams { payload_size })

benches/common/setup.rs

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ pub struct BenchContext {
1717
}
1818

1919
impl BenchContext {
20-
/// Create a new benchmark context with a unique queue.
21-
/// Uses DATABASE_URL environment variable (same as tests).
22-
pub async fn new() -> Self {
20+
/// Create a new benchmark context, allowing task registration on the builder.
21+
pub async fn with_builder(f: impl FnOnce(DurableBuilder) -> DurableBuilder) -> Self {
2322
let database_url = std::env::var("DATABASE_URL")
2423
.unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5436/test".to_string());
2524

@@ -34,9 +33,11 @@ impl BenchContext {
3433
let counter = QUEUE_COUNTER.fetch_add(1, Ordering::SeqCst);
3534
let queue_name = format!("bench_{}", counter);
3635

37-
let client = DurableBuilder::new()
36+
let builder = DurableBuilder::new()
3837
.pool(pool.clone())
39-
.queue_name(&queue_name)
38+
.queue_name(&queue_name);
39+
40+
let client = f(builder)
4041
.build()
4142
.await
4243
.expect("Failed to create Durable client");
@@ -53,16 +54,13 @@ impl BenchContext {
5354
}
5455
}
5556

56-
/// Create a new Durable client using the same pool and queue.
57-
/// Useful for creating multiple workers.
57+
/// Create a new DurableBuilder using the same pool and queue.
58+
/// Useful for creating multiple workers with task registrations.
5859
#[allow(dead_code)]
59-
pub async fn new_client(&self) -> Durable {
60+
pub fn new_builder(&self) -> DurableBuilder {
6061
DurableBuilder::new()
6162
.pool(self.pool.clone())
6263
.queue_name(&self.queue_name)
63-
.build()
64-
.await
65-
.expect("Failed to create Durable client")
6664
}
6765

6866
/// Clean up the queue after benchmark

benches/concurrency.rs

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ fn bench_concurrent_claims(c: &mut Criterion) {
3030
let mut total_time = Duration::ZERO;
3131

3232
for _ in 0..iters {
33-
let ctx = BenchContext::new().await;
34-
ctx.client.register::<QuickTask>().await.unwrap();
33+
let ctx =
34+
BenchContext::with_builder(|b| b.register::<QuickTask>().unwrap())
35+
.await;
3536

3637
// Spawn all tasks
3738
for i in 0..num_tasks {
@@ -49,8 +50,13 @@ fn bench_concurrent_claims(c: &mut Criterion) {
4950

5051
// Spawn multiple worker processes
5152
for _ in 0..num_workers {
52-
let client = ctx.new_client().await;
53-
client.register::<QuickTask>().await.unwrap();
53+
let client = ctx
54+
.new_builder()
55+
.register::<QuickTask>()
56+
.unwrap()
57+
.build()
58+
.await
59+
.unwrap();
5460
let barrier = barrier.clone();
5561

5662
let handle = tokio::spawn(async move {
@@ -123,8 +129,9 @@ fn bench_claim_latency_distribution(c: &mut Criterion) {
123129
let mut total_time = Duration::ZERO;
124130

125131
for _ in 0..iters {
126-
let ctx = BenchContext::new().await;
127-
ctx.client.register::<QuickTask>().await.unwrap();
132+
let ctx =
133+
BenchContext::with_builder(|b| b.register::<QuickTask>().unwrap())
134+
.await;
128135

129136
// Spawn tasks
130137
for i in 0..num_tasks {
@@ -140,8 +147,13 @@ fn bench_claim_latency_distribution(c: &mut Criterion) {
140147
let start = std::time::Instant::now();
141148

142149
for _ in 0..num_workers {
143-
let client = ctx.new_client().await;
144-
client.register::<QuickTask>().await.unwrap();
150+
let client = ctx
151+
.new_builder()
152+
.register::<QuickTask>()
153+
.unwrap()
154+
.build()
155+
.await
156+
.unwrap();
145157
let barrier = barrier.clone();
146158

147159
let handle = tokio::spawn(async move {

benches/throughput.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,7 @@ fn bench_spawn_latency(c: &mut Criterion) {
1717
group.bench_function("single_spawn", |b| {
1818
b.iter_custom(|iters| {
1919
rt.block_on(async {
20-
let ctx = BenchContext::new().await;
21-
ctx.client.register::<NoOpTask>().await.unwrap();
20+
let ctx = BenchContext::with_builder(|b| b.register::<NoOpTask>().unwrap()).await;
2221

2322
let start = std::time::Instant::now();
2423
for _ in 0..iters {
@@ -57,8 +56,9 @@ fn bench_task_throughput(c: &mut Criterion) {
5756
let mut total_time = Duration::ZERO;
5857

5958
for _ in 0..iters {
60-
let ctx = BenchContext::new().await;
61-
ctx.client.register::<QuickTask>().await.unwrap();
59+
let ctx =
60+
BenchContext::with_builder(|b| b.register::<QuickTask>().unwrap())
61+
.await;
6262

6363
// Spawn all tasks first
6464
for i in 0..num_tasks {
@@ -113,8 +113,7 @@ fn bench_e2e_completion(c: &mut Criterion) {
113113
group.bench_function("single_task_roundtrip", |b| {
114114
b.iter_custom(|iters| {
115115
rt.block_on(async {
116-
let ctx = BenchContext::new().await;
117-
ctx.client.register::<NoOpTask>().await.unwrap();
116+
let ctx = BenchContext::with_builder(|b| b.register::<NoOpTask>().unwrap()).await;
118117

119118
let worker = ctx
120119
.client

0 commit comments

Comments
 (0)