Skip to content

Commit 30a744f

Browse files
committed
Eliminate temporary Vec<EncodedSlot> during RecordBundle writing to WAL
1 parent 4024ae8 commit 30a744f

File tree

1 file changed

+24
-56
lines changed
  • rust/otap-dataflow/crates/quiver/src/wal

1 file changed

+24
-56
lines changed

rust/otap-dataflow/crates/quiver/src/wal/writer.rs

Lines changed: 24 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,14 @@ use arrow_array::RecordBatch;
119119
use arrow_ipc::writer::StreamWriter;
120120
use crc32fast::Hasher;
121121

122-
use crate::record_bundle::{PayloadRef, RecordBundle, SchemaFingerprint, SlotId};
122+
use crate::record_bundle::{PayloadRef, RecordBundle, SlotId};
123123

124124
use super::checkpoint_sidecar::CheckpointSidecar;
125125
use super::header::{WAL_HEADER_LEN, WalHeader};
126126
use super::reader::WalReader;
127127
use super::{
128-
ENTRY_HEADER_LEN, ENTRY_TYPE_RECORD_BUNDLE, MAX_ROTATION_TARGET_BYTES, SCHEMA_FINGERPRINT_LEN,
129-
SLOT_HEADER_LEN, WalConsumerCheckpoint, WalError, WalResult,
128+
ENTRY_HEADER_LEN, ENTRY_TYPE_RECORD_BUNDLE, MAX_ROTATION_TARGET_BYTES, WalConsumerCheckpoint,
129+
WalError, WalResult,
130130
};
131131

132132
// ---------------------------------------------------------------------------
@@ -377,36 +377,26 @@ impl WalWriter {
377377
let ingestion_time = bundle.ingestion_time();
378378
let ingestion_ts_nanos = system_time_to_nanos(ingestion_time)?;
379379

380-
let mut encoded_slots = Vec::new();
381-
let mut total_payload_bytes = 0usize;
382-
383380
self.active_file.payload_buffer.clear();
384381

385382
let sequence = self.next_sequence;
386383

387384
let mut slot_bitmap = 0u64;
388385

386+
// Single pass: validate slots and write directly to payload_buffer.
387+
// The buffer is reused across calls, so after a few appends it stabilizes
388+
// at a capacity that fits typical bundles without reallocation.
389389
for slot in &descriptor.slots {
390390
let slot_index = slot.id.0 as usize;
391391
if slot_index >= 64 {
392392
return Err(WalError::SlotOutOfRange(slot.id));
393393
}
394394
if let Some(payload) = bundle.payload(slot.id) {
395395
slot_bitmap |= 1u64 << slot_index;
396-
let encoded_slot = self.prepare_slot(slot.id, payload)?;
397-
let slot_size = encoded_slot.serialized_size();
398-
total_payload_bytes = total_payload_bytes
399-
.checked_add(slot_size)
400-
.ok_or(WalError::EntryTooLarge(slot_size))?;
401-
encoded_slots.push(encoded_slot);
396+
self.encode_slot_into_buffer(slot.id, payload)?;
402397
}
403398
}
404399

405-
self.active_file.payload_buffer.reserve(total_payload_bytes);
406-
for slot in encoded_slots {
407-
slot.write_into(&mut self.active_file.payload_buffer);
408-
}
409-
410400
let mut entry_header = [0u8; ENTRY_HEADER_LEN];
411401
let mut cursor = 0;
412402
entry_header[cursor] = ENTRY_TYPE_RECORD_BUNDLE;
@@ -470,20 +460,29 @@ impl WalWriter {
470460
.checkpoint_cursor(&mut self.active_file, checkpoint)
471461
}
472462

473-
fn prepare_slot(&mut self, slot_id: SlotId, payload: PayloadRef<'_>) -> WalResult<EncodedSlot> {
463+
/// Encodes a slot directly into `payload_buffer`, avoiding intermediate allocations.
464+
///
465+
/// Writes the slot header (id, fingerprint, row_count, payload_len) followed by
466+
/// the Arrow IPC-encoded payload bytes.
467+
fn encode_slot_into_buffer(&mut self, slot_id: SlotId, payload: PayloadRef<'_>) -> WalResult<()> {
474468
let row_count = u32::try_from(payload.batch.num_rows())
475469
.map_err(|_| WalError::RowCountOverflow(payload.batch.num_rows()))?;
476470
let payload_bytes = encode_record_batch(payload.batch)?;
477471
let payload_len = u32::try_from(payload_bytes.len())
478472
.map_err(|_| WalError::PayloadTooLarge(payload_bytes.len()))?;
479473

480-
Ok(EncodedSlot {
481-
slot_id_raw: slot_id.0,
482-
schema_fingerprint: payload.schema_fingerprint,
483-
row_count,
484-
payload_len,
485-
payload_bytes,
486-
})
474+
let buf = &mut self.active_file.payload_buffer;
475+
476+
// Write slot header
477+
buf.extend_from_slice(&slot_id.0.to_le_bytes());
478+
buf.extend_from_slice(&payload.schema_fingerprint);
479+
buf.extend_from_slice(&row_count.to_le_bytes());
480+
buf.extend_from_slice(&payload_len.to_le_bytes());
481+
482+
// Write payload
483+
buf.extend_from_slice(&payload_bytes);
484+
485+
Ok(())
487486
}
488487
}
489488

@@ -1101,14 +1100,6 @@ fn reopen_wal_file(path: &Path, segment_hash: [u8; 16]) -> WalResult<File> {
11011100
Ok(file)
11021101
}
11031102

1104-
struct EncodedSlot {
1105-
slot_id_raw: u16,
1106-
schema_fingerprint: SchemaFingerprint,
1107-
row_count: u32,
1108-
payload_len: u32,
1109-
payload_bytes: Vec<u8>,
1110-
}
1111-
11121103
/// Metadata describing an on-disk rotated WAL file. We retain enough information
11131104
/// to decide when the file can be deleted once readers have safely advanced
11141105
/// past its logical range.
@@ -1130,29 +1121,6 @@ impl RotatedWalFile {
11301121
}
11311122
}
11321123

1133-
impl EncodedSlot {
1134-
fn serialized_size(&self) -> usize {
1135-
SLOT_HEADER_LEN + self.payload_bytes.len()
1136-
}
1137-
1138-
fn write_into(self, buffer: &mut Vec<u8>) {
1139-
let total = self.serialized_size();
1140-
let start = buffer.len();
1141-
buffer.resize(start + total, 0);
1142-
1143-
let mut cursor = start;
1144-
buffer[cursor..cursor + 2].copy_from_slice(&self.slot_id_raw.to_le_bytes());
1145-
cursor += 2;
1146-
buffer[cursor..cursor + SCHEMA_FINGERPRINT_LEN].copy_from_slice(&self.schema_fingerprint);
1147-
cursor += SCHEMA_FINGERPRINT_LEN;
1148-
buffer[cursor..cursor + 4].copy_from_slice(&self.row_count.to_le_bytes());
1149-
cursor += 4;
1150-
buffer[cursor..cursor + 4].copy_from_slice(&self.payload_len.to_le_bytes());
1151-
cursor += 4;
1152-
buffer[cursor..cursor + self.payload_bytes.len()].copy_from_slice(&self.payload_bytes);
1153-
}
1154-
}
1155-
11561124
#[cfg(test)]
11571125
pub(super) mod test_support {
11581126
use std::cell::Cell;

0 commit comments

Comments
 (0)