Skip to content

Commit 06b6b98

Browse files
implemented files batching
1 parent cee76ce commit 06b6b98

File tree

4 files changed

+175
-25
lines changed

4 files changed

+175
-25
lines changed

crates/apollo_storage/src/body/mod.rs

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -473,34 +473,51 @@ fn write_transactions<'env>(
473473
events_table: &'env EventsTable<'env>,
474474
block_number: BlockNumber,
475475
) -> StorageResult<()> {
476-
for (index, ((tx, tx_output), tx_hash)) in block_body
477-
.transactions
476+
if block_body.transactions.is_empty() {
477+
return Ok(());
478+
}
479+
480+
// Batch append all transactions and transaction outputs at once
481+
let transactions_refs: Vec<&Transaction> = block_body.transactions.iter().collect();
482+
let tx_outputs_refs: Vec<&TransactionOutput> = block_body.transaction_outputs.iter().collect();
483+
484+
let tx_locations = file_handlers.append_transactions_batch(&transactions_refs);
485+
let tx_output_locations = file_handlers.append_transaction_outputs_batch(&tx_outputs_refs);
486+
487+
// Write metadata and events to MDBX tables
488+
for (index, ((tx_output, tx_hash), (tx_location, tx_output_location))) in block_body
489+
.transaction_outputs
478490
.iter()
479-
.zip(block_body.transaction_outputs.iter())
480491
.zip(block_body.transaction_hashes.iter())
492+
.zip(tx_locations.iter().zip(tx_output_locations.iter()))
481493
.enumerate()
482494
{
483495
let tx_offset_in_block = TransactionOffsetInBlock(index);
484496
let transaction_index = TransactionIndex(block_number, tx_offset_in_block);
485-
let tx_location = file_handlers.append_transaction(tx);
486-
let tx_output_location = file_handlers.append_transaction_output(tx_output);
497+
487498
write_events(tx_output, txn, events_table, transaction_index)?;
488499
transaction_hash_to_idx_table.insert(txn, tx_hash, &transaction_index)?;
489500
transaction_metadata_table.append(
490501
txn,
491502
&transaction_index,
492-
&TransactionMetadata { tx_location, tx_output_location, tx_hash: *tx_hash },
503+
&TransactionMetadata {
504+
tx_location: *tx_location,
505+
tx_output_location: *tx_output_location,
506+
tx_hash: *tx_hash
507+
},
493508
)?;
509+
}
494510

495-
// If this is the last iteration, update the file offset table.
496-
if index == block_body.transactions.len() - 1 {
497-
file_offset_table.upsert(txn, &OffsetKind::Transaction, &tx_location.next_offset())?;
498-
file_offset_table.upsert(
499-
txn,
500-
&OffsetKind::TransactionOutput,
501-
&tx_output_location.next_offset(),
502-
)?;
503-
}
511+
// Update file offsets with the last locations
512+
if let (Some(last_tx_location), Some(last_tx_output_location)) =
513+
(tx_locations.last(), tx_output_locations.last())
514+
{
515+
file_offset_table.upsert(txn, &OffsetKind::Transaction, &last_tx_location.next_offset())?;
516+
file_offset_table.upsert(
517+
txn,
518+
&OffsetKind::TransactionOutput,
519+
&last_tx_output_location.next_offset(),
520+
)?;
504521
}
505522

506523
Ok(())

crates/apollo_storage/src/class.rs

Lines changed: 50 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -207,11 +207,27 @@ fn write_classes<'env>(
207207
file_handlers: &FileHandlers<RW>,
208208
file_offset_table: &'env FileOffsetTable<'env>,
209209
) -> StorageResult<()> {
210-
for (class_hash, contract_class) in classes {
211-
let location = file_handlers.append_contract_class(contract_class);
212-
declared_classes_table.insert(txn, class_hash, &location)?;
213-
file_offset_table.upsert(txn, &OffsetKind::ContractClass, &location.next_offset())?;
210+
if classes.is_empty() {
211+
return Ok(());
214212
}
213+
214+
// Extract contract classes for batch append
215+
let contract_classes: Vec<&SierraContractClass> =
216+
classes.iter().map(|(_, class)| *class).collect();
217+
218+
// Batch append all contract classes at once
219+
let locations = file_handlers.append_contract_classes_batch(&contract_classes);
220+
221+
// Write to MDBX tables
222+
for ((class_hash, _), location) in classes.iter().zip(locations.iter()) {
223+
declared_classes_table.insert(txn, class_hash, location)?;
224+
}
225+
226+
// Update file offset with the last location
227+
if let Some(last_location) = locations.last() {
228+
file_offset_table.upsert(txn, &OffsetKind::ContractClass, &last_location.next_offset())?;
229+
}
230+
215231
Ok(())
216232
}
217233

@@ -223,18 +239,42 @@ fn write_deprecated_classes<'env>(
223239
file_handlers: &FileHandlers<RW>,
224240
file_offset_table: &'env FileOffsetTable<'env>,
225241
) -> StorageResult<()> {
242+
if deprecated_classes.is_empty() {
243+
return Ok(());
244+
}
245+
246+
// Filter out classes that already exist and collect the ones to write
247+
let mut classes_to_write = Vec::new();
248+
let mut hashes_to_write = Vec::new();
249+
226250
for (class_hash, deprecated_contract_class) in deprecated_classes {
227-
if deprecated_declared_classes_table.get(txn, class_hash)?.is_some() {
228-
continue;
251+
if deprecated_declared_classes_table.get(txn, class_hash)?.is_none() {
252+
classes_to_write.push(*deprecated_contract_class);
253+
hashes_to_write.push(*class_hash);
229254
}
230-
let location = file_handlers.append_deprecated_contract_class(deprecated_contract_class);
231-
let value = IndexedDeprecatedContractClass { block_number, location_in_file: location };
255+
}
256+
257+
if classes_to_write.is_empty() {
258+
return Ok(());
259+
}
260+
261+
// Batch append all deprecated contract classes at once
262+
let locations = file_handlers.append_deprecated_contract_classes_batch(&classes_to_write);
263+
264+
// Write to MDBX tables
265+
for (class_hash, location) in hashes_to_write.iter().zip(locations.iter()) {
266+
let value = IndexedDeprecatedContractClass { block_number, location_in_file: *location };
267+
deprecated_declared_classes_table.insert(txn, class_hash, &value)?;
268+
}
269+
270+
// Update file offset with the last location
271+
if let Some(last_location) = locations.last() {
232272
file_offset_table.upsert(
233273
txn,
234274
&OffsetKind::DeprecatedContractClass,
235-
&location.next_offset(),
275+
&last_location.next_offset(),
236276
)?;
237-
deprecated_declared_classes_table.insert(txn, class_hash, &value)?;
238277
}
278+
239279
Ok(())
240280
}

crates/apollo_storage/src/lib.rs

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -727,11 +727,24 @@ impl FileHandlers<RW> {
727727
self.clone().contract_class.append(contract_class)
728728
}
729729

730+
// Appends multiple contract classes in batch and returns their locations.
731+
fn append_contract_classes_batch(
732+
&self,
733+
contract_classes: &[&SierraContractClass],
734+
) -> Vec<LocationInFile> {
735+
self.clone().contract_class.append_batch(contract_classes)
736+
}
737+
730738
// Appends a CASM to the corresponding file and returns its location.
731739
fn append_casm(&self, casm: &CasmContractClass) -> LocationInFile {
732740
self.clone().casm.append(casm)
733741
}
734742

743+
// Appends multiple CASMs in batch and returns their locations.
744+
fn append_casms_batch(&self, casms: &[&CasmContractClass]) -> Vec<LocationInFile> {
745+
self.clone().casm.append_batch(casms)
746+
}
747+
735748
// Appends a deprecated contract class to the corresponding file and returns its location.
736749
fn append_deprecated_contract_class(
737750
&self,
@@ -740,16 +753,37 @@ impl FileHandlers<RW> {
740753
self.clone().deprecated_contract_class.append(deprecated_contract_class)
741754
}
742755

756+
// Appends multiple deprecated contract classes in batch and returns their locations.
757+
fn append_deprecated_contract_classes_batch(
758+
&self,
759+
deprecated_contract_classes: &[&DeprecatedContractClass],
760+
) -> Vec<LocationInFile> {
761+
self.clone().deprecated_contract_class.append_batch(deprecated_contract_classes)
762+
}
763+
743764
// Appends a thin transaction output to the corresponding file and returns its location.
744765
fn append_transaction_output(&self, transaction_output: &TransactionOutput) -> LocationInFile {
745766
self.clone().transaction_output.append(transaction_output)
746767
}
747768

769+
// Appends multiple transaction outputs in batch and returns their locations.
770+
fn append_transaction_outputs_batch(
771+
&self,
772+
transaction_outputs: &[&TransactionOutput],
773+
) -> Vec<LocationInFile> {
774+
self.clone().transaction_output.append_batch(transaction_outputs)
775+
}
776+
748777
// Appends a transaction to the corresponding file and returns its location.
749778
fn append_transaction(&self, transaction: &Transaction) -> LocationInFile {
750779
self.clone().transaction.append(transaction)
751780
}
752781

782+
// Appends multiple transactions in batch and returns their locations.
783+
fn append_transactions_batch(&self, transactions: &[&Transaction]) -> Vec<LocationInFile> {
784+
self.clone().transaction.append_batch(transactions)
785+
}
786+
753787
// TODO(dan): Consider 1. flushing only the relevant files, 2. flushing concurrently.
754788
#[latency_histogram("storage_file_handler_flush_latency_seconds", false)]
755789
fn flush(&self) {

crates/apollo_storage/src/mmap_file/mod.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,13 @@ pub(crate) trait Writer<V: ValueSerde> {
107107
/// Inserts an object to the file, returns the [`LocationInFile`] of the object.
108108
fn append(&mut self, val: &V::Value) -> LocationInFile;
109109

110+
/// Inserts multiple objects to the file in a batch, returns their [`LocationInFile`]s.
111+
/// This is more efficient than calling `append` multiple times as it:
112+
/// - Serializes all objects first
113+
/// - Writes them all at once
114+
/// - Performs a single flush operation for all objects
115+
fn append_batch(&mut self, vals: &[&V::Value]) -> Vec<LocationInFile>;
116+
110117
/// Flushes the mmap to the file.
111118
fn flush(&self);
112119
}
@@ -247,6 +254,58 @@ impl<V: ValueSerde + Debug> Writer<V> for FileHandler<V, RW> {
247254
location
248255
}
249256

257+
fn append_batch(&mut self, vals: &[&V::Value]) -> Vec<LocationInFile> {
258+
if vals.is_empty() {
259+
return Vec::new();
260+
}
261+
262+
trace!("Batch inserting {} objects", vals.len());
263+
264+
// First, serialize all values
265+
let mut serialized_vals = Vec::with_capacity(vals.len());
266+
let mut total_len = 0;
267+
for val in vals {
268+
let serialized = V::serialize(val).expect("Should be able to serialize");
269+
total_len += serialized.len();
270+
serialized_vals.push(serialized);
271+
}
272+
273+
// Then, write all at once and record locations
274+
let mut locations = Vec::with_capacity(vals.len());
275+
let start_offset;
276+
{
277+
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
278+
start_offset = mmap_file.offset;
279+
let mut current_offset = start_offset;
280+
281+
trace!("Batch inserting at starting offset: {}, total size: {}", start_offset, total_len);
282+
283+
for serialized in &serialized_vals {
284+
let len = serialized.len();
285+
let mmap_slice = &mut mmap_file.mmap[current_offset..];
286+
mmap_slice[..len].copy_from_slice(serialized);
287+
locations.push(LocationInFile { offset: current_offset, len });
288+
current_offset += len;
289+
}
290+
291+
// Single flush for all objects
292+
mmap_file
293+
.mmap
294+
.flush_async_range(start_offset, total_len)
295+
.expect("Failed to asynchronously flush the mmap after batch insert");
296+
297+
mmap_file.offset = current_offset;
298+
mmap_file.should_flush = true;
299+
}
300+
301+
// Check if file needs to grow after all writes
302+
let final_offset = start_offset + total_len;
303+
self.grow_file_if_needed(final_offset);
304+
305+
trace!("Batch insert complete. Wrote {} objects totaling {} bytes", vals.len(), total_len);
306+
locations
307+
}
308+
250309
fn flush(&self) {
251310
let mut mmap_file = self.mmap_file.lock().expect("Lock should not be poisoned");
252311
if mmap_file.should_flush {

0 commit comments

Comments
 (0)