Skip to content

Commit 2d78e44

Browse files
author
Harsh Dev Pathak
committed
chore: Improved and optimized in_memory impl
1 parent c2a140e commit 2d78e44

File tree

8 files changed

+115
-153
lines changed

8 files changed

+115
-153
lines changed

.github/workflows/build-and-test-in-memory.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ jobs:
2525
[server_config]
2626
host = "127.0.0.1"
2727
port = 8080
28-
store_type = "in_memory"
28+
store_type = "in-memory"
2929
EOF
3030
3131
- name: Build server

.github/workflows/ldk-node-integration.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ jobs:
102102
[server_config]
103103
host = "127.0.0.1"
104104
port = 8080
105-
store_type = "in_memory"
105+
store_type = "in-memory"
106106
EOF
107107
108108
- name: Build & Start VSS Server

rust/impls/src/in_memory_store.rs

Lines changed: 56 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::postgres_store::{
1+
use crate::{
22
VssDbRecord, LIST_KEY_VERSIONS_MAX_PAGE_SIZE, MAX_PUT_REQUEST_ITEM_COUNT,
33
};
44
use api::error::VssError;
@@ -10,41 +10,39 @@ use api::types::{
1010
use async_trait::async_trait;
1111
use bytes::Bytes;
1212
use chrono::prelude::Utc;
13-
use std::collections::HashMap;
13+
use std::collections::BTreeMap;
1414
use std::sync::Arc;
15-
use tokio::sync::RwLock;
15+
use tokio::sync::Mutex;
1616

1717
fn build_storage_key(user_token: &str, store_id: &str, key: &str) -> String {
1818
format!("{}#{}#{}", user_token, store_id, key)
1919
}
2020

2121
/// In-memory implementation of the VSS Store.
2222
pub struct InMemoryBackendImpl {
23-
store: Arc<RwLock<HashMap<String, VssDbRecord>>>,
23+
store: Arc<Mutex<BTreeMap<String, VssDbRecord>>>,
2424
}
2525

2626
impl InMemoryBackendImpl {
2727
/// Creates an in-memory instance.
2828
pub fn new() -> Self {
29-
Self { store: Arc::new(RwLock::new(HashMap::new())) }
29+
Self { store: Arc::new(Mutex::new(BTreeMap::new())) }
3030
}
3131

3232
fn get_current_global_version(
33-
&self, guard: &HashMap<String, VssDbRecord>, user_token: &str, store_id: &str,
33+
&self, guard: &BTreeMap<String, VssDbRecord>, user_token: &str, store_id: &str,
3434
) -> i64 {
3535
let global_key = build_storage_key(user_token, store_id, GLOBAL_VERSION_KEY);
3636
guard.get(&global_key).map(|r| r.version).unwrap_or(0)
3737
}
3838
}
3939

40-
// Validation functions - check if operations can succeed without modifying data
4140
fn validate_put_operation(
42-
store: &HashMap<String, VssDbRecord>, user_token: &str, store_id: &str, key_value: &KeyValue,
41+
store: &BTreeMap<String, VssDbRecord>, user_token: &str, store_id: &str, key_value: &KeyValue,
4342
) -> Result<(), VssError> {
4443
let key = build_storage_key(user_token, store_id, &key_value.key);
4544

4645
if key_value.version == -1 {
47-
// Non-conditional upsert always succeeds
4846
Ok(())
4947
} else if key_value.version == 0 {
5048
if store.contains_key(&key) {
@@ -75,12 +73,11 @@ fn validate_put_operation(
7573
}
7674

7775
fn validate_delete_operation(
78-
store: &HashMap<String, VssDbRecord>, user_token: &str, store_id: &str, key_value: &KeyValue,
76+
store: &BTreeMap<String, VssDbRecord>, user_token: &str, store_id: &str, key_value: &KeyValue,
7977
) -> Result<(), VssError> {
8078
let key = build_storage_key(user_token, store_id, &key_value.key);
8179

8280
if key_value.version == -1 {
83-
// Non-conditional delete always succeeds
8481
Ok(())
8582
} else {
8683
if let Some(existing) = store.get(&key) {
@@ -101,20 +98,25 @@ fn validate_delete_operation(
10198
}
10299
}
103100

104-
fn execute_non_conditional_upsert(
105-
store: &mut HashMap<String, VssDbRecord>, user_token: &str, store_id: &str, key_value: KeyValue,
101+
fn execute_put_object(
102+
store: &mut BTreeMap<String, VssDbRecord>, user_token: &str, store_id: &str,
103+
key_value: KeyValue,
106104
) {
107105
let key = build_storage_key(user_token, store_id, &key_value.key);
108106
let now = Utc::now();
109107

110108
match store.entry(key) {
111-
std::collections::hash_map::Entry::Occupied(mut occ) => {
109+
std::collections::btree_map::Entry::Occupied(mut occ) => {
112110
let existing = occ.get_mut();
113-
existing.version = INITIAL_RECORD_VERSION as i64;
111+
existing.version = if key_value.version == -1 {
112+
INITIAL_RECORD_VERSION as i64
113+
} else {
114+
existing.version.saturating_add(1)
115+
};
114116
existing.value = key_value.value.to_vec();
115117
existing.last_updated_at = now;
116118
},
117-
std::collections::hash_map::Entry::Vacant(vac) => {
119+
std::collections::btree_map::Entry::Vacant(vac) => {
118120
let new_record = VssDbRecord {
119121
user_token: user_token.to_string(),
120122
store_id: store_id.to_string(),
@@ -129,51 +131,8 @@ fn execute_non_conditional_upsert(
129131
}
130132
}
131133

132-
fn execute_conditional_insert(
133-
store: &mut HashMap<String, VssDbRecord>, user_token: &str, store_id: &str, key_value: KeyValue,
134-
) {
135-
let key = build_storage_key(user_token, store_id, &key_value.key);
136-
let now = Utc::now();
137-
138-
let new_record = VssDbRecord {
139-
user_token: user_token.to_string(),
140-
store_id: store_id.to_string(),
141-
key: key_value.key,
142-
value: key_value.value.to_vec(),
143-
version: INITIAL_RECORD_VERSION as i64,
144-
created_at: now,
145-
last_updated_at: now,
146-
};
147-
store.insert(key, new_record);
148-
}
149-
150-
fn execute_conditional_update(
151-
store: &mut HashMap<String, VssDbRecord>, user_token: &str, store_id: &str, key_value: KeyValue,
152-
) {
153-
let key = build_storage_key(user_token, store_id, &key_value.key);
154-
let now = Utc::now();
155-
156-
if let Some(existing) = store.get_mut(&key) {
157-
existing.version = key_value.version.saturating_add(1);
158-
existing.value = key_value.value.to_vec();
159-
existing.last_updated_at = now;
160-
}
161-
}
162-
163-
fn execute_put_object(
164-
store: &mut HashMap<String, VssDbRecord>, user_token: &str, store_id: &str, key_value: KeyValue,
165-
) {
166-
if key_value.version == -1 {
167-
execute_non_conditional_upsert(store, user_token, store_id, key_value);
168-
} else if key_value.version == 0 {
169-
execute_conditional_insert(store, user_token, store_id, key_value);
170-
} else {
171-
execute_conditional_update(store, user_token, store_id, key_value);
172-
}
173-
}
174-
175134
fn execute_delete_object(
176-
store: &mut HashMap<String, VssDbRecord>, user_token: &str, store_id: &str,
135+
store: &mut BTreeMap<String, VssDbRecord>, user_token: &str, store_id: &str,
177136
key_value: &KeyValue,
178137
) {
179138
let key = build_storage_key(user_token, store_id, &key_value.key);
@@ -186,7 +145,7 @@ impl KvStore for InMemoryBackendImpl {
186145
&self, user_token: String, request: GetObjectRequest,
187146
) -> Result<GetObjectResponse, VssError> {
188147
let key = build_storage_key(&user_token, &request.store_id, &request.key);
189-
let guard = self.store.read().await;
148+
let guard = self.store.lock().await;
190149

191150
if let Some(record) = guard.get(&key) {
192151
Ok(GetObjectResponse {
@@ -197,6 +156,7 @@ impl KvStore for InMemoryBackendImpl {
197156
}),
198157
})
199158
} else if request.key == GLOBAL_VERSION_KEY {
159+
// Non-zero global version is handled above; this is only for initial version 0.
200160
Ok(GetObjectResponse {
201161
value: Some(KeyValue {
202162
key: GLOBAL_VERSION_KEY.to_string(),
@@ -221,7 +181,7 @@ impl KvStore for InMemoryBackendImpl {
221181
}
222182

223183
let store_id = request.store_id.clone();
224-
let mut guard = self.store.write().await;
184+
let mut guard = self.store.lock().await;
225185

226186
if let Some(version) = request.global_version {
227187
validate_put_operation(
@@ -268,7 +228,7 @@ impl KvStore for InMemoryBackendImpl {
268228
})?;
269229

270230
let store_id = request.store_id.clone();
271-
let mut guard = self.store.write().await;
231+
let mut guard = self.store.lock().await;
272232

273233
execute_delete_object(&mut guard, &user_token, &store_id, &key_value);
274234

@@ -278,71 +238,53 @@ impl KvStore for InMemoryBackendImpl {
278238
async fn list_key_versions(
279239
&self, user_token: String, request: ListKeyVersionsRequest,
280240
) -> Result<ListKeyVersionsResponse, VssError> {
281-
let store_id = request.store_id;
282-
let key_prefix = request.key_prefix.unwrap_or("".to_string());
283-
let page_token_option = request.page_token;
241+
let store_id = request.store_id.clone();
242+
let key_prefix = request.key_prefix.clone().unwrap_or_default();
284243
let page_size = request.page_size.unwrap_or(i32::MAX);
285244
let limit = std::cmp::min(page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE) as usize;
286245

287-
let (keys_with_versions, global_version) = {
288-
let guard = self.store.read().await;
289-
290-
let mut global_version: Option<i64> = None;
291-
if page_token_option.is_none() {
292-
global_version =
293-
Some(self.get_current_global_version(&guard, &user_token, &store_id));
294-
}
295-
296-
let storage_prefix = format!("{}#{}#", user_token, store_id);
297-
let mut temp: Vec<(String, i64)> = Vec::new();
298-
299-
for (storage_key, r) in guard.iter() {
300-
if !storage_key.starts_with(&storage_prefix) {
301-
continue;
302-
}
303-
let key = &storage_key[storage_prefix.len()..];
304-
if key == GLOBAL_VERSION_KEY {
305-
continue;
306-
}
307-
if !key_prefix.is_empty() && !key.starts_with(&key_prefix) {
308-
continue;
309-
}
310-
temp.push((key.to_string(), r.version));
311-
}
312-
313-
(temp, global_version)
246+
let offset: usize = if let Some(token) = &request.page_token {
247+
token.parse::<usize>().map_err(|_| {
248+
VssError::InvalidRequestError(format!("Invalid page token: {}", token))
249+
})?
250+
} else {
251+
0
314252
};
315253

316-
let mut keys_with_versions = keys_with_versions;
317-
keys_with_versions.sort_by(|a, b| a.0.cmp(&b.0));
254+
let guard = self.store.lock().await;
318255

319-
let start_idx = if page_token_option.is_none() {
320-
0
321-
} else if page_token_option.as_deref() == Some("") {
322-
keys_with_versions.len()
323-
} else {
324-
let token = page_token_option.as_deref().unwrap();
325-
keys_with_versions
326-
.iter()
327-
.position(|(k, _)| k.as_str() > token)
328-
.unwrap_or(keys_with_versions.len())
329-
};
256+
let mut global_version: Option<i64> = None;
257+
if offset == 0 {
258+
global_version = Some(self.get_current_global_version(&guard, &user_token, &store_id));
259+
}
260+
261+
let storage_prefix = build_storage_key(&user_token, &store_id, &key_prefix);
262+
let global_key = build_storage_key(&user_token, &store_id, GLOBAL_VERSION_KEY);
330263

331-
let page_items: Vec<KeyValue> = keys_with_versions
264+
let page_items: Vec<KeyValue> = guard
332265
.iter()
333-
.skip(start_idx)
266+
.filter(|(storage_key, _)| {
267+
storage_key.starts_with(&storage_prefix) && *storage_key != &global_key
268+
})
269+
.skip(offset)
334270
.take(limit)
335-
.map(|(key, version)| KeyValue {
336-
key: key.clone(),
337-
value: Bytes::new(),
338-
version: *version,
271+
.map(|(storage_key, record)| {
272+
let prefix_len = format!("{}#{}#", user_token, store_id).len();
273+
let key = &storage_key[prefix_len..];
274+
275+
KeyValue {
276+
key: key.to_string(),
277+
value: Bytes::new(),
278+
version: record.version,
279+
}
339280
})
340281
.collect();
341282

283+
let next_offset = offset + page_items.len();
342284
let next_page_token = if page_items.is_empty() {
343285
Some("".to_string())
344286
} else {
345-
page_items.last().map(|kv| kv.key.clone())
287+
Some(next_offset.to_string())
346288
};
347289

348290
Ok(ListKeyVersionsResponse { key_versions: page_items, next_page_token, global_version })
@@ -411,4 +353,4 @@ mod tests {
411353
let response = store.get(user_token.clone(), global_request).await.unwrap();
412354
assert_eq!(response.value.unwrap().version, 0, "Expected global_version=0");
413355
}
414-
}
356+
}

rust/impls/src/lib.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,46 @@
1111
#![deny(rustdoc::private_intra_doc_links)]
1212
#![deny(missing_docs)]
1313

14+
use chrono::Utc;
15+
1416
/// Contains in-memory backend implementation for VSS, for testing purposes only.
1517
pub mod in_memory_store;
16-
mod migrations;
1718
/// Contains [PostgreSQL](https://www.postgresql.org/) based backend implementation for VSS.
1819
pub mod postgres_store;
1920

21+
/// A record stored in the VSS database.
22+
struct VssDbRecord {
23+
/// Token uniquely identifying the user that owns this record.
24+
user_token: String,
25+
/// Identifier for the store this record belongs to.
26+
store_id: String,
27+
/// Key under which the value is stored.
28+
key: String,
29+
/// Stored value as raw bytes.
30+
value: Vec<u8>,
31+
/// Version number for optimistic concurrency control.
32+
version: i64,
33+
/// Timestamp when the record was created (UTC).
34+
created_at: chrono::DateTime<Utc>,
35+
/// Timestamp when the record was last updated (UTC).
36+
last_updated_at: chrono::DateTime<Utc>,
37+
}
38+
39+
/// The maximum number of key versions that can be returned in a single page.
40+
///
41+
/// This constant helps control memory and bandwidth usage for list operations,
42+
/// preventing overly large payloads. If the number of results exceeds this limit,
43+
/// the response will be paginated.
44+
const LIST_KEY_VERSIONS_MAX_PAGE_SIZE: i32 = 100;
45+
46+
/// The maximum number of items allowed in a single `PutObjectRequest`.
47+
///
48+
/// Setting an upper bound on the number of items helps ensure that
49+
/// each request stays within acceptable memory and performance limits.
50+
/// Exceeding this value will result in request rejection through [`VssError::InvalidRequestError`].
51+
const MAX_PUT_REQUEST_ITEM_COUNT: usize = 1000;
52+
53+
mod migrations;
54+
2055
#[macro_use]
2156
extern crate api;

0 commit comments

Comments
 (0)