fix: worker to graceful shutdown after finishing in-flight requests #4838
Conversation
Signed-off-by: hongkuanz <[email protected]>
Walkthrough
Changes introduce graceful shutdown mechanisms across the network ingress pipeline and mocker engine by tracking in-flight requests, awaiting their completion before finalization, and adding structured logging throughout shutdown paths. This affects cancellation semantics, join handle management, and request draining behavior.
Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Pre-merge checks
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 1
🧹 Nitpick comments (2)
lib/runtime/src/pipeline/network/ingress/http_endpoint.rs (1)
108-128: Solid implementation of inflight request draining with one minor consideration.

The structured logging and wait-for-inflight logic is well-implemented. However, there's a subtle race condition: between loading `inflight_count` at line 114 and entering the wait loop at lines 121-123, new notifications may be missed. Consider this scenario:

- `inflight_count` loads as 1
- The last in-flight request completes, calls `notify_one()`, decrements to 0
- The wait loop starts, but no more notifications will come

This is mitigated because you re-check `handler.inflight.load()` in the while condition, but you could miss wakeups if a notification happens between the condition check and `notified().await`. The current pattern should work in practice due to the re-check, but you may want to create the `Notify::notified()` future before the condition check for stricter correctness:

```diff
 if inflight_count > 0 {
     tracing::info!(
         endpoint_name = %endpoint_name,
         inflight_count = inflight_count,
         "Waiting for inflight HTTP requests to complete"
     );
-    while handler.inflight.load(Ordering::SeqCst) > 0 {
-        handler.notify.notified().await;
-    }
+    loop {
+        let notified = handler.notify.notified();
+        if handler.inflight.load(Ordering::SeqCst) == 0 {
+            break;
+        }
+        notified.await;
+    }
```
lib/runtime/src/pipeline/network/ingress/push_endpoint.rs (1)

138-152: Good conditional waiting logic with consistent logging.

The implementation correctly waits for inflight requests only when `inflight_count > 0`, avoiding unnecessary waits. The same `Notify` race consideration from `http_endpoint.rs` applies here: consider registering `notified()` before checking the condition for stricter correctness (see the comment on http_endpoint.rs and the sketch below).
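To make the suggested fix concrete, here is a minimal, self-contained sketch of the drain pattern, with the `Notified` future registered before the counter re-check. The `InflightTracker` and `RequestGuard` names are illustrative stand-ins, not the actual handler types in this PR.

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use tokio::sync::Notify;

/// Illustrative stand-in for an endpoint handler's inflight bookkeeping.
struct InflightTracker {
    inflight: AtomicUsize,
    notify: Notify,
}

impl InflightTracker {
    fn new() -> Arc<Self> {
        Arc::new(Self {
            inflight: AtomicUsize::new(0),
            notify: Notify::new(),
        })
    }

    /// Called when a request starts; the returned guard marks completion on drop.
    fn start_request(this: &Arc<Self>) -> RequestGuard {
        this.inflight.fetch_add(1, Ordering::SeqCst);
        RequestGuard { tracker: Arc::clone(this) }
    }

    /// Called on shutdown: wait until every in-flight request has finished.
    async fn drain(&self) {
        loop {
            // Register interest before re-checking the counter so a
            // `notify_one()` racing with the check is not lost.
            let notified = self.notify.notified();
            if self.inflight.load(Ordering::SeqCst) == 0 {
                break;
            }
            notified.await;
        }
    }
}

/// Decrements the counter and wakes the drain loop when a request finishes.
struct RequestGuard {
    tracker: Arc<InflightTracker>,
}

impl Drop for RequestGuard {
    fn drop(&mut self) {
        self.tracker.inflight.fetch_sub(1, Ordering::SeqCst);
        self.tracker.notify.notify_one();
    }
}
```

The guard-on-drop shape keeps the counter correct even when a handler exits early via `?` or a panic.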
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
lib/llm/src/mocker/engine.rs (2 hunks)
lib/runtime/src/pipeline/network/ingress/http_endpoint.rs (1 hunks)
lib/runtime/src/pipeline/network/ingress/nats_server.rs (4 hunks)
lib/runtime/src/pipeline/network/ingress/push_endpoint.rs (1 hunks)
lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs (1 hunks)
🧰 Additional context used
🧠 Learnings (8)
📓 Common learnings
Learnt from: oandreeva-nv
Repo: ai-dynamo/dynamo PR: 2989
File: lib/llm/src/block_manager/distributed/transfer.rs:6-6
Timestamp: 2025-09-18T21:47:44.143Z
Learning: For PR ai-dynamo/dynamo#2989, the ConnectorTransferBatcher architectural issues will be addressed in a follow-up PR by removing the duplicate batching logic and integrating distributed transfers with the existing TransferBatcher + LocalTransferManager pipeline, rather than adding bounded concurrency primitives like Semaphore.
📚 Learning: 2025-08-27T17:56:14.690Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 2500
File: lib/llm/src/migration.rs:58-77
Timestamp: 2025-08-27T17:56:14.690Z
Learning: In lib/llm/src/migration.rs, the cancellation visibility in the Migration operator is intentionally one-way - it checks engine_ctx.is_stopped()/is_killed() to stop pulling from streams but doesn't link newly created streams as child contexts to the parent. This is a conscious architectural decision with plans for future enhancement.
Applied to files:
lib/llm/src/mocker/engine.rs
📚 Learning: 2025-07-14T21:25:56.930Z
Learnt from: ryanolson
Repo: ai-dynamo/dynamo PR: 1919
File: lib/runtime/src/engine.rs:168-168
Timestamp: 2025-07-14T21:25:56.930Z
Learning: The AsyncEngineContextProvider trait in lib/runtime/src/engine.rs was intentionally changed from `Send + Sync + Debug` to `Send + Debug` because the Sync bound was overly constraining. The trait should only require Send + Debug as designed.
Applied to files:
lib/llm/src/mocker/engine.rs
📚 Learning: 2025-05-29T06:20:12.901Z
Learnt from: ryanolson
Repo: ai-dynamo/dynamo PR: 1093
File: lib/llm/src/block_manager/block/registry.rs:98-122
Timestamp: 2025-05-29T06:20:12.901Z
Learning: In lib/llm/src/block_manager/block/registry.rs, the background task spawned for handling unregister notifications uses detached concurrency by design. The JoinHandle is intentionally not stored as this represents a reasonable architectural tradeoff for a long-running cleanup task.
Applied to files:
lib/llm/src/mocker/engine.rs
lib/runtime/src/pipeline/network/ingress/nats_server.rs
📚 Learning: 2025-06-02T19:37:27.666Z
Learnt from: oandreeva-nv
Repo: ai-dynamo/dynamo PR: 1195
File: lib/llm/tests/block_manager.rs:150-152
Timestamp: 2025-06-02T19:37:27.666Z
Learning: In Rust/Tokio applications, when background tasks use channels for communication, dropping the sender automatically signals task termination when the receiver gets `None`. The `start_batching_publisher` function in `lib/llm/tests/block_manager.rs` demonstrates this pattern: when the `KVBMDynamoRuntimeComponent` is dropped, its `batch_tx` sender is dropped, causing `rx.recv()` to return `None`, which triggers cleanup and task termination.
Applied to files:
lib/llm/src/mocker/engine.rs
📚 Learning: 2025-06-13T22:07:24.843Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 1424
File: lib/runtime/src/pipeline/network/egress/push_router.rs:204-209
Timestamp: 2025-06-13T22:07:24.843Z
Learning: The codebase uses async-nats version 0.40, not the older nats crate. Error handling should use async_nats::error::Error variants, not nats::Error variants.
Applied to files:
lib/runtime/src/pipeline/network/ingress/nats_server.rs
📚 Learning: 2025-09-11T03:24:47.820Z
Learnt from: kthui
Repo: ai-dynamo/dynamo PR: 3004
File: lib/runtime/src/pipeline/network/ingress/push_handler.rs:271-277
Timestamp: 2025-09-11T03:24:47.820Z
Learning: In lib/runtime/src/pipeline/network/ingress/push_handler.rs, the maintainer prefers to keep the existing error comparison logic using format!("{:?}", err) == STREAM_ERR_MSG unchanged until proper error types are implemented, even though it has technical debt. Avoid suggesting changes to working legacy code that will be refactored later.
Applied to files:
lib/runtime/src/pipeline/network/ingress/nats_server.rs
lib/runtime/src/pipeline/network/ingress/http_endpoint.rs
lib/runtime/src/pipeline/network/ingress/push_endpoint.rs
lib/runtime/src/pipeline/network/ingress/shared_tcp_endpoint.rs
📚 Learning: 2025-08-21T17:23:02.836Z
Learnt from: michaelfeil
Repo: ai-dynamo/dynamo PR: 2591
File: lib/bindings/python/rust/http.rs:0-0
Timestamp: 2025-08-21T17:23:02.836Z
Learning: In lib/bindings/python/rust/http.rs, the enable_endpoint method uses EndpointType::all() to dynamically support all available endpoint types with case-insensitive matching, which is more maintainable than hardcoded match statements for endpoint type mappings.
Applied to files:
lib/runtime/src/pipeline/network/ingress/http_endpoint.rs
lib/runtime/src/pipeline/network/ingress/push_endpoint.rs
🧬 Code graph analysis (2)
lib/llm/src/mocker/engine.rs (1)
lib/runtime/src/utils/tasks/tracker.rs (2)
cancel_token (1951-1954)
active (1429-1431)
lib/runtime/src/pipeline/network/ingress/http_endpoint.rs (2)
lib/runtime/src/transports/nats.rs (1)
subject (848-850)
lib/llm/src/kv/reserved.rs (1)
inflight_count (20-22)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (7)
- GitHub Check: trtllm (arm64)
- GitHub Check: trtllm (amd64)
- GitHub Check: vllm (arm64)
- GitHub Check: vllm (amd64)
- GitHub Check: sglang (amd64)
- GitHub Check: sglang (arm64)
- GitHub Check: Build and Test - dynamo
🔇 Additional comments (6)
lib/llm/src/mocker/engine.rs (2)
63-67: LGTM on the token change for graceful shutdown semantics.

The switch from `child_token()` to `primary_token()` correctly implements Phase 3 shutdown behavior, ensuring the mocker continues serving in-flight requests during Phases 1-2. The inline comments clearly document the rationale.
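For readers less familiar with the token hierarchy: the behavior this relies on is the standard parent/child propagation of `tokio_util::sync::CancellationToken`. The snippet below is a generic illustration; it assumes the runtime's `primary_token()`/`child_token()` accessors hand out tokens with these semantics.

```rust
use tokio_util::sync::CancellationToken;

fn main() {
    let primary = CancellationToken::new();
    let child = primary.child_token();

    // Cancelling a child token has no effect on its parent...
    child.cancel();
    assert!(child.is_cancelled());
    assert!(!primary.is_cancelled());

    // ...while cancelling the parent propagates to every child, which is
    // why a task listening on the primary token keeps running until the
    // final shutdown phase cancels it.
    let another_child = primary.child_token();
    primary.cancel();
    assert!(another_child.is_cancelled());
}
```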
149-155: Good approach to unblock waiting request handlers.

Clearing `active_requests` on cancellation correctly causes pending `request_rx.recv()` calls to return `None`, allowing request handler tasks to exit gracefully. The subsequent `active.remove(&request_uuid)` at line 349 is safe since removing a non-existent key is a no-op.
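A small sketch of the drop-the-senders mechanism described here: clearing a map that owns the per-request `mpsc::Sender`s closes those channels, so blocked `recv()` calls return `None`. The map shape and key type below are hypothetical, not the mocker's actual types.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Hypothetical stand-in for the active-request map:
    // request id -> sender feeding that request's handler task.
    let active_requests: Arc<Mutex<HashMap<u64, mpsc::Sender<String>>>> =
        Arc::new(Mutex::new(HashMap::new()));

    let (tx, mut request_rx) = mpsc::channel::<String>(8);
    active_requests.lock().unwrap().insert(42, tx);

    let handler = tokio::spawn(async move {
        // recv() yields None once every sender for this channel is dropped.
        while let Some(token) = request_rx.recv().await {
            println!("handling token: {token}");
        }
        println!("channel closed, handler exiting gracefully");
    });

    // On cancellation, clearing the map drops the senders it owns,
    // which closes the channels and unblocks the waiting handlers.
    active_requests.lock().unwrap().clear();

    handler.await.unwrap();
}
```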
lib/runtime/src/pipeline/network/ingress/push_endpoint.rs (1)

153-158: LGTM on the non-graceful shutdown path.

Adding `endpoint_name` to the skip log improves observability and maintains consistency with the graceful path.

lib/runtime/src/pipeline/network/ingress/nats_server.rs (3)
35-35: LGTM on adding JoinHandle for lifecycle tracking.

Storing the `JoinHandle` enables proper task lifecycle management during shutdown.
149-172: Good task lifecycle management.

Capturing the `JoinHandle` from `tokio::spawn` and storing it in `EndpointTask` enables proper coordination during unregistration.

Also applies to: 184-184
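As a rough sketch of the capture-and-store pattern (the real `EndpointTask` in nats_server.rs may carry different fields, and this assumes it runs inside a Tokio runtime), the spawn site hands back a `JoinHandle` that is kept alongside the endpoint's cancellation token:

```rust
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

// Hypothetical shape of a per-endpoint record.
struct EndpointTask {
    cancel_token: CancellationToken,
    handle: JoinHandle<()>,
}

fn register_endpoint(endpoint_name: String) -> EndpointTask {
    let cancel_token = CancellationToken::new();
    let task_token = cancel_token.clone();
    let handle = tokio::spawn(async move {
        // Placeholder body: serve requests until this endpoint's token fires.
        task_token.cancelled().await;
        tracing::info!(endpoint_name = %endpoint_name, "endpoint task shutting down");
    });
    EndpointTask { cancel_token, handle }
}
```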
198-216: Well-structured shutdown coordination with proper panic handling.

The implementation correctly:
- Cancels the endpoint-specific token to trigger graceful shutdown
- Waits for the task to complete (which internally waits for inflight requests via `PushEndpoint`)
- Handles panics gracefully with a warning log
- Logs completion for observability
This properly implements the layered shutdown approach where `NatsMultiplexedServer` delegates inflight draining to `PushEndpoint`.
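Put together, the unregister path described above could look roughly like the following sketch; the function signature and parameter names are illustrative, not the actual `NatsMultiplexedServer::unregister_endpoint` API.

```rust
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

// Illustrative unregister path mirroring the layered shutdown described above.
async fn unregister_endpoint(
    endpoint_name: &str,
    cancel_token: CancellationToken,
    handle: JoinHandle<()>,
) {
    // 1. Cancel the endpoint-specific token so the task begins graceful shutdown.
    cancel_token.cancel();

    // 2. Wait for the task; it is responsible for draining its own
    //    in-flight requests before returning.
    match handle.await {
        Ok(()) => {
            tracing::info!(endpoint_name = %endpoint_name, "endpoint task completed");
        }
        Err(err) if err.is_panic() => {
            tracing::warn!(endpoint_name = %endpoint_name, "endpoint task panicked during shutdown");
        }
        Err(err) => {
            tracing::warn!(endpoint_name = %endpoint_name, error = %err, "endpoint task failed to join");
        }
    }
}
```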
…i-dynamo#4838) Signed-off-by: hongkuanz <[email protected]> Co-authored-by: Biswa Panda <[email protected]>
Cleanup task wakes up, calls server.unregister_endpoint() which waits for inflight requests
Then tracker.unregister_endpoint() is called
Summary by CodeRabbit
Release Notes
Bug Fixes
Chores