Conversation
Code Review
This pull request implements incremental object retrieval by patch version, optimizing network transfer for patch-heavy objects like replay buffers. Key changes include updating the Arrow Flight protocol to support versioned patches, modifying the Rust server to stream partial updates, and enhancing the Python SDK to cache patches and materialized views. Review feedback highlights several performance optimizations in the Rust server, such as avoiding unnecessary data cloning and using a RwLock for concurrent reads. Additionally, a critical issue was identified in the Python SDK where using id() for caching materialized views could lead to memory leaks and cache misses when using bound methods.
```rust
fn object_patches_to_flight_data_vec(patches: &[Object]) -> Result<Vec<FlightData>, FlameError> {
    let rows = patches
        .iter()
        .map(|delta| (ObjectResponseKind::Patch, delta))
        .collect();
    object_rows_to_flight_data_vec(rows)
}
```
This helper function can be optimized to avoid unnecessary cloning of Object instances (and their underlying Vec<u8> data) by working with references. This is particularly important for large objects and high-concurrency scenarios.
Suggested change:

```diff
-fn object_patches_to_flight_data_vec(patches: &[Object]) -> Result<Vec<FlightData>, FlameError> {
+fn object_patches_to_flight_data_vec(patches: Vec<&Object>) -> Result<Vec<FlightData>, FlameError> {
     let rows = patches
-        .iter()
+        .into_iter()
         .map(|delta| (ObjectResponseKind::Patch, delta))
         .collect();
     object_rows_to_flight_data_vec(rows)
 }
```
```rust
} else if client_version != 0 && object.version <= client_version {
    let needed_patches: Vec<Object> = object
        .deltas
        .iter()
        .filter(|delta| delta.version > client_version)
        .cloned()
        .collect();
```
Cloning all patches before checking contiguity and encoding is inefficient. Using references avoids deep-copying the patch data.
Suggested change:

```rust
} else if client_version != 0 && object.version <= client_version {
    let needed_patches: Vec<&Object> = object
        .deltas
        .iter()
        .filter(|delta| delta.version > client_version)
        .collect();
```

```rust
        client_version,
        server_version
    );
    let flight_data_vec = object_patches_to_flight_data_vec(&needed_patches)?;
```
```python
def _materialize_object(obj: Object, deserializer: Optional[Deserializer] = None) -> Any:
    materialized_key = None if deserializer is None else id(deserializer)
```
Using id(deserializer) as a cache key is problematic for bound methods (e.g., self._deserializer), as Python creates a new bound method object with a unique ID on every access. This results in a 0% cache hit rate and a memory leak as the materialized dictionary grows with every call. Using the deserializer object itself as the key is safe and efficient, as functions and methods are hashable and bound methods implement equality correctly.
Suggested change:

```diff
-    materialized_key = None if deserializer is None else id(deserializer)
+    materialized_key = deserializer
```
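The bound-method behavior is easy to demonstrate. The `Client` class below is a hypothetical stand-in for the SDK object holding `self._deserializer`:

```python
class Client:
    def _deserializer(self, raw: bytes) -> str:
        return raw.decode("utf-8")

c = Client()
m1 = c._deserializer
m2 = c._deserializer

# Each attribute access creates a fresh bound-method object...
assert m1 is not m2                # distinct objects, hence distinct id()s
# ...but bound methods hash and compare by (function, instance):
assert m1 == m2
assert hash(m1) == hash(m2)

# Keyed by the callable itself, a second lookup hits the cache:
cache = {m1: "materialized view"}
assert cache[m2] == "materialized view"

# Keyed by id(), the same lookup misses, and every call would
# insert a new entry, growing the dict without bound:
cache_by_id = {id(m1): "materialized view"}
assert id(m2) not in cache_by_id
print(len(cache))  # → 1
```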
```rust
pub fn current_version(&self) -> u64 {
    self.deltas
        .iter()
        .map(|delta| delta.version)
        .max()
        .unwrap_or(self.version)
        .max(self.version)
}
```
The .max(self.version) call is redundant because unwrap_or(self.version) already ensures the result is at least self.version if the iterator is empty. If the iterator is not empty, the maximum delta version should naturally be greater than or equal to the base version.
Suggested change:

```rust
pub fn current_version(&self) -> u64 {
    self.deltas
        .iter()
        .map(|delta| delta.version)
        .max()
        .unwrap_or(self.version)
}
```

```rust
    .cache
    .get_write_lock(&key_str)
    .map_err(|e| Status::internal(format!("Lock error: {}", e)))?;
let _guard = write_lock.lock().await;
```
Using an exclusive Mutex lock for do_get operations serializes all read requests for the same key, which can become a significant performance bottleneck. Since cache.get() is thread-safe and returns a cloned Object, the returned data is consistent. If synchronization against concurrent writes is required to avoid reading partial storage states, consider using a RwLock to allow multiple concurrent readers.
```python
version: int
data: Any
patches: List[Patch] = field(default_factory=list)
materialized: Dict[Optional[int], Any] = field(default_factory=dict)
```
The type hint for the materialized dictionary should reflect that it will store callables as keys to support stable caching of materialized views.
Suggested change:

```diff
-    materialized: Dict[Optional[int], Any] = field(default_factory=dict)
+    materialized: Dict[Optional[Callable], Any] = field(default_factory=dict)
```
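A trimmed-down, hypothetical version of the record shows how the corrected hint lines up with using the deserializer callable itself as the key (`CachedObject` and `to_text` are illustrative names, not the SDK's):

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional

@dataclass
class CachedObject:
    version: int
    data: Any
    # None keys the raw view; a callable keys the view it produced.
    materialized: Dict[Optional[Callable], Any] = field(default_factory=dict)

def to_text(raw: bytes) -> str:
    return raw.decode("utf-8")

obj = CachedObject(version=1, data=b"hello")
obj.materialized[None] = obj.data            # raw bytes, no deserializer
obj.materialized[to_text] = to_text(obj.data)

assert obj.materialized[to_text] == "hello"
print(len(obj.materialized))  # → 2
```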