feat: kvbm v2 + nova #4892
base: main
Conversation
…vements Signed-off-by: Ryan Olson <[email protected]>
…nvcc+cuda being present Signed-off-by: Ryan Olson <[email protected]>
…sing files Signed-off-by: Ryan Olson <[email protected]>
Walkthrough

This PR introduces a comprehensive KVBM v2 architecture with distributed leader support, CUDA kernel bindings, and extensive Python/Rust integration. Major additions include new workspace libraries (identity, kvbm-config, kvbm-kernels, kvbm), feature-gated Python v2 bindings with vLLM integration, distributed session management, and GPU tensor packing kernels.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes

This PR introduces substantial architectural changes with significant scope and heterogeneity.
Areas requiring extra attention:
Pre-merge checks: ❌ Failed checks (1 warning), ✅ Passed checks (2 passed)
Actionable comments posted: 13
Note
Due to the large number of review comments, Critical severity comments were prioritized as inline comments.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
lib/bindings/kvbm/src/block_manager/vllm/connector/leader/slot.rs (1)

179-190: The `_kvbm_metrics` rename is OK; consider also wiring cancellation into the periodic metrics task to avoid task leaks. Because the spawned loop captures clones, it can keep running even after `ConnectorSlotManager` is dropped. Consider gating it on `get_current_cancel_token()` (or storing a JoinHandle / CriticalTaskExecutionHandle) so it stops when the manager is dropped or the runtime cancels.

Also applies to: 253-260
🟠 Major comments (30)
dynamo.code-workspace (1)

8-11: Restore linkedProjects for removed Rust crates. The workspace configuration removed `launch/dynamo-run/Cargo.toml` and `lib/bindings/python/Cargo.toml` from rust-analyzer `linkedProjects`, but both files still exist in the repository. Developers working on these crates will lose rust-analyzer support. Add them back alongside `lib/bindings/kvbm/Cargo.toml` unless these crates are being deprecated.

lib/kvbm/src/v2/distributed/leader/session/controllable.rs (1)

189-237: Staging failure makes the session non-retriable (`staging_started = true` sticks). If `stage_g3_to_g2_internal()` errors, subsequent `TriggerStaging` calls no-op forever. At minimum, clear `staging_started` (and possibly revert `phase`) on error, and consider sending `SessionError` to the controller when attached.

Also applies to: 318-397
lib/bindings/kvbm/src/dynamo/mod.rs (1)

14-31: Don't rely on panics in module init paths (prefer surfacing PyErr), especially around OTEL/logging init. `add_to_module()` is effectively "import-time" for Python users; panics here are brutal to debug. Consider making `init_pyo3_tokio_rt()` return `PyResult<()>` (or `anyhow::Result<()>`) and propagating errors from `add_to_module()` instead of calling `expect(...)`.

lib/kvbm/src/v2/distributed/leader/session/controllable.rs (1)
252-266: Phase machine bug: sessions with no G3 blocks never become `Ready`. `update_phase()` requires `staging_complete == true`, but `staging_complete` is never set when there's nothing to stage.

One fix: treat "no G3 blocks" as ready regardless of staging flags:

  fn update_phase(&mut self) {
      @@
-     // If staging is complete and no G3 blocks remain, we're ready
-     if self.staging_complete && self.g3_blocks.is_empty() {
+     // If there are no G3 blocks, staging is irrelevant; we're ready.
+     if self.g3_blocks.is_empty() {
          self.phase = RemoteSessionPhase::Ready;
      } else if self.staging_started && !self.staging_complete {
          self.phase = RemoteSessionPhase::Staging;
      }
  }

(Alternative: initialize `staging_complete = g3_blocks.is_empty()` in `new()`.)

lib/bindings/kvbm/src/dynamo/mod.rs (1)
88-114: Decide whether a dead `DistributedRuntime` should be `Ok(None)` vs raising `PyRuntimeError` (current behavior contradicts the `Option` return). Right now, weak-ref upgrade failure becomes an exception, not "no runtime".

If callers are expected to treat dead runtimes as "absent", switch to:

- let strong = weak.upgrade().ok_or_else(|| {
-     PyRuntimeError::new_err("runtime is no longer alive (weak ref upgrade failed)")
- })?;
-
- Ok(Some(strong))
+ Ok(weak.upgrade())

lib/bindings/kvbm/src/dynamo/mod.rs (1)
40-65: `available_parallelism().unwrap()` can panic; use a safe fallback. This is a library crate; defaulting to 1 thread is better than aborting import.

  let rt = tokio::runtime::Builder::new_multi_thread()
      .worker_threads(
          cfg.num_worker_threads
-             .unwrap_or_else(|| std::thread::available_parallelism().unwrap().get()),
+             .unwrap_or_else(|| {
+                 std::thread::available_parallelism()
+                     .map(|n| n.get())
+                     .unwrap_or(1)
+             }),
      )

lib/bindings/kvbm/python/kvbm/v2/vllm/connectors/leader.py (1)
102-114: `build_connector_meta()` always returns empty bytes (likely breaks real transfers even with Rust backing). If this path can run in production, it should delegate to Rust when available or fail loudly.

  def build_connector_meta(
      self, scheduler_output: "SchedulerOutput"
  ) -> KVConnectorMetadata:
-     # TODO: Wire up to Rust build_connector_metadata when SchedulerOutput
-     # conversion is implemented
-     return DynamoSchedulerConnectorMetadata(b"")
+     if self._rust_leader is not None:
+         # TODO: implement conversion; until then, fail explicitly to avoid silent corruption.
+         raise NotImplementedError("Rust-backed leader requires build_connector_meta implementation")
+     return DynamoSchedulerConnectorMetadata(b"")

lib/bindings/kvbm/src/dynamo/mod.rs (1)
62-64: Handle `init_with_runtime` errors instead of silently ignoring them. The function returns `Err(())` if the pyo3-async-runtimes tokio runtime is already initialized. While `OnceLock` guarantees single execution in normal flow, silently ignoring initialization errors prevents debugging if something goes wrong. This contrasts with the error handling in lib/bindings/python/rust/lib.rs, which explicitly handles the same error. Either propagate the error or panic on failure to align with the pattern used elsewhere in this file (e.g., `.expect()` calls on critical initialization).

- let _ = pyo3_async_runtimes::tokio::init_with_runtime(rt_ref);
+ pyo3_async_runtimes::tokio::init_with_runtime(rt_ref)
+     .expect("failed to initialize pyo3-async-runtimes tokio runtime");

lib/kvbm-config/src/cache.rs (1)
39-46: Guard `bytes_per_block == 0` and reject non-finite/negative `cache_size_gb` before casting. Current code can produce absurd `num_blocks` on invalid inputs (division by zero, NaN/Inf, negative GB).

@@ impl HostCacheConfig {
  pub fn compute_num_blocks(&self, bytes_per_block: usize) -> Option<usize> {
+     if bytes_per_block == 0 {
+         return None;
+     }
      self.num_blocks.or_else(|| {
          self.cache_size_gb.map(|gb| {
-             // Convert GB to bytes and divide by block size
-             ((gb * 1_000_000_000.0) / bytes_per_block as f64) as usize
+             let bytes = gb * 1_000_000_000.0;
+             if !bytes.is_finite() || bytes < 0.0 {
+                 return 0;
+             }
+             (bytes / bytes_per_block as f64).floor() as usize
          })
      })
  }
}

(Apply the same guard to `DiskCacheConfig::compute_num_blocks`.)

If you want invalid `cache_size_gb` to be a hard config error, prefer adding `validator` rules and returning `Result` from `compute_num_blocks`.

Also applies to: 90-97
lib/kvbm-config/src/discovery.rs (1)

59-110: Add validation for "required" fields (esp. `cluster_id`) and avoid `Default` producing invalid configs. `cluster_id` is documented as required, but empty strings pass today; `Default` hardcodes `cluster_id: ""`, which is almost certainly not a valid runtime config.

@@ #[derive(Debug, Clone, Serialize, Deserialize, Validate)]
  pub struct EtcdDiscoveryConfig {
+     #[validate(length(min = 1))]
      pub cluster_id: String,
      #[serde(default = "default_etcd_endpoints")]
+     #[validate(length(min = 1))]
      pub endpoints: Vec<String>,
      #[serde(default = "default_operation_timeout")]
+     #[validate(range(min = 1, max = 600))]
      pub operation_timeout_secs: u64,
  }
@@ #[derive(Debug, Clone, Serialize, Deserialize, Validate)]
  pub struct P2pDiscoveryConfig {
+     #[validate(length(min = 1))]
      pub cluster_id: String,
      #[serde(default = "default_replication_factor")]
+     #[validate(range(min = 1, max = 100))]
      pub replication_factor: usize,
      #[serde(default = "default_record_ttl")]
+     #[validate(range(min = 1, max = 86_400))]
      pub record_ttl_secs: u64,
  }

Based on learnings, consider not implementing `Default` for configs with required fields, or gating `Default` behind `cfg(test)` to reduce accidental misuse.

Also applies to: 142-165
lib/kvbm-config/src/nixl.rs (1)

43-137: Enforce backend-name normalization on all construction paths (serde + `new()`). Right now only the fluent builders normalize keys; `new()` and serde deserialization can leave lowercase keys that then break `has_backend()` / `backend_params()` (which uppercase the lookup). This also contradicts "All backend names are normalized to uppercase."

  use dynamo_memory::nixl::NixlBackendConfig;
  use serde::{Deserialize, Serialize};
  use std::collections::HashMap;
+ use serde::de::Error as _;
  use validator::Validate;

  #[derive(Debug, Clone, Serialize, Deserialize, Validate)]
  pub struct NixlConfig {
-     #[serde(default = "default_backends")]
+     #[serde(
+         default = "default_backends",
+         deserialize_with = "deserialize_backends_uppercase"
+     )]
      pub backends: HashMap<String, HashMap<String, String>>,
  }

+ fn deserialize_backends_uppercase<'de, D>(
+     deserializer: D,
+ ) -> Result<HashMap<String, HashMap<String, String>>, D::Error>
+ where
+     D: serde::Deserializer<'de>,
+ {
+     let raw = HashMap::<String, HashMap<String, String>>::deserialize(deserializer)?;
+     normalize_backends(raw).map_err(D::Error::custom)
+ }
+
+ fn normalize_backends(
+     raw: HashMap<String, HashMap<String, String>>,
+ ) -> Result<HashMap<String, HashMap<String, String>>, String> {
+     let mut out = HashMap::with_capacity(raw.len());
+     for (k, v) in raw {
+         let key = k.to_uppercase();
+         if out.contains_key(&key) {
+             return Err(format!("duplicate backend after normalization: {key}"));
+         }
+         out.insert(key, v);
+     }
+     Ok(out)
+ }

  impl NixlConfig {
      pub fn new(backends: HashMap<String, HashMap<String, String>>) -> Self {
-         Self { backends }
+         let backends = normalize_backends(backends)
+             .unwrap_or_else(|_| default_backends()); // or choose to panic/return Result
+         Self { backends }
      }

      pub fn from_nixl_backend_config(config: NixlBackendConfig) -> Self {
          let backends: HashMap<String, HashMap<String, String>> = config
              .iter()
-             .map(|(backend, params)| (backend.to_string(), params.clone()))
+             .map(|(backend, params)| (backend.to_string().to_uppercase(), params.clone()))
              .collect();

If you don't want `new()` to silently "fix" invalid input, consider changing `new()` to return `Result<Self, _>` instead of falling back.

lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/dynamo.py (1)
92-127: Don't swallow all exceptions; also prefer `getattr(..., default)` over `hasattr` for output fields.

- The current `except Exception` hides invariants drifting between vLLM and Rust state.
- `hasattr(..., "finished_req_ids")` is brittle across vLLM versions; use `getattr` and validate the type.

- if self._rust_scheduler is not None and hasattr(
-     scheduler_output, "finished_req_ids"
- ):
+ finished_req_ids = getattr(scheduler_output, "finished_req_ids", None)
+ if self._rust_scheduler is not None and finished_req_ids is not None:
      try:
-         finished_ids = list(scheduler_output.finished_req_ids)
+         finished_ids = list(finished_req_ids)
          if finished_ids:
              self._rust_scheduler.remove_finished_requests(finished_ids)
-             print(
-                 f"DynamoScheduler: Removed {len(finished_ids)} finished requests from Rust scheduler"
-             )
-     except Exception as e:
-         print(
-             f"DynamoScheduler: Error removing finished requests from Rust scheduler: {e}"
-         )
+             logger.debug("Removed %d finished requests from Rust scheduler", len(finished_ids))
+     except (TypeError, AttributeError):
+         logger.exception("Invalid finished request ids from SchedulerOutput; disabling Rust path")
+         self._rust_scheduler = None

lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/dynamo.py (1)

260-268: `get_kv_connector()` should delegate to the wrapped scheduler when present. Returning `None` unconditionally breaks the delegation pattern used throughout this wrapper class and prevents kv-transfer integration from accessing the underlying scheduler's connector. All other scheduler methods delegate directly to `self._scheduler`; this method should do the same.

  # new in vllm v0.11
  def get_kv_connector(self) -> Optional[KVConnectorBase_V1]:
-     return None
+     # Delegate if the underlying Scheduler exposes a connector.
+     return getattr(self._scheduler, "get_kv_connector", lambda: None)()

lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/dynamo.py (1)
141-181: Implement RustSchedulerState before addressing scheduler integration concerns. `RustSchedulerState` does not currently exist in `kvbm._core.v2`, so the import fails silently and this code path never executes. Before the Rust scheduler integration becomes functional, ensure:

- Request types are validated before passing to Rust (especially `request_id`, which should handle both str and numeric types consistently with how `finish_requests()` validates `request_ids` at lines 198-201)
- Error handling catches specific exceptions rather than broad `Exception`
- If validation fails or the Rust path is unavailable, the wrapper gracefully falls back to the vLLM scheduler without silently diverging state (see the sketch below)
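A minimal sketch of that guarded delegation, imagined as a method on the wrapper class; the `add_request` method and `_rust_scheduler` attribute names are assumptions based on the surrounding review, not the file's confirmed API:

```python
import logging

logger = logging.getLogger(__name__)


def _mirror_request_to_rust(self, request) -> None:
    """Best-effort mirror of a new request into the Rust scheduler state."""
    if self._rust_scheduler is None:
        return
    # Normalize request_id the same way finish_requests() does: accept str or int.
    request_id = str(request.request_id)
    try:
        self._rust_scheduler.add_request(request_id, list(request.all_token_ids))
    except (AttributeError, TypeError, RuntimeError):
        # Disable the Rust path on the first unexpected failure rather than
        # letting Rust and vLLM state silently diverge.
        logger.exception("Rust scheduler rejected request %s; disabling Rust path", request_id)
        self._rust_scheduler = None
```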
lib/bindings/kvbm/src/block_manager.rs (1)

4-16: Avoid panics from `get_current_tokio_handle()` in Python-callable paths. `get_current_tokio_handle()` uses an `expect("Tokio runtime not initialized!")` (see lib/bindings/kvbm/src/dynamo/mod.rs:66-72). If any `#[pymethods]` (e.g., `BlockManager::new`, `init_controller`) can be invoked before runtime init, this will abort the process instead of raising a Python exception. Consider a fallible accessor (returning `PyErr`) on the bindings boundary.

lib/bindings/kvbm/src/v2/connector/leader/request.rs (1)
15-31: API design: make a required parameter non-optional in the signature. The `max_tokens` parameter is `Option<usize>` in the signature but immediately fails with `PyValueError` if `None`. This is confusing API design: the signature suggests the parameter is optional when it's actually required.

Apply this diff to make the API contract clear:

- #[pyo3(signature = (request_id, tokens, lora_name=None, salt_hash=None, max_tokens=None))]
+ #[pyo3(signature = (request_id, tokens, max_tokens, lora_name=None, salt_hash=None))]
  pub fn new(
      request_id: String,
      tokens: Vec<usize>,
+     max_tokens: usize,
      lora_name: Option<String>,
      salt_hash: Option<String>,
-     max_tokens: Option<usize>,
  ) -> PyResult<Self> {
-     if max_tokens.is_none() {
-         return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
-             "max_tokens is required",
-         ));
-     }
      Ok(Self {
-         inner: Request::new(request_id, tokens, lora_name, salt_hash, max_tokens),
+         inner: Request::new(request_id, tokens, lora_name, salt_hash, Some(max_tokens)),
      })
  }

This makes the API contract explicit: `max_tokens` is required, not optional.

lib/kvbm/src/v2/distributed/leader/session/initiator.rs (1)
531-622: Remote G3 matches are staged but never become pullable/returnable in `Full` (no `BlocksReady` handling / no block_id discovery). Today `remote_g3_blocks` only stores sequence hashes; `pull_remote_blocks()` only pulls `remote_g2_blocks`. So any blocks found only in remote G3 won't make it into `consolidate_blocks()` unless there's an unshown protocol step that converts them into `G2Results` with block IDs.

At minimum: either (a) wait for a responder message that includes the staged G2 `block_ids` for the staged hashes, or (b) extend `StageBlocks`/`BlocksReady` to return that mapping and update `remote_g2_blocks`/`remote_g2_hashes` before pulling.

Also applies to: 594-599
lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/worker.py (1)

111-186: `num_device_blocks = max(shape[0], shape[1])` looks incorrect for the stated KV cache layouts. For both documented layouts, `shape[0]` is 2 (K/V) and `shape[1]` is `num_blocks`, so `max()` is redundant at best and risky if a different tensor shape slips in (it could silently pick the wrong dimension).

  # Get first tensor to extract common properties
  first_tensor = tensors[0]
  shape = first_tensor.shape

+ # Basic shape/device/dtype checks (fail fast; mis-registration is painful to debug)
+ if len(shape) != 5:
+     raise ValueError(f"Expected KV cache tensor rank=5, got shape={tuple(shape)}")
+ if shape[0] != 2:
+     raise ValueError(f"Expected KV cache shape[0]==2 (K/V), got shape={tuple(shape)}")
+ if not first_tensor.is_cuda:
+     raise ValueError("Expected KV cache tensors to be CUDA tensors")
+
  # Extract parameters
  # For NHD layout: [2 (K/V), num_blocks, block_size, num_heads, head_size]
  # For HND layout: [2 (K/V), num_blocks, num_heads, block_size, head_size]
- num_device_blocks = max(shape[0], shape[1])
+ num_device_blocks = int(shape[1])
  page_size = self.vllm_config.cache_config.block_size
  dtype_width_bytes = self.kvbm_config.cache_dtype_bytes()
+
+ if first_tensor.element_size() != dtype_width_bytes:
+     raise ValueError(
+         f"KV dtype mismatch: tensor.element_size()={first_tensor.element_size()} "
+         f"!= kvbm_config.cache_dtype_bytes()={dtype_width_bytes}"
+     )

lib/kvbm/src/v2/distributed/leader/session/initiator.rs (1)
208-349: `process_search_responses()` can deadlock if a remote never sends `G2Results` (or sends only `SearchComplete`). `pending_g2_responses` is a counter (not per-responder), and `SearchComplete` doesn't affect it, so the loop may never satisfy the break condition.

A safer pattern is to track `pending_g2: HashSet<InstanceId>` and remove on either `G2Results` or `SearchComplete`, plus consider a timeout.

- let mut pending_g2_responses = remote_leaders.len();
+ let mut pending_g2_responses: HashSet<InstanceId> =
+     remote_leaders.iter().copied().collect();
@@
  OnboardMessage::G2Results {
@@
      pending_acknowledgments.insert(responder);
-     pending_g2_responses -= 1;
+     pending_g2_responses.remove(&responder);
  }
@@
  OnboardMessage::SearchComplete { responder, .. } => {
+     pending_g2_responses.remove(&responder);
@@
- if pending_g2_responses == 0
+ if pending_g2_responses.is_empty()
      && pending_g3_responses.is_empty()
      && pending_acknowledgments.is_empty()
      && pending_search_complete.is_empty()
  {
@@
  OnboardMessage::Acknowledged { responder, .. } => {
@@
- if pending_g2_responses == 0
+ if pending_g2_responses.is_empty()
      && pending_g3_responses.is_empty()
      && pending_acknowledgments.is_empty()
      && pending_search_complete.is_empty()
  {

Committable suggestion skipped: line range outside the PR's diff.
lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/worker.py (1)

19-34: Move vLLM imports to a `TYPE_CHECKING` block or defer them until runtime, matching the pattern in leader.py. Lines 22-25 have unconditional `vllm.distributed.kv_transfer.kv_connector.v1.base` and `vllm.model_executor.models.utils` imports that will fail if vLLM is not installed. The leader.py file in the same directory correctly gates these imports under `TYPE_CHECKING`. Since `KVConnectorHandshakeMetadata` is used as a base class (line 43) and `extract_layer_index` is called at runtime (line 128), either wrap the imports in try/except with a clear error message, or defer the runtime usage to conditional import blocks; a sketch of the try/except option follows.
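A minimal sketch of that split, using the import paths quoted above; the `SchedulerOutput` type-only import and the error message wording are illustrative:

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Type-only imports carry no runtime cost and cannot fail at import time.
    from vllm.v1.core.sched.output import SchedulerOutput

try:
    # These names are used at runtime (base class / helper), so they cannot
    # be gated behind TYPE_CHECKING; fail with a clear message instead.
    from vllm.distributed.kv_transfer.kv_connector.v1.base import (
        KVConnectorHandshakeMetadata,
    )
    from vllm.model_executor.models.utils import extract_layer_index
except ImportError as e:
    raise ImportError(
        "kvbm v2 vLLM worker integration requires vLLM to be installed"
    ) from e
```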
lib/bindings/kvbm/tests/test_tensor_kernels.py (1)

257-264: `test_empty_batch_noop()` likely shouldn't pass `None` positionally (prefer keyword `backend=None`, or omit it). If `block_to_operational`/`operational_to_block` expect `backend` as a kwarg (as used elsewhere), passing a positional `None` can break if the signature changes or differs from what's assumed.
lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/recording.py (1)

41-92: Defaulting `enable_recording=True` and writing under `Path.cwd()` is risky for prod (data leak + surprising IO). Consider defaulting `enable_recording=False` (or gating it via an environment variable), and/or defaulting the path to a temp dir when enabled; see the sketch below.
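A minimal sketch of env-gated defaults; the `KVBM_ENABLE_RECORDING` and `KVBM_RECORDING_DIR` variable names are hypothetical:

```python
import os
import tempfile
from pathlib import Path

# Recording stays off unless explicitly enabled (hypothetical env var).
enable_recording = os.environ.get("KVBM_ENABLE_RECORDING", "0") == "1"

# When enabled, default to a temp dir rather than Path.cwd(), so production
# runs don't accumulate recordings in arbitrary working directories.
recording_dir = (
    Path(os.environ.get("KVBM_RECORDING_DIR", tempfile.gettempdir()))
    / "kvbm_recordings"
)
```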
lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/recording.py (1)

93-130: Potential schedule/update mismatch: `current_schedule_output` can be overwritten or dropped if `schedule()` is called multiple times before `update_from_output()`. If the underlying engine can call `schedule()` repeatedly, consider queueing schedule outputs (FIFO) keyed by iteration/request ids instead of using a single slot; see the sketch below.
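A minimal sketch of the FIFO approach, assuming the wrapper delegates to `self._scheduler` as elsewhere in this file; the pairing logic is illustrative only:

```python
from collections import deque


class FifoRecordingWrapper:
    """Pair each schedule() output with its later update_from_output() call."""

    def __init__(self, scheduler):
        self._scheduler = scheduler
        self._pending_outputs = deque()  # FIFO of not-yet-updated SchedulerOutputs

    def schedule(self):
        output = self._scheduler.schedule()
        # A single slot would be overwritten by a second schedule() call;
        # a queue preserves every output until its update arrives.
        self._pending_outputs.append(output)
        return output

    def update_from_output(self, scheduler_output, model_runner_output):
        recorded = self._pending_outputs.popleft() if self._pending_outputs else None
        if recorded is not None:
            pass  # record (recorded, model_runner_output) as a matched pair here
        return self._scheduler.update_from_output(scheduler_output, model_runner_output)
```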
lib/bindings/kvbm/src/v2/connector/leader/mod.rs (1)

140-153: `update_connector_output` will throw if Python passes `None` for empty finished sets. If the Python side can pass `None` (or lists), consider accepting `Option<HashSet<String>>` (or `Option<&Bound<PyAny>>`) and treating `None` as empty.

lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/recording.py (1)
247-275: Recording file naming can collide (seconds resolution), writes aren't atomic, and metadata has hardcoded/incorrect values.

Filename collision risk (line 254): Using `strftime("%Y%m%d_%H%M%S")` provides only second-level resolution. Two saves within the same second will overwrite the previous file. Use microseconds (`%f`) and/or include the iteration range in the filename.

Non-atomic writes (lines 271-272): Direct file open/write without a temp-file + rename pattern risks corruption if the process crashes mid-write. Write to a temporary file first, then atomically rename it (see the sketch below).

Hardcoded metadata (lines 262-263): `vllm_version: "0.10.2"` is hardcoded and outdated (the actual version is 0.10.1.1); replace it with `vllm.__version__` (import at the top of the file). `model: "gpt2"` is completely hardcoded with no way to detect the actual model; this value should either be passed as a parameter to `RecordingScheduler.__init__` or extracted from the wrapped scheduler's config.
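A minimal sketch of the microsecond-stamped, atomic save using only the standard library; the function name and record shape are illustrative:

```python
import json
import os
import tempfile
from datetime import datetime
from pathlib import Path


def save_recording_atomically(record: dict, out_dir: Path) -> Path:
    """Write a recording with a collision-resistant name and an atomic rename."""
    out_dir.mkdir(parents=True, exist_ok=True)
    # %f adds microseconds, so two saves in the same second get distinct names.
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    final_path = out_dir / f"recording_{stamp}.json"

    # Write to a temp file in the same directory, then rename: os.replace is
    # atomic within a filesystem, so readers never see a half-written file.
    fd, tmp_name = tempfile.mkstemp(dir=out_dir, suffix=".json.tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(record, f)
        os.replace(tmp_name, final_path)
    except BaseException:
        os.unlink(tmp_name)
        raise
    return final_path
```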
lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/leader.py (1)

218-257: Potential IndexError: `_create_slot()` assumes `request.all_token_ids` is non-empty. You do `request.all_token_ids[0]` unconditionally; if an empty sequence can occur (even briefly), this will blow up.

Minimal guard:

- if isinstance(request.all_token_ids[0], (list, tuple)):
+ if not request.all_token_ids:
+     raise ValueError("request.all_token_ids is empty")
+ if isinstance(request.all_token_ids[0], (list, tuple)):

lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/connector.py (1)
65-112: Guard config and handle JSON serialization errors explicitly.

- Replace the `assert` checks (lines 75-77) with `ValueError` (asserts can be skipped).
- `json.dumps(extra_config)` (line 88) can fail for non-JSON-serializable values; that failure will currently bubble up as a raw `TypeError` without context.

- assert vllm_config.kv_transfer_config is not None
- assert vllm_config.kv_transfer_config.engine_id is not None
+ if vllm_config.kv_transfer_config is None:
+     raise ValueError("vllm_config.kv_transfer_config must be set")
+ if vllm_config.kv_transfer_config.engine_id is None:
+     raise ValueError("vllm_config.kv_transfer_config.engine_id must be set")
@@
- kvbm_override_config = json.dumps(extra_config) if extra_config else None
+ if extra_config:
+     try:
+         kvbm_override_config = json.dumps(extra_config)
+     except TypeError as e:
+         raise ValueError(
+             "kv_connector_extra_config must be JSON-serializable"
+         ) from e
+ else:
+     kvbm_override_config = None

Also applies to: 78-90
lib/kvbm/src/v2/distributed/leader/session/endpoint_session.rs (1)

350-369: Validate `layout_handles`/`sequence_hashes` lengths (silent block-omission risk). `build_block_infos()` (lines 351-369) will omit blocks if `layout_handles.get(i)` is `None`. That can yield incomplete state/messages and hard-to-debug partial transfers.

Consider validating up-front in `create_endpoint_session()` (lines 445-468) and failing fast:

  pub fn create_endpoint_session(
      @@
  ) -> (EndpointSession, EndpointSessionHandle) {
+     let block_count = blocks.blocks().len();
+     assert_eq!(layout_handles.len(), block_count, "layout_handles must match blocks");
+     assert_eq!(sequence_hashes.len(), block_count, "sequence_hashes must match blocks");
      let (cmd_tx, cmd_rx) = mpsc::channel(16);

(If this is library-facing, returning `Result<(EndpointSession, EndpointSessionHandle)>` would be nicer than asserts.)

Also applies to: 442-468
lib/bindings/kvbm/src/v2/vllm/config.rs (1)

271-288: `device_id = rank` likely conflicts with the current "device 0" limitation. You derive `device_id` from rank (lines 274-276) and expose `device_id()` as rank (lines 337-341). Based on retrieved learnings about the kvbm CUDA context being hardcoded to device 0 in the Python/kernel bindings, this can lead to "device_id says 3 but everything runs on device 0".

Options:

- (short-term) set `device_id = 0` (or validate `rank == 0`) when the Python/kernel path is enabled, with a clear error.
- (long-term) implement per-device CUDA context management and keep this mapping.

Based on learnings, please ensure these are aligned before enabling multi-GPU in v2 Python.

Also applies to: 337-341
lib/bindings/kvbm/python/kvbm/v2/vllm/connectors/connector.py (1)

42-82: Replace runtime `assert`s with explicit exceptions (asserts are optimized away). `assert vllm_config.kv_transfer_config is not None` / `assert ...engine_id is not None` (lines 64-66) and `assert isinstance(metadata, bytes)` (lines 45-47) should be hard failures even under `python -O`.

  class DynamoSchedulerConnectorMetadata(KVConnectorMetadata):
      @@
      def __init__(self, metadata: bytes):
-         assert isinstance(metadata, bytes)
+         if not isinstance(metadata, (bytes, bytearray, memoryview)):
+             raise TypeError("connector metadata must be bytes-like")
          self.metadata = metadata

  class DynamoConnector(KVConnectorBase_V1):
      @@
          super().__init__(vllm_config=vllm_config, role=role)
-
-         assert vllm_config.kv_transfer_config is not None
-         assert vllm_config.kv_transfer_config.engine_id is not None
+         if vllm_config.kv_transfer_config is None:
+             raise ValueError("vllm_config.kv_transfer_config must be set")
+         if vllm_config.kv_transfer_config.engine_id is None:
+             raise ValueError("vllm_config.kv_transfer_config.engine_id must be set")
🟡 Minor comments (18)
lib/identity/Cargo.toml (1)

23-23: Add `serde_bytes` to workspace dependencies for consistency. `serde_bytes` is pinned to version 0.11 while all other dependencies use `workspace = true`. Add `serde_bytes = { version = "0.11" }` to `[workspace.dependencies]` in the root Cargo.toml and update line 23 to use `serde_bytes = { workspace = true }`.

lib/bindings/kvbm/python/kvbm/v2/vllm/connectors/leader.py (1)
64-82: The stub return value should match the doc contract (`None` vs `0`). The doc says "None if evaluating"; the stub currently returns 0, which looks like a real "0 matched tokens" signal.

- return (0, False)
+ return (None, False)

lib/bindings/kvbm/conftest.py (1)
17-24: Avoid importing vllm at collection time (side effects + slower), and drop the unused `noqa`. Ruff is right here: the `noqa` is currently unused, and importing vllm can have import-time side effects during test discovery. Prefer a spec check:

+ import importlib.util
+
  # Check if vllm is available (for v2 vllm integration)
- try:
-     import vllm  # noqa: F401
-
-     VLLM_AVAILABLE = True
- except ImportError:
-     VLLM_AVAILABLE = False
+ VLLM_AVAILABLE = importlib.util.find_spec("vllm") is not None

lib/bindings/kvbm/python/kvbm/v2/vllm/connectors/leader.py (1)
38-63: Use the `logging` module instead of `print`; narrow the broad `except Exception` during initialization. Catching `Exception` can hide real programming errors and silently force stub mode. Instead:

- Add a module-level logger: `logger = logging.getLogger(__name__)` at the top.
- Replace the success-path `print` with `logger.info(...)`.
- Split the broad `except (ImportError, Exception)` into two handlers: catch `ImportError` for the expected missing feature, then catch unexpected exceptions separately and re-raise to avoid silent failures.

+ import logging
+ logger = logging.getLogger(__name__)
@@
  try:
      import kvbm
@@
-         print(
-             f"SchedulerConnectorLeader initialized with Rust backing, engine_id: {engine_id}"
-         )
+         logger.info("SchedulerConnectorLeader initialized with Rust backing (engine_id=%s)", engine_id)
      else:
          raise ImportError("kvbm v2 feature not available")
- except (ImportError, Exception) as e:
-     print(
-         f"SchedulerConnectorLeader initialized in stub mode (no Rust backing): {e}"
-     )
-     print(f"Engine ID: {engine_id}")
+ except ImportError as e:
+     logger.info("SchedulerConnectorLeader stub mode (no Rust backing): %s", e)
+ except Exception:
+     logger.exception("SchedulerConnectorLeader failed to initialize (engine_id=%s)", engine_id)
+     raise

Committable suggestion skipped: line range outside the PR's diff.
lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/dynamo.py (1)

31-36: Avoid import-time `print()` side effects; use logging and keep the ImportError quiet by default. Library modules importing this will emit stdout noise in normal environments (and can break some log-capture setups).

+ import logging
+ logger = logging.getLogger(__name__)
+
  try:
      from kvbm._core.v2 import RustSchedulerState
  except ImportError:
      RustSchedulerState = None
-     print("Warning: kvbm not available; forwarding all requests to vLLM scheduler")
+     logger.debug("kvbm not available; forwarding all requests to vLLM scheduler")

lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/dynamo.py (1)

122-125: Ruff BLE001 is right: replace `print` + blanket `except Exception` with targeted handling + logging. Catching everything and continuing can desync Rust/vLLM state; better to log with context and disable the Rust path on the first unexpected error.

Also applies to: 176-178, 207-210

lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/dynamo.py (1)

182-214: Catch specific exceptions instead of broad Exception; consider how Rust state removal in `update_from_output` integrates with vLLM's finish semantics. The broad `except Exception` across Rust calls (add_request, finish_requests, update_from_output) masks errors with only print statements. Use specific exceptions (e.g., `AttributeError`, `RuntimeError`) and proper logging. The two-phase finish pattern (mark in `finish_requests()`, remove in `update_from_output()`) appears sound, but verify that removal timing aligns if vLLM's state machine interacts with Rust state during request lifecycle transitions.

lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/__init__.py (1)

4-12: Ruff RUF022: sort `__all__` (or silence intentionally).

  __all__ = [
-     "DynamoScheduler",
-     "RecordingScheduler",
      "DynamoConnector",
+     "DynamoScheduler",
+     "RecordingScheduler",
  ]

lib/bindings/kvbm/src/v2/mod.rs (1)

34-38: Remove or document the commented-out code. These commented-out class registrations lack context. Either remove them if they're not needed, or add a TODO comment explaining when they'll be uncommented.

Apply this diff to remove the dead code:

- // // vLLM specific classes
- // // Leader connector classes for v2 vLLM integration
- // m.add_class::<vllm::PyKvbmRequest>()?;
- // m.add_class::<vllm::PyConnectorMetadataBuilder>()?;
- // m.add_class::<vllm::PyRustSchedulerOutput>()?;

Or add a TODO if they're planned:

- // // vLLM specific classes
- // // Leader connector classes for v2 vLLM integration
+ // TODO: Add vLLM-specific classes when implementation is complete
  // m.add_class::<vllm::PyKvbmRequest>()?;
  // m.add_class::<vllm::PyConnectorMetadataBuilder>()?;
  // m.add_class::<vllm::PyRustSchedulerOutput>()?;

lib/bindings/kvbm/python/kvbm/v2/vllm/config.py (1)

166-170: Inconsistent warning message and behavior. The warning says "defaulting to 2 bytes (FP16)" but the code immediately raises a `ValueError` instead of defaulting. Either remove the warning or actually default:

  else:
-     logger.warning(
-         f"Unknown cache dtype: {cache_dtype}, defaulting to 2 bytes (FP16)"
-     )
      raise ValueError(f"Unknown cache dtype: {cache_dtype}")

Or if defaulting is preferred:

  else:
      logger.warning(
          f"Unknown cache dtype: {cache_dtype}, defaulting to 2 bytes (FP16)"
      )
      return 2

lib/kvbm-config/src/nova.rs (1)
157-179: IPv6 fallback logic doesn't work as intended. The current logic checks IPv4 then IPv6 within the same `ifaddr` entry. If an interface has multiple addresses (commonly one IPv4 and one IPv6 as separate entries), the IPv6 check on lines 172-174 only runs if the same entry has no IPv4 address. Since most systems report IPv4 and IPv6 as separate `ifaddr` entries, this fallback never triggers.

Consider collecting addresses first, then selecting:

  fn get_interface_ip(interface_name: &str) -> Result<IpAddr> {
      use nix::ifaddrs::getifaddrs;
      let addrs = getifaddrs().context("Failed to get interface addresses")?;
+     let mut ipv6_fallback: Option<IpAddr> = None;
      for ifaddr in addrs {
          if ifaddr.interface_name == interface_name
              && let Some(addr) = ifaddr.address
          {
              // Prefer IPv4 addresses
              if let Some(sockaddr) = addr.as_sockaddr_in() {
                  return Ok(IpAddr::V4(sockaddr.ip()));
              }
-             // Fall back to IPv6 if no IPv4
-             if let Some(sockaddr) = addr.as_sockaddr_in6() {
-                 return Ok(IpAddr::V6(sockaddr.ip()));
+             // Record IPv6 for fallback
+             if ipv6_fallback.is_none() {
+                 if let Some(sockaddr) = addr.as_sockaddr_in6() {
+                     ipv6_fallback = Some(IpAddr::V6(sockaddr.ip()));
+                 }
              }
          }
      }
+     if let Some(ip) = ipv6_fallback {
+         return Ok(ip);
+     }
      bail!("No IP address found for interface: {}", interface_name)
  }

lib/bindings/kvbm/python/kvbm/v2/vllm/connectors/worker.py (1)
116-124: Empty try/except block with TODO: placeholder code left in. This block currently does nothing (`pass`) and catches all exceptions. Either implement the TODO or remove the try/except entirely until the implementation is ready.

  # Try to initialize Rust worker if bindings are available
- try:
-     # Create runtime and worker
-     # Note: This requires Nova to be set up properly.
-     # For now, we're just documenting the flow - actual initialization
-     # requires runtime environment setup.
-     pass  # TODO: Wire up when KvbmRuntime initialization is ready
-
- except Exception as e:
-     print(f"Could not initialize Rust worker: {e}")
+ # TODO: Wire up when KvbmRuntime initialization is ready
+ # This requires Nova to be set up properly and runtime environment setup.

lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/worker.py (1)
207-236: `save_kv_layer()` assumes CUDA; guard to avoid crashing on CPU tensors.

@@
- device = kv_layer.device
+ if not kv_layer.is_cuda:
+     raise ValueError("save_kv_layer expected a CUDA tensor")
+ device = kv_layer.device
  stream = torch.cuda.current_stream(device)
  stream_handle = stream.cuda_stream

lib/kvbm-kernels/src/tensor_kernels.rs (1)
47-60: Typo in docs: "Priortizes" should be "Prioritizes". Minor, but worth fixing since this is public-ish API surface.

lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/leader.py (1)
173-215: Handshake metadata type hint vs runtime check disagree. The signature says values are `KVConnectorHandshakeMetadata`, but you enforce `NovaPeerMetadata`. If the intent is "NovaPeerMetadata only", update the type hints/docstring to match; otherwise accept the vLLM type and adapt/convert.

lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/leader.py (1)
108-120: `update_state_after_alloc()` doc says "no-op", but it performs a Rust call. Either make it truly no-op (and assert `num_external_tokens == 0`) or update the docstring; a sketch of the former follows.
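A minimal sketch of the "truly no-op" option; the signature follows vLLM's scheduler-side connector API, and the hard-failure behavior is an assumption about intended semantics:

```python
def update_state_after_alloc(self, request, blocks, num_external_tokens: int) -> None:
    """No-op: allocation state is tracked on the Rust side."""
    # If this method is meant to be a no-op, fail loudly when the assumption
    # breaks instead of silently performing work the docstring disclaims.
    if num_external_tokens != 0:
        raise AssertionError(
            f"expected num_external_tokens == 0, got {num_external_tokens}"
        )
```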
lib/bindings/kvbm/src/kernels.rs (1)

175-188: Update the docstring: `PySequence` does not support generators. Generators implement the iterator protocol, not the sequence protocol. `PySequence` requires `__len__` and `__getitem__` (or registration with `collections.abc.Sequence`), which generators do not provide. The docstring should be updated to remove "generator", or the implementation should use `PyIterator` for iterator support.

lib/kvbm/src/v2/distributed/leader/instance.rs (1)
1329-1337: Error handling sends a misleading success status. When the `InitiatorSession` fails, the code sends `OnboardingStatus::Complete { matched_blocks: 0 }`. This could be misleading, since a session with 0 matched blocks could be legitimate (no matches found), making it hard to distinguish from actual errors.

Consider adding a `Failed` variant to `OnboardingStatus` or including an optional error field in `Complete`.
from vllm.config import VllmConfig

# added to the api in vllm v0.11
from vllm.distributed.kv_transfer.kv_connector.v1 import KVConnectorBase_V1
from vllm.multimodal import MULTIMODAL_REGISTRY, MultiModalRegistry
from vllm.v1.core.kv_cache_manager import KVCacheConfig
from vllm.v1.core.sched.interface import SchedulerInterface
from vllm.v1.core.sched.output import SchedulerOutput
from vllm.v1.core.sched.scheduler import Scheduler
from vllm.v1.engine import EngineCoreOutputs
from vllm.v1.metrics.stats import SchedulerStats
from vllm.v1.outputs import DraftTokenIds, ModelRunnerOutput
from vllm.v1.request import Request, RequestStatus
from vllm.v1.structured_output import StructuredOutputManager
🧩 Analysis chain
🌐 Web query:
vLLM 0.12.0 vllm.v1.core.sched.interface SchedulerInterface import path
💡 Result:
Use:
from vllm.v1.core.sched.interface import SchedulerInterface
Docs: vLLM v0.12.0 — SchedulerInterface. [1][2]
Sources:
[1] vLLM docs — v0.12.0 API: vllm.v1.core.sched.interface.
[2] vLLM docs — Scheduler config referencing SchedulerInterface.
🌐 Web query:
vLLM 0.12.0 vllm.v1.core.sched.scheduler Scheduler class
💡 Result:
Summary of vllm.v1.core.sched.scheduler.Scheduler (vLLM ~0.12.0)
- Purpose: default request scheduler used by vLLM to decide which request tokens (encoder + decoder) to compute each step, manage KV cache slots, preemption, and produce a SchedulerOutput for the engine. [1][2]
- Key behaviors:
- Supports scheduling policies: "fcfs" (first-come-first-served) and "priority". [1]
- No separate "prefill"/"decode" phases — each request tracks num_computed_tokens and scheduler tries to catch up to num_tokens_with_spec (prompt+output+spec). [2]
- Handles encoder inputs specially via _try_schedule_encoder_inputs (decides which encoder inputs to schedule, adjusts num_new_tokens and encoder budget). If encoder inputs cannot be scheduled, it may reduce scheduled decoder tokens or preempt lower-priority requests. [2][3]
- Advances request.num_computed_tokens after scheduling and frees encoder inputs when appropriate. [3]
- Integrates with KV cache manager, KVConnector, structured output manager, and publishes KV events. [2][3]
- Configurable for async_scheduling and custom scheduler class via scheduler_cls in config. [1][4]
- Important methods (names seen in source/docs):
- init(...) — sets up configs, cache/parallel/Lora, structured output manager, etc. [3]
- schedule() -> SchedulerOutput — main scheduling loop returning planned work, preemptions, finished IDs. [2]
- _try_schedule_encoder_inputs(...) -> (list[int], int, int) — decide encoder inputs to schedule and update budgets. [2]
- _update_after_schedule / update_from_output — advance computed tokens and apply post-schedule adjustments. [3]
Relevant docs / source references:
- vLLM scheduler config & reference (notes on policy, scheduler_cls): vLLM docs. [1]
- Scheduler implementation details and schedule() description: vLLM API docs / source. [2][3]
References
[1] vLLM scheduler config / API (docs).
[2] vllm.v1.core.sched.scheduler — schedule() and algorithm (docs/source).
[3] vllm.v1.core.sched.scheduler — init, _update_after_schedule (docs/source).
🌐 Web query:
vLLM 0.12.0 vllm.v1.request Request request_id type
💡 Result:
In vLLM (including v0.12.0), vllm.v1.request.Request.request_id is a string (str). [1]
Sources:
[1] vLLM API docs — vllm.v1.request (Request parameters include request_id: str).
🌐 Web query:
vLLM 0.12.0 Scheduler.finish_requests method signature
💡 Result:
Signature (vLLM Scheduler.finish_requests):
def finish_requests(request_ids: str | Iterable[str], finished_status: RequestStatus) -> None
Description: handles external finish/abort signals (e.g., API client disconnect). [1][2]
Sources:
[1] vLLM scheduler docs (scheduler.py).
[2] vLLM v0.11.2 scheduler docs (scheduler.py).
🌐 Web query:
vLLM 0.12.0 SchedulerOutput finished_req_ids field names
💡 Result:
In vLLM v0.12.0 the SchedulerOutput field is named finished_req_ids and is a set of request-id strings (finished_req_ids: set[str]) — it contains the request IDs that finished between the previous and current scheduling steps. See the SchedulerOutput dataclass and scheduler docs. [1][2]
Sources:
- SchedulerOutput (output dataclass) — finished_req_ids: set[str]. [1]
- Scheduler implementation describing finished_req_ids usage. [2]
- v0.12.0 release tag (version confirmation). [3]
References:
[1] vLLM SchedulerOutput docs. (docs.vllm.ai)
[2] vLLM scheduler implementation (finished_req_ids usage). (docs.vllm.ai)
[3] vLLM v0.12.0 release on GitHub. (github.com)
Citations:
- 1: https://docs.vllm.ai/en/stable/api/vllm/v1/core/sched/output/?utm_source=openai
- 2: https://docs.vllm.ai/en/latest/api/vllm/v1/core/sched/scheduler.html?utm_source=openai
- 3: https://github.com/vllm-project/vllm/releases?utm_source=openai
🌐 Web query:
vLLM 0.12.0 Scheduler.get_kv_connector method
💡 Result:
In vLLM (Scheduler), get_kv_connector has signature:
def get_kv_connector(self) -> KVConnectorBase_V1 | None:
return self.connector
It simply returns the Scheduler's KV connector (or None if no KV transfer is configured). [1][2]
Sources: vLLM scheduler API docs. [1] (docs.vllm.ai) [2] (docs.vllm.ai)
Citations:
- 1: https://docs.vllm.ai/api/vllm/v1/core/sched/scheduler.html?utm_source=openai
- 2: https://docs.vllm.ai/en/latest/api/vllm/v1/core/sched/scheduler.html?utm_source=openai
🌐 Web query:
vLLM 0.12.0 KVCacheConfig import path vllm.v1.core.kv_cache_manager
💡 Result:
Use KVCacheConfig from vllm.v1.kv_cache_interface. KVCacheManager (and kv_cache_manager.py) lives at vllm.v1.core.kv_cache_manager.
Example imports:
- from vllm.v1.kv_cache_interface import KVCacheConfig
- from vllm.v1.core.kv_cache_manager import KVCacheManager
Docs: KVCacheManager (kv_cache_manager.py). [1] KVCacheConfig dataclass (kv_cache_interface.py). [2]
The vLLM 0.12.0 import paths are mostly correct, but KVCacheConfig has the wrong import path.
The imports you listed verify correctly against vLLM 0.12.0:
- vllm.v1.core.sched.interface.SchedulerInterface ✓
- vllm.v1.core.sched.scheduler.Scheduler ✓
- vllm.v1.core.sched.output.SchedulerOutput with field finished_req_ids: set[str] ✓
- vllm.v1.request.Request.request_id is str ✓
- Scheduler.finish_requests(request_ids: str | Iterable[str], finished_status: RequestStatus) exists ✓
- Scheduler.get_kv_connector() -> KVConnectorBase_V1 | None exists ✓
However, line 20 has an incorrect import:
from vllm.v1.core.kv_cache_manager import KVCacheConfig
Should be:
from vllm.v1.kv_cache_interface import KVCacheConfig
🤖 Prompt for AI Agents
In lib/bindings/kvbm/python/kvbm/v2/vllm/schedulers/dynamo.py around lines 16 to
30, the import for KVCacheConfig is incorrect; replace the current import from
vllm.v1.core.kv_cache_manager with the correct path vllm.v1.kv_cache_interface
so the module imports KVCacheConfig from the right location and the rest of the
vLLM v0.12.0-compatible imports remain unchanged.
def _scheduler_output_to_dict(self, output) -> Dict[str, Any]:
    """Convert SchedulerOutput to a dictionary."""
    try:
        return {
            "scheduled_new_reqs": [
                {
                    "req_id": req.req_id,
                    "prompt_token_ids": req.prompt_token_ids,
                    "block_ids": [list(blocks) for blocks in req.block_ids]
                    if req.block_ids
                    else [],
                    "num_computed_tokens": req.num_computed_tokens,
                    "mm_hashes": req.mm_hashes if hasattr(req, "mm_hashes") else [],
                }
                for req in output.scheduled_new_reqs
            ],
            "scheduled_cached_reqs": {
                "req_ids": output.scheduled_cached_reqs.req_ids,
                "resumed_from_preemption": output.scheduled_cached_reqs.resumed_from_preemption,
                "new_token_ids": output.scheduled_cached_reqs.new_token_ids,
                "new_block_ids": [
                    [list(blocks) for blocks in block_ids] if block_ids else None
                    for block_ids in output.scheduled_cached_reqs.new_block_ids
                ],
                "num_computed_tokens": output.scheduled_cached_reqs.num_computed_tokens,
            },
            "num_scheduled_tokens": dict(output.num_scheduled_tokens),
            "total_num_scheduled_tokens": output.total_num_scheduled_tokens,
            "scheduled_spec_decode_tokens": dict(
                output.scheduled_spec_decode_tokens
            ),
            "scheduled_encoder_inputs": dict(output.scheduled_encoder_inputs),
            "num_common_prefix_blocks": list(output.num_common_prefix_blocks),
            "finished_req_ids": list(output.finished_req_ids),
            "free_encoder_mm_hashes": list(output.free_encoder_mm_hashes),
        }
    except Exception as e:
        print(f"Error converting SchedulerOutput: {e}")
        return {}

def _model_runner_output_to_dict(self, output) -> Dict[str, Any]:
    """Convert ModelRunnerOutput to a dictionary."""
    try:
        result = {
            "req_ids": output.req_ids,
            "req_id_to_index": dict(output.req_id_to_index),
            "sampled_token_ids": output.sampled_token_ids,
        }

        if output.logprobs:
            result["logprobs"] = {
                "logprob_token_ids": output.logprobs.logprob_token_ids,
                "logprobs": output.logprobs.logprobs,
                "sampled_token_ranks": output.logprobs.sampled_token_ranks,
            }

        if hasattr(output, "num_nans_in_logits") and output.num_nans_in_logits:
            result["num_nans_in_logits"] = dict(output.num_nans_in_logits)

        return result
    except Exception as e:
        print(f"Error converting ModelRunnerOutput: {e}")
        return {}

def _engine_core_outputs_to_dict(self, outputs) -> Dict[str, Any]:
    """Convert dict[int, EngineCoreOutputs] to a serializable format."""
    try:
        # outputs is a dict mapping engine_idx -> EngineCoreOutputs
        result = {}
        for engine_idx, engine_outputs in outputs.items():
            # Each engine_outputs is an EngineCoreOutputs struct
            engine_result = {
                "engine_index": engine_outputs.engine_index,
                "outputs": [
                    {
                        "request_id": output.request_id,
                        "new_token_ids": output.new_token_ids,
                        "finish_reason": output.finish_reason.value
                        if output.finish_reason
                        else None,
                        "num_cached_tokens": output.num_cached_tokens,
                        "stop_reason": output.stop_reason
                        if hasattr(output, "stop_reason")
                        else None,
                    }
                    for output in engine_outputs.outputs
                ],
                "timestamp": engine_outputs.timestamp,
            }

            # Add optional fields if present
            if engine_outputs.scheduler_stats:
                engine_result["scheduler_stats"] = {
                    "num_running_reqs": engine_outputs.scheduler_stats.num_running_reqs,
                    "num_waiting_reqs": engine_outputs.scheduler_stats.num_waiting_reqs,
                    "num_preempted_reqs": getattr(
                        engine_outputs.scheduler_stats, "num_preempted_reqs", 0
                    ),
                }

            if engine_outputs.finished_requests:
                engine_result["finished_requests"] = list(
                    engine_outputs.finished_requests
                )

            result[str(engine_idx)] = engine_result

        return result
    except Exception as e:
        print(f"Error converting EngineCoreOutputs: {e}")
        import traceback

        traceback.print_exc()
        return {}
JSON serialization likely breaks: recorded dicts may contain tensors/arrays/bytes/enums (not JSON-serializable).
Right now fields like prompt_token_ids, sampled_token_ids, new_token_ids, and various nested outputs may be torch tensors or other complex types. Add a to_jsonable() helper (recursive; handles torch.Tensor -> .tolist(), numpy scalars/arrays, enums, sets, bytes) and apply it to all recorded payloads before asdict() / json.dump().
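A minimal sketch of such a helper; the torch/numpy handling relies on the duck-typed `.tolist()` that tensors, ndarrays, and numpy scalars all expose, and the bytes-to-hex choice is an assumption:

```python
import enum
from typing import Any


def to_jsonable(obj: Any) -> Any:
    """Recursively convert recorded payloads into JSON-serializable values."""
    # torch.Tensor, numpy arrays, and numpy scalars all expose .tolist().
    if hasattr(obj, "tolist"):
        return obj.tolist()
    if isinstance(obj, enum.Enum):
        return to_jsonable(obj.value)
    if isinstance(obj, (bytes, bytearray)):
        return obj.hex()
    if isinstance(obj, dict):
        return {str(k): to_jsonable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple, set, frozenset)):
        return [to_jsonable(v) for v in obj]
    return obj
```

Applying it once, just before `json.dump(...)`, keeps the conversion logic out of the per-field extraction code above.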
🧰 Tools
🪛 Ruff (0.14.8)
168-168: Do not catch blind exception: Exception
(BLE001)
191-191: Consider moving this statement to an else block
(TRY300)
192-192: Do not catch blind exception: Exception
(BLE001)
239-239: Consider moving this statement to an else block
(TRY300)
240-240: Do not catch blind exception: Exception
(BLE001)
static CUDA_CONTEXT: OnceCell<Arc<CudaContext>> = OnceCell::new();

// TODO: determine the right way to get the CUDA context for the python bindings
// this is currently disabled, but we'll migrate this to the bindings crate
fn get_context() -> PyResult<Arc<CudaContext>> {
    let ctx = CUDA_CONTEXT.get_or_try_init(|| {
        CudaContext::new(0).map_err(|err| {
            PyRuntimeError::new_err(format!("Failed to create CUDA context: {:?}", err))
        })
    })?;
    Ok(ctx.clone())
}
🧩 Analysis chain
🏁 Script executed:
# Check file location and purpose
cd /tmp/repo-root 2>/dev/null || cd / 2>/dev/null
pwd
git ls-files | grep -E "lib/bindings/kvbm/src/kernels.rs" | head -5Repository: ai-dynamo/dynamo
Length of output: 147
🏁 Script executed:
# Read the file to understand context
cat -n lib/bindings/kvbm/src/kernels.rs | head -50Repository: ai-dynamo/dynamo
Length of output: 2177
🏁 Script executed:
# Check the tensor_info function and its return type
rg -A 20 "fn tensor_info" lib/bindings/kvbm/src/kernels.rsRepository: ai-dynamo/dynamo
Length of output: 761
🏁 Script executed:
# Check the actual function signatures that use get_context
rg -B 5 -A 15 "let ctx = get_context" lib/bindings/kvbm/src/kernels.rsRepository: ai-dynamo/dynamo
Length of output: 2422
🏁 Script executed:
# Check if there are any device_index validations or multi-GPU handling
rg "device_index|device.index" lib/bindings/kvbm/src/kernels.rsRepository: ai-dynamo/dynamo
Length of output: 1005
🏁 Script executed:
# Check the broader context - is this marked as test code or temporary?
head -20 lib/bindings/kvbm/src/kernels.rsRepository: ai-dynamo/dynamo
Length of output: 700
🏁 Script executed:
# Check if there's module-level documentation
rg -B 10 "mod kernels|pub mod kernels" lib/bindings/kvbm/src/lib.rs 2>/dev/null | head -20Repository: ai-dynamo/dynamo
Length of output: 370
🏁 Script executed:
# Check all validation of device_index in this file
rg -B 3 -A 3 "device_index != " lib/bindings/kvbm/src/kernels.rsRepository: ai-dynamo/dynamo
Length of output: 1712
🏁 Script executed:
# Check the full tensor_info function to see what it does with device_index
rg -A 30 "fn tensor_info" lib/bindings/kvbm/src/kernels.rsRepository: ai-dynamo/dynamo
Length of output: 1186
🏁 Script executed:
# Check if there's any enforcement that would prevent non-device-0 tensors from being used
rg "device_index.*!= 0|device.*!= 0" lib/bindings/kvbm/src/kernels.rsRepository: ai-dynamo/dynamo
Length of output: 42
🏁 Script executed:
# Check if these are actually exposed as pyfunctions or if they're gated/disabled
rg -B 5 "#\[pyfunction\]|#\[pyo3\]" lib/bindings/kvbm/src/kernels.rs | head -40Repository: ai-dynamo/dynamo
Length of output: 909
🏁 Script executed:
# Check the Cargo.toml to understand the kernels feature
rg -A 10 "\[features\]" lib/bindings/kvbm/Cargo.tomlRepository: ai-dynamo/dynamo
Length of output: 520
🏁 Script executed:
# Check if module initialization gates the functions
rg -A 20 "#\[pymodule\]" lib/bindings/kvbm/src/lib.rsRepository: ai-dynamo/dynamo
Length of output: 596
🏁 Script executed:
# Check if there's any conditional compilation around the kernels module
rg -B 2 -A 2 "mod kernels" lib/bindings/kvbm/src/lib.rsRepository: ai-dynamo/dynamo
Length of output: 125
🏁 Script executed:
# Double-check: does the code actually prevent non-device-0 usage, or is it truly unsafe?
rg -B 10 -A 10 "block_to_universal|fn block_to_universal" lib/bindings/kvbm/src/kernels.rs | head -50Repository: ai-dynamo/dynamo
Length of output: 1668
🏁 Script executed:
# Look at one full function to trace the flow: block_to_universal
sed -n '65,120p' lib/bindings/kvbm/src/kernels.rsRepository: ai-dynamo/dynamo
Length of output: 1947
CUDA context hardcoded to device 0 creates undefined behavior when tensors are on other GPUs
The code validates that all input tensors are on the same device_index (allowing any CUDA device), but get_context() always creates and binds a device-0 context. If a caller passes tensors on GPU1+, the code binds GPU0 and passes GPU1 pointers to GPU0 kernels—leading to crashes or silent corruption.
Either enforce device_index == 0 with a clear error message, or implement per-device context management using a map of OnceCell<Arc<CudaContext>> keyed by device_index. Suggested minimal fix:
-fn get_context() -> PyResult<Arc<CudaContext>> {
+fn get_context(device_index: i64) -> PyResult<Arc<CudaContext>> {
+ if device_index != 0 {
+ return Err(PyValueError::new_err(format!(
+ "kvbm kernels currently require CUDA device 0, but got device {device_index}"
+ )));
+ }
let ctx = CUDA_CONTEXT.get_or_try_init(|| {
CudaContext::new(0).map_err(|err| {
PyRuntimeError::new_err(format!("Failed to create CUDA context: {:?}", err))
})
})?;
Ok(ctx.clone())
}Then call it with the validated device: let ctx = get_context(base_info.device_index)?; in each entrypoint.
Also applies to: 65–113, 225–507, 517–816
🤖 Prompt for AI Agents
In lib/bindings/kvbm/src/kernels.rs around lines 22–33 (and similarly for other
entry points noted), the CUDA context is hardcoded to device 0; replace this
with per-device context management by changing get_context() to accept a
device_index argument and returning an Arc<CudaContext> for that device.
Implement a static map (e.g. static MUTEX-protected HashMap<i32,
Arc<CudaContext>> or a concurrent map) that lazily creates and stores a
CudaContext per device_index on first request (initializing via
CudaContext::new(device_index) and wrapping in Arc), then clone and return the
Arc for callers. Update all callers (e.g. entrypoints referenced in the comment)
to pass the validated base_info.device_index into get_context(device_index) and
ensure device_index is validated (non-negative) before calling.
def test_non_cuda_tensor_error():
    """
    CPU tensors should be rejected up-front with a helpful message.
    """
    device = torch.device("cpu")
    universal = torch.randn(1, 1, 1, 2, 4, device=device)
    blocks = _make_blocks(universal.cuda(), "NHD")

    with pytest.raises(ValueError):
        ctk.block_to_universal([blocks], [universal], "NHD")
`test_non_cuda_tensor_error()` will fail on machines without CUDA (it calls `.cuda()` without a guard).
Add `if not torch.cuda.is_available(): pytest.skip(...)` at the top (like the other tests).
🤖 Prompt for AI Agents
In lib/bindings/kvbm/tests/test_tensor_kernels.py around lines 245 to 255, the
test calls .cuda() unguarded and will fail on machines without CUDA; add a guard
at the top of test_non_cuda_tensor_error() that checks torch.cuda.is_available()
and calls pytest.skip("CUDA required for this test") if false, so the test is
skipped on non-CUDA hosts and avoids unguarded .cuda() calls.
#[validate(range(min = 1, max = default_max_cpus()))]
pub worker_threads: Option<usize>,
Fix invalid validation attribute that calls a function.
The #[validate(range(min = 1, max = default_max_cpus()))] attribute calls a function, which won't work with the validator derive macro. The max parameter must be a constant expression.
Apply this diff to use custom validation:
-    #[validate(range(min = 1, max = default_max_cpus()))]
+    #[validate(range(min = 1))]
+    #[validate(custom(function = "validate_worker_threads"))]
     pub worker_threads: Option<usize>,

Then add a custom validation function:
fn validate_worker_threads(value: &Option<usize>) -> Result<(), validator::ValidationError> {
    if let Some(threads) = value {
        let max_cpus = default_max_cpus();
        if *threads > max_cpus {
            let mut err = validator::ValidationError::new("range");
            err.message = Some(format!("worker_threads must be <= {}", max_cpus).into());
            return Err(err);
        }
    }
    Ok(())
}

And update the struct derive to include custom validation:
-#[derive(Debug, Clone, Serialize, Deserialize, Validate)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct TokioConfig {

Then manually implement validation or use a custom validate method.
🤖 Prompt for AI Agents
In lib/kvbm-config/src/tokio.rs around lines 14-15, the validator attribute uses
a function call in the attribute (max = default_max_cpus()) which is invalid;
remove the #[validate(range(min = 1, max = default_max_cpus()))] attribute and
replace it with a custom validator by adding #[validate(custom =
"validate_worker_threads")] to the worker_threads field, implement the provided
fn validate_worker_threads(value: &Option<usize>) -> Result<(),
validator::ValidationError> (checking Some(threads) and comparing to
default_max_cpus(), returning a ValidationError with a clear message when >
max), ensure the struct derives or implements validator::Validate so this custom
function is invoked (or call the validate method manually where needed).
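If threading the custom function through the derive macro proves fiddly (the custom attribute syntax changed between validator releases), a plain method on the struct sidesteps the macro entirely. A hedged sketch; the error type and messages here are placeholders:

impl TokioConfig {
    /// Manual check that avoids the derive macro's requirement that
    /// `range` bounds be constant expressions.
    pub fn validate_worker_threads(&self) -> Result<(), String> {
        let max_cpus = default_max_cpus();
        match self.worker_threads {
            Some(0) => Err("worker_threads must be >= 1".to_string()),
            Some(n) if n > max_cpus => {
                Err(format!("worker_threads must be <= {max_cpus}, got {n}"))
            }
            _ => Ok(()),
        }
    }
}

Call it wherever the config is loaded, alongside (or instead of) the derive-based validate().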
/// Register the "kvbm.leader.onboard" handler.
///
/// This handler is intentionally simple and fast:
/// - Deserializes the message
/// - If CreateSession and session doesn't exist, spawns responder
/// - Dispatches to session channel
/// - Returns immediately (< 1ms)
fn register_onboard_handler(&self) -> Result<()> {
    let sessions = self.sessions.clone();
    let spawn_responder = self.spawn_responder.clone();

    let handler = NovaHandler::am_handler_async("kvbm.leader.onboard", move |ctx| {
        let sessions = sessions.clone();
        let spawn_responder = spawn_responder.clone();

        async move {
            // Fast path: just deserialize and dispatch
            let message: OnboardMessage = serde_json::from_slice(&ctx.payload)
                .map_err(|e| anyhow::anyhow!("failed to deserialize OnboardMessage: {e}"))?;

            let session_id = message.session_id();

            eprintln!(
                "[HANDLER] Received message: {:?} for session {}",
                std::mem::discriminant(&message),
                session_id
            );

            // If this is a CreateSession and no session exists, spawn responder
            if matches!(message, OnboardMessage::CreateSession { .. })
                && !sessions.contains_key(&session_id)
            {
                eprintln!("[HANDLER] Spawning new ResponderSession for {}", session_id);
                if let Some(ref spawner) = spawn_responder {
                    spawner(message.clone()).ok(); // Best-effort spawn
                }
            }

            // Dispatch to session channel (will create if needed by spawner above)
            eprintln!("[HANDLER] Dispatching message to session {}", session_id);
            dispatch_onboard_message(&sessions, message).await?;

            Ok(())
        }
    })
    .build();

    self.nova.register_handler(handler)?;

    Ok(())
}
Onboard handler has a race: dispatch may occur before the responder session/channel exists.
spawner(message.clone()).ok() doesn’t guarantee the session is present before dispatch_onboard_message(...). Ensure dispatch creates the session entry atomically (or insert a sender before spawning), otherwise CreateSession can intermittently fail under load.
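One way to close the race is to publish the session's sender in the map before the responder task sees the receiver, so a concurrent dispatch always finds a live channel. A sketch, assuming sessions is a DashMap of tokio mpsc senders; the names, channel capacity, and spawner signature are illustrative:

use dashmap::DashMap;
use dashmap::mapref::entry::Entry;
use tokio::sync::mpsc;

async fn ensure_session_and_dispatch(
    sessions: &DashMap<SessionId, mpsc::Sender<OnboardMessage>>,
    message: OnboardMessage,
    spawn_responder: impl Fn(mpsc::Receiver<OnboardMessage>),
) -> anyhow::Result<()> {
    let session_id = message.session_id();
    let tx = match sessions.entry(session_id) {
        Entry::Occupied(entry) => entry.get().clone(),
        Entry::Vacant(entry) => {
            // Register the sender under the entry lock, then hand the
            // receiver to the responder: a concurrent dispatch can never
            // observe the session as missing once CreateSession ran.
            let (tx, rx) = mpsc::channel(64);
            entry.insert(tx.clone());
            spawn_responder(rx);
            tx
        }
    };
    // The entry guard is dropped before the await, so no DashMap shard
    // lock is held across a suspension point.
    tx.send(message).await?;
    Ok(())
}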
loop {
    tokio::select! {
        // Handle incoming SessionMessage
        msg = self.endpoint.recv() => {
            match msg {
                Some(msg) => {
                    if !self.handle_message(msg).await? {
                        break;
                    }
                }
                None => {
                    debug!(
                        session_id = %self.endpoint.session_id(),
                        "Message channel closed"
                    );
                    break;
                }
            }
        }

        // Handle local commands
        cmd = self.cmd_rx.recv() => {
            match cmd {
                Some(cmd) => {
                    if !self.handle_command(cmd).await? {
                        break;
                    }
                }
                None => {
                    // Command channel closed, continue processing messages
                    debug!(
                        session_id = %self.endpoint.session_id(),
                        "Command channel closed"
                    );
                }
            }
        }
    }
}
Critical: closed cmd_rx can cause a tight loop + contradicts handle “drop closes” semantics
When cmd_rx is closed, recv() returns None immediately, so the select branch is continuously ready and can spin, spamming logs and starving endpoint.recv().
A minimal fix is to treat None as a close signal and break:
 cmd = self.cmd_rx.recv() => {
     match cmd {
         Some(cmd) => {
             if !self.handle_command(cmd).await? {
                 break;
             }
         }
         None => {
-            // Command channel closed, continue processing messages
             debug!(
                 session_id = %self.endpoint.session_id(),
                 "Command channel closed"
             );
+            // Handle drop/close semantics: shut down gracefully
+            self.endpoint.set_phase(SessionPhase::Complete);
+            break;
         }
     }
 }

This also makes the doc on EndpointSessionHandle (Line 79-80) true: dropping the handle closes the sender and causes shutdown.
Also applies to: 165-172
/// Wait for control commands (Hold/Prepare modes).
async fn await_commands(&mut self, mut rx: mpsc::Receiver<OnboardMessage>) -> Result<()> {
    loop {
        tokio::select! {
            Some(cmd) = self.control_rx.recv() => {
                match cmd {
                    SessionControl::Prepare => {
                        if self.mode == StagingMode::Hold {
                            self.prepare_mode().await?;
                            self.mode = StagingMode::Prepare;
                        }
                    }
                    SessionControl::Pull => {
                        if self.mode == StagingMode::Prepare {
                            self.pull_remote_blocks().await?;
                            self.consolidate_blocks().await;

                            // Send CloseSession to all remotes
                            let all_remotes: HashSet<InstanceId> = self
                                .remote_g2_blocks
                                .keys()
                                .chain(self.remote_g3_blocks.keys())
                                .copied()
                                .collect();

                            for remote in all_remotes {
                                self.transport.send(remote, OnboardMessage::CloseSession {
                                    requester: self.instance_id,
                                    session_id: self.session_id,
                                }).await?;
                            }

                            break;
                        }
                    }
                    SessionControl::Cancel => {
                        // Release all blocks and exit
                        let all_remotes: HashSet<InstanceId> = self
                            .remote_g2_blocks
                            .keys()
                            .chain(self.remote_g3_blocks.keys())
                            .copied()
                            .collect();

                        for remote in all_remotes {
                            self.transport.send(remote, OnboardMessage::CloseSession {
                                requester: self.instance_id,
                                session_id: self.session_id,
                            }).await?;
                        }
                        break;
                    }
                    SessionControl::Shutdown => {
                        break;
                    }
                }
            }
            // Also drain any remaining messages from responders
            Some(_msg) = rx.recv() => {
                // Process any late messages if needed
            }
        }
    }

    Ok(())
}
await_commands() can hang forever when channels close (patterns only match Some).
If control_rx is closed, the Some(cmd) = ... branch is disabled and the loop can become an infinite drain of responder messages (or an idle wait).
@@
     async fn await_commands(&mut self, mut rx: mpsc::Receiver<OnboardMessage>) -> Result<()> {
         loop {
             tokio::select! {
-                Some(cmd) = self.control_rx.recv() => {
+                cmd = self.control_rx.recv() => {
+                    let Some(cmd) = cmd else { break; };
                     match cmd {
@@
                     }
                 }
                 // Also drain any remaining messages from responders
-                Some(_msg) = rx.recv() => {
+                msg = rx.recv() => {
+                    if msg.is_none() { break; }
                     // Process any late messages if needed
                 }
             }
         }
         Ok(())
     }

🤖 Prompt for AI Agents
In lib/kvbm/src/v2/distributed/leader/session/initiator.rs around lines 792 to
857, await_commands() currently only matches Some(...) from control_rx and rx
which can cause the loop to hang if either channel is closed; change the select
arms to handle None from both receivers (e.g., assign to an Option and match
Some(cmd) => { ... } None => break) so the function breaks out and returns when
a channel is closed, ensuring you drain/handle remaining messages
deterministically and avoid an infinite loop.
pub struct FindMatchesTask {}

impl FindMatchesResult {
    pub fn status(&self) -> FindStatus {
        todo!()
    }
}
Compile blocker: FindMatchesResult is referenced but not defined
impl FindMatchesResult { ... } won’t compile unless FindMatchesResult exists (or this impl was meant for FindMatchesTask).
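If the separate result type is intentional, the smallest compile fix is to define it; a placeholder sketch (the field is a stand-in until the real shape lands):

/// Placeholder so the `impl FindMatchesResult` block compiles; replace
/// with the real result shape (matched blocks, pending transfers, ...).
pub struct FindMatchesResult {
    _private: (),
}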
impl Leader for InstanceLeader {
    fn find_matches_with_options(
        &self,
        sequence_hashes: &[SequenceHash],
        options: FindOptions,
    ) -> Result<FindMatchesTask> {
        // acquire permits from the onboarding budget
        // here we could fail if the options tell us not to queue
        // if we queue, we start a session

        let g2_matches = self.g2_manager.match_blocks(sequence_hashes);

        let remaining_sequence_hashes = &sequence_hashes[g2_matches.len()..];

        let g3_matches = if let Some(g3_manager) = &self.g3_manager {
            g3_manager.match_blocks(remaining_sequence_hashes)
        } else {
            Vec::new()
        };

        if g3_matches.is_empty() {
            !todo!(
                "return FindMatchesTask::Done - this arm does not have a future assocated with it"
            )
        }

        // todo: create an onboarding session for the g2 -> g3 transfers
        // let the connector api determine

        todo!()
    }
Compile blocker + likely logic issue in “remaining hashes” computation
- if g3_matches.is_empty() { !todo!(...) } won't compile (the ! prefix is invalid here).
- &sequence_hashes[g2_matches.len()..] can panic and also seems semantically dubious unless match_blocks() returns "matched prefix length".
If this is scaffolding, consider making the function compile while still “TODO” at runtime:
-        let remaining_sequence_hashes = &sequence_hashes[g2_matches.len()..];
+        let remaining_sequence_hashes = sequence_hashes; // TODO: compute unmatched hashes safely
         let g3_matches = if let Some(g3_manager) = &self.g3_manager {
             g3_manager.match_blocks(remaining_sequence_hashes)
         } else {
             Vec::new()
         };
         if g3_matches.is_empty() {
-            !todo!(
-                "return FindMatchesTask::Done - this arm does not have a future assocated with it"
-            )
+            todo!("return FindMatchesTask::Done - this arm does not have a future associated with it")
         }
}…and later replace with real “unmatched” derivation once match_blocks() semantics are wired.
🤖 Prompt for AI Agents
lib/kvbm/src/v2/distributed/leader/session/leader.rs lines 85-115: the code
currently uses an invalid prefix `!todo!(...)` and slices sequence_hashes
unsafely with `&sequence_hashes[g2_matches.len()..]` which can panic; fix by
computing the slice start safely (e.g. let start =
g2_matches.len().min(sequence_hashes.len()); let remaining_sequence_hashes =
&sequence_hashes[start..]; replace the `if g3_matches.is_empty()` arm to return
a valid Result (for now return Ok(FindMatchesTask::Done) or another appropriate
Done sentinel), and replace the final `todo!()` with an explicit runtime TODO
placeholder (e.g. unimplemented!("TODO: create onboarding session for g2->g3
transfers")) so the file compiles while keeping the runtime TODO.
…bs build Signed-off-by: Ryan Olson <[email protected]>
…bs build Signed-off-by: Ryan Olson <[email protected]>
Signed-off-by: Ryan Olson <[email protected]>
Signed-off-by: Ryan Olson <[email protected]>
Signed-off-by: Ryan Olson <[email protected]>
Signed-off-by: Ryan Olson <[email protected]>
Signed-off-by: Ryan Olson <[email protected]>
Signed-off-by: Ryan Olson <[email protected]>
Signed-off-by: Ryan Olson <[email protected]>
…der; improving logging Signed-off-by: Ryan Olson <[email protected]>
Signed-off-by: Ryan Olson <[email protected]>
Signed-off-by: Ryan Olson <[email protected]>
Summary by CodeRabbit
New Features
Chores