1- use crate :: postgres_store:: {
2- VssDbRecord , LIST_KEY_VERSIONS_MAX_PAGE_SIZE , MAX_PUT_REQUEST_ITEM_COUNT ,
3- } ;
1+ use crate :: { VssDbRecord , LIST_KEY_VERSIONS_MAX_PAGE_SIZE , MAX_PUT_REQUEST_ITEM_COUNT } ;
42use api:: error:: VssError ;
53use api:: kv_store:: { KvStore , GLOBAL_VERSION_KEY , INITIAL_RECORD_VERSION } ;
64use api:: types:: {
@@ -10,41 +8,39 @@ use api::types::{
108use async_trait:: async_trait;
119use bytes:: Bytes ;
1210use chrono:: prelude:: Utc ;
13- use std:: collections:: HashMap ;
11+ use std:: collections:: BTreeMap ;
1412use std:: sync:: Arc ;
15- use tokio:: sync:: RwLock ;
13+ use tokio:: sync:: Mutex ;
1614
1715fn build_storage_key ( user_token : & str , store_id : & str , key : & str ) -> String {
1816 format ! ( "{}#{}#{}" , user_token, store_id, key)
1917}
2018
2119/// In-memory implementation of the VSS Store.
2220pub struct InMemoryBackendImpl {
23- store : Arc < RwLock < HashMap < String , VssDbRecord > > > ,
21+ store : Arc < Mutex < BTreeMap < String , VssDbRecord > > > ,
2422}
2523
2624impl InMemoryBackendImpl {
2725 /// Creates an in-memory instance.
2826 pub fn new ( ) -> Self {
29- Self { store : Arc :: new ( RwLock :: new ( HashMap :: new ( ) ) ) }
27+ Self { store : Arc :: new ( Mutex :: new ( BTreeMap :: new ( ) ) ) }
3028 }
3129
3230 fn get_current_global_version (
33- & self , guard : & HashMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
31+ & self , guard : & BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
3432 ) -> i64 {
3533 let global_key = build_storage_key ( user_token, store_id, GLOBAL_VERSION_KEY ) ;
3634 guard. get ( & global_key) . map ( |r| r. version ) . unwrap_or ( 0 )
3735 }
3836}
3937
40- // Validation functions - check if operations can succeed without modifying data
4138fn validate_put_operation (
42- store : & HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
39+ store : & BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
4340) -> Result < ( ) , VssError > {
4441 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
4542
4643 if key_value. version == -1 {
47- // Non-conditional upsert always succeeds
4844 Ok ( ( ) )
4945 } else if key_value. version == 0 {
5046 if store. contains_key ( & key) {
@@ -75,12 +71,11 @@ fn validate_put_operation(
7571}
7672
7773fn validate_delete_operation (
78- store : & HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
74+ store : & BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : & KeyValue ,
7975) -> Result < ( ) , VssError > {
8076 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
8177
8278 if key_value. version == -1 {
83- // Non-conditional delete always succeeds
8479 Ok ( ( ) )
8580 } else {
8681 if let Some ( existing) = store. get ( & key) {
@@ -101,20 +96,25 @@ fn validate_delete_operation(
10196 }
10297}
10398
104- fn execute_non_conditional_upsert (
105- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
99+ fn execute_put_object (
100+ store : & mut BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
101+ key_value : KeyValue ,
106102) {
107103 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
108104 let now = Utc :: now ( ) ;
109105
110106 match store. entry ( key) {
111- std:: collections:: hash_map :: Entry :: Occupied ( mut occ) => {
107+ std:: collections:: btree_map :: Entry :: Occupied ( mut occ) => {
112108 let existing = occ. get_mut ( ) ;
113- existing. version = INITIAL_RECORD_VERSION as i64 ;
109+ existing. version = if key_value. version == -1 {
110+ INITIAL_RECORD_VERSION as i64
111+ } else {
112+ existing. version . saturating_add ( 1 )
113+ } ;
114114 existing. value = key_value. value . to_vec ( ) ;
115115 existing. last_updated_at = now;
116116 } ,
117- std:: collections:: hash_map :: Entry :: Vacant ( vac) => {
117+ std:: collections:: btree_map :: Entry :: Vacant ( vac) => {
118118 let new_record = VssDbRecord {
119119 user_token : user_token. to_string ( ) ,
120120 store_id : store_id. to_string ( ) ,
@@ -129,51 +129,8 @@ fn execute_non_conditional_upsert(
129129 }
130130}
131131
132- fn execute_conditional_insert (
133- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
134- ) {
135- let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
136- let now = Utc :: now ( ) ;
137-
138- let new_record = VssDbRecord {
139- user_token : user_token. to_string ( ) ,
140- store_id : store_id. to_string ( ) ,
141- key : key_value. key ,
142- value : key_value. value . to_vec ( ) ,
143- version : INITIAL_RECORD_VERSION as i64 ,
144- created_at : now,
145- last_updated_at : now,
146- } ;
147- store. insert ( key, new_record) ;
148- }
149-
150- fn execute_conditional_update (
151- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
152- ) {
153- let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
154- let now = Utc :: now ( ) ;
155-
156- if let Some ( existing) = store. get_mut ( & key) {
157- existing. version = key_value. version . saturating_add ( 1 ) ;
158- existing. value = key_value. value . to_vec ( ) ;
159- existing. last_updated_at = now;
160- }
161- }
162-
163- fn execute_put_object (
164- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str , key_value : KeyValue ,
165- ) {
166- if key_value. version == -1 {
167- execute_non_conditional_upsert ( store, user_token, store_id, key_value) ;
168- } else if key_value. version == 0 {
169- execute_conditional_insert ( store, user_token, store_id, key_value) ;
170- } else {
171- execute_conditional_update ( store, user_token, store_id, key_value) ;
172- }
173- }
174-
175132fn execute_delete_object (
176- store : & mut HashMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
133+ store : & mut BTreeMap < String , VssDbRecord > , user_token : & str , store_id : & str ,
177134 key_value : & KeyValue ,
178135) {
179136 let key = build_storage_key ( user_token, store_id, & key_value. key ) ;
@@ -186,7 +143,7 @@ impl KvStore for InMemoryBackendImpl {
186143 & self , user_token : String , request : GetObjectRequest ,
187144 ) -> Result < GetObjectResponse , VssError > {
188145 let key = build_storage_key ( & user_token, & request. store_id , & request. key ) ;
189- let guard = self . store . read ( ) . await ;
146+ let guard = self . store . lock ( ) . await ;
190147
191148 if let Some ( record) = guard. get ( & key) {
192149 Ok ( GetObjectResponse {
@@ -197,6 +154,7 @@ impl KvStore for InMemoryBackendImpl {
197154 } ) ,
198155 } )
199156 } else if request. key == GLOBAL_VERSION_KEY {
157+ // Non-zero global version is handled above; this is only for initial version 0.
200158 Ok ( GetObjectResponse {
201159 value : Some ( KeyValue {
202160 key : GLOBAL_VERSION_KEY . to_string ( ) ,
@@ -221,7 +179,7 @@ impl KvStore for InMemoryBackendImpl {
221179 }
222180
223181 let store_id = request. store_id . clone ( ) ;
224- let mut guard = self . store . write ( ) . await ;
182+ let mut guard = self . store . lock ( ) . await ;
225183
226184 if let Some ( version) = request. global_version {
227185 validate_put_operation (
@@ -268,7 +226,7 @@ impl KvStore for InMemoryBackendImpl {
268226 } ) ?;
269227
270228 let store_id = request. store_id . clone ( ) ;
271- let mut guard = self . store . write ( ) . await ;
229+ let mut guard = self . store . lock ( ) . await ;
272230
273231 execute_delete_object ( & mut guard, & user_token, & store_id, & key_value) ;
274232
@@ -278,71 +236,49 @@ impl KvStore for InMemoryBackendImpl {
278236 async fn list_key_versions (
279237 & self , user_token : String , request : ListKeyVersionsRequest ,
280238 ) -> Result < ListKeyVersionsResponse , VssError > {
281- let store_id = request. store_id ;
282- let key_prefix = request. key_prefix . unwrap_or ( "" . to_string ( ) ) ;
283- let page_token_option = request. page_token ;
239+ let store_id = request. store_id . clone ( ) ;
240+ let key_prefix = request. key_prefix . clone ( ) . unwrap_or ( "" . to_string ( ) ) ;
284241 let page_size = request. page_size . unwrap_or ( i32:: MAX ) ;
285242 let limit = std:: cmp:: min ( page_size, LIST_KEY_VERSIONS_MAX_PAGE_SIZE ) as usize ;
286243
287- let ( keys_with_versions, global_version) = {
288- let guard = self . store . read ( ) . await ;
289-
290- let mut global_version: Option < i64 > = None ;
291- if page_token_option. is_none ( ) {
292- global_version =
293- Some ( self . get_current_global_version ( & guard, & user_token, & store_id) ) ;
294- }
295-
296- let storage_prefix = format ! ( "{}#{}#" , user_token, store_id) ;
297- let mut temp: Vec < ( String , i64 ) > = Vec :: new ( ) ;
298-
299- for ( storage_key, r) in guard. iter ( ) {
300- if !storage_key. starts_with ( & storage_prefix) {
301- continue ;
302- }
303- let key = & storage_key[ storage_prefix. len ( ) ..] ;
304- if key == GLOBAL_VERSION_KEY {
305- continue ;
306- }
307- if !key_prefix. is_empty ( ) && !key. starts_with ( & key_prefix) {
308- continue ;
309- }
310- temp. push ( ( key. to_string ( ) , r. version ) ) ;
311- }
312-
313- ( temp, global_version)
244+ let offset: usize = if let Some ( token) = & request. page_token {
245+ token. parse :: < usize > ( ) . map_err ( |_| {
246+ VssError :: InvalidRequestError ( format ! ( "Invalid page token: {}" , token) )
247+ } ) ?
248+ } else {
249+ 0
314250 } ;
315251
316- let mut keys_with_versions = keys_with_versions;
317- keys_with_versions. sort_by ( |a, b| a. 0 . cmp ( & b. 0 ) ) ;
252+ let guard = self . store . lock ( ) . await ;
318253
319- let start_idx = if page_token_option. is_none ( ) {
320- 0
321- } else if page_token_option. as_deref ( ) == Some ( "" ) {
322- keys_with_versions. len ( )
323- } else {
324- let token = page_token_option. as_deref ( ) . unwrap ( ) ;
325- keys_with_versions
326- . iter ( )
327- . position ( |( k, _) | k. as_str ( ) > token)
328- . unwrap_or ( keys_with_versions. len ( ) )
329- } ;
254+ let mut global_version: Option < i64 > = None ;
255+ if offset == 0 {
256+ global_version = Some ( self . get_current_global_version ( & guard, & user_token, & store_id) ) ;
257+ }
258+
259+ let storage_prefix = build_storage_key ( & user_token, & store_id, & key_prefix) ;
260+ let global_key = build_storage_key ( & user_token, & store_id, GLOBAL_VERSION_KEY ) ;
330261
331- let page_items: Vec < KeyValue > = keys_with_versions
262+ let page_items: Vec < KeyValue > = guard
332263 . iter ( )
333- . skip ( start_idx)
264+ . filter ( |( storage_key, _) | {
265+ storage_key. starts_with ( & storage_prefix) && * storage_key != & global_key
266+ } )
267+ . skip ( offset)
334268 . take ( limit)
335- . map ( |( key, version) | KeyValue {
336- key : key. clone ( ) ,
337- value : Bytes :: new ( ) ,
338- version : * version,
269+ . map ( |( storage_key, record) | {
270+ let prefix_len = format ! ( "{}#{}#" , user_token, store_id) . len ( ) ;
271+ let key = & storage_key[ prefix_len..] ;
272+
273+ KeyValue { key : key. to_string ( ) , value : Bytes :: new ( ) , version : record. version }
339274 } )
340275 . collect ( ) ;
341276
277+ let next_offset = offset + page_items. len ( ) ;
342278 let next_page_token = if page_items. is_empty ( ) {
343279 Some ( "" . to_string ( ) )
344280 } else {
345- page_items . last ( ) . map ( |kv| kv . key . clone ( ) )
281+ Some ( next_offset . to_string ( ) )
346282 } ;
347283
348284 Ok ( ListKeyVersionsResponse { key_versions : page_items, next_page_token, global_version } )
0 commit comments