Skip to content

Commit 4424606

Browse files
committed
Add additional doc comments; rename checkpoint API names for clarity.
1 parent 34d30e7 commit 4424606

File tree

6 files changed

+192
-50
lines changed

6 files changed

+192
-50
lines changed

rust/otap-dataflow/crates/quiver/src/wal/checkpoint_sidecar.rs

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,22 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4-
// Provides read/write helpers for the `checkpoint.offset` sidecar that tracks the
5-
// global WAL consumer checkpoint (data bytes excluding headers) still required for crash
6-
// recovery. The sidecar lets new processes resume from a known safe offset
7-
// without rescanning the entire WAL. It carries a CRC so we can discard corrupted
8-
// metadata safely.
4+
//! Crash-safe checkpoint offset persistence.
5+
//!
6+
//! The checkpoint sidecar is a tiny file (`checkpoint.offset`) that tracks how
7+
//! much of the WAL has been durably consumed. It survives crashes so restarts
8+
//! can resume from the last known safe offset without rescanning the entire log.
9+
//!
10+
//! # Format (24 bytes)
11+
//!
12+
//! ```text
13+
//! ┌────────────┬─────────┬──────────┬─────────────────────┬──────────┐
14+
//! │ magic (8) │ ver (2) │ rsv (2) │ global_data_off (8) │ crc (4) │
15+
//! └────────────┴─────────┴──────────┴─────────────────────┴──────────┘
16+
//! ```
17+
//!
18+
//! Writes use atomic rename (`write_to` → `.tmp` → `rename`) plus parent
19+
//! directory fsync to ensure durability across power loss.
920
1021
use std::fs::OpenOptions;
1122
use std::io::{Read, Write};

rust/otap-dataflow/crates/quiver/src/wal/header.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,21 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4+
//! WAL file header encoding and validation.
5+
//!
6+
//! Every WAL file starts with a fixed 30-byte header:
7+
//!
8+
//! ```text
9+
//! ┌────────────┬─────────┬──────────┬──────────────────┐
10+
//! │ magic (10) │ ver (2) │ rsv (2) │ segment_hash (16)│
11+
//! └────────────┴─────────┴──────────┴──────────────────┘
12+
//! ```
13+
//!
14+
//! - **magic**: `b"QUIVER\0WAL"` identifies the file type
15+
//! - **version**: Format version (currently 1)
16+
//! - **reserved**: Zero padding for future use
17+
//! - **segment_hash**: MD5 of segment configuration; mismatches reject the file
18+
419
use std::io::{Read, Seek, SeekFrom, Write};
520

621
use super::{WAL_MAGIC, WalError};

rust/otap-dataflow/crates/quiver/src/wal/mod.rs

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,62 @@
11
// Copyright The OpenTelemetry Authors
22
// SPDX-License-Identifier: Apache-2.0
33

4+
//! Write-Ahead Log (WAL) for Quiver crash recovery.
5+
//!
6+
//! This module provides durable, append-only storage for Arrow record batches.
7+
//! On crash, the WAL replays uncommitted data to restore in-memory state.
8+
//!
9+
//! # Quick Start
10+
//!
11+
//! ```ignore
12+
//! // Writing
13+
//! let mut writer = WalWriter::open(options)?;
14+
//! let offset = writer.append_bundle(&bundle)?;
15+
//!
16+
//! // Reading (for replay)
17+
//! let mut reader = WalReader::open(&path)?;
18+
//! let mut cursor = WalConsumerCheckpoint::default();
19+
//! for entry in reader.iter_from(0)? {
20+
//! let bundle = entry?;
21+
//! // ... rebuild state from bundle ...
22+
//! cursor.increment(&bundle); // in-memory only
23+
//! }
24+
//!
25+
//! // Checkpointing (after downstream confirms durability)
26+
//! writer.checkpoint_cursor(&cursor)?; // persists + enables cleanup
27+
//! ```
28+
//!
29+
//! # Module Organization
30+
//!
31+
//! | File | Purpose |
32+
//! |-------------------------|--------------------------------------------------|
33+
//! | `writer.rs` | Append entries, rotate files, manage checkpoints |
34+
//! | `reader.rs` | Iterate entries, decode payloads, track progress |
35+
//! | `header.rs` | WAL file header format (magic, version, config) |
36+
//! | `checkpoint_sidecar.rs` | Crash-safe checkpoint offset persistence |
37+
//! | `tests.rs` | Integration tests and crash simulation |
38+
//!
39+
//! # On-Disk Layout
40+
//!
41+
//! ```text
42+
//! wal/
43+
//! ├── quiver.wal # Active WAL file (append target)
44+
//! ├── quiver.wal.1 # Rotated file (oldest)
45+
//! ├── quiver.wal.2 # Rotated file
46+
//! └── checkpoint.offset # Consumer progress (24 bytes, CRC-protected)
47+
//! ```
48+
//!
49+
//! # Key Concepts
50+
//!
51+
//! - **Entry**: One [`RecordBundle`] serialized with CRC32 integrity check
52+
//! - **Rotation**: When the active file exceeds `rotation_target_bytes`, it's
53+
//! renamed to `quiver.wal.N` and a fresh file starts
54+
//! - **Checkpoint**: Consumers call [`WalConsumerCheckpoint::increment()`] while
55+
//! iterating (in-memory), then [`WalWriter::checkpoint_cursor()`] to persist
56+
//! - **Purge**: Rotated files are deleted once fully covered by the checkpoint
57+
//!
58+
//! See [`writer`] module docs for detailed lifecycle documentation.
59+
460
use std::io;
561

662
use arrow_schema::ArrowError;
@@ -31,6 +87,9 @@ pub(crate) const SLOT_HEADER_LEN: usize = 2 + SCHEMA_FINGERPRINT_LEN + 4 + 4;
3187
pub(crate) type WalResult<T> = Result<T, WalError>;
3288

3389
/// Errors produced while reading or writing WAL data.
90+
///
91+
/// Most variants include context about where the failure occurred.
92+
/// [`WalError::Io`] wraps underlying filesystem errors.
3493
#[derive(Error, Debug)]
3594
pub enum WalError {
3695
/// Underlying filesystem failure.

rust/otap-dataflow/crates/quiver/src/wal/reader.rs

Lines changed: 48 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,38 @@
44
//! Read-side companion to the WAL writer.
55
//!
66
//! The reader validates headers, streams entries starting at arbitrary offsets,
7-
//! and exposes helper types such as [`WalConsumerCheckpoint`] so higher layers can
8-
//! describe how much of the log is safe to reclaim. Like the writer, it is
9-
//! currently exercised by tests until the runtime wires in replay logic.
7+
//! and exposes helper types for tracking replay progress.
8+
//!
9+
//! # Entry Format
10+
//!
11+
//! Each WAL entry has this layout:
12+
//!
13+
//! ```text
14+
//! ┌──────────┬────────────────┬─────────────────┬──────────┐
15+
//! │ len (4) │ entry_hdr (25) │ slot_data (var) │ crc (4) │
16+
//! └──────────┴────────────────┴─────────────────┴──────────┘
17+
//! ```
18+
//!
19+
//! - **len**: Size of `entry_hdr + slot_data` (excludes len and crc fields)
20+
//! - **entry_hdr**: Type (1), timestamp (8), sequence (8), slot_bitmap (8)
21+
//! - **slot_data**: For each set bit: slot_id (2), fingerprint (32), rows (4),
22+
//! payload_len (4), Arrow IPC bytes (payload_len)
23+
//! - **crc**: CRC32 over `entry_hdr + slot_data`
24+
//!
25+
//! # Usage
26+
//!
27+
//! ```ignore
28+
//! let mut reader = WalReader::open("wal/quiver.wal")?;
29+
//! let mut cursor = WalConsumerCheckpoint::default();
30+
//!
31+
//! for result in reader.iter_from(0)? {
32+
//! let bundle = result?;
33+
//! println!("seq={} slots={}", bundle.sequence, bundle.slots.len());
34+
//! cursor.increment(&bundle); // in-memory only
35+
//! }
36+
//! // cursor now points past the last entry
37+
//! // call writer.checkpoint_cursor(&cursor) to persist
38+
//! ```
1039
#![allow(dead_code)]
1140

1241
#[cfg(test)]
@@ -188,12 +217,16 @@ pub(crate) struct DecodedWalSlot {
188217
pub payload: Vec<u8>,
189218
}
190219

191-
/// Opaque cursor describing how much of the WAL has been durably processed.
220+
/// Opaque cursor describing how much of the WAL has been seen.
192221
///
193-
/// Operators should not interpret the internal fields directly. Instead:
222+
/// This is an **in-memory only** cursor. Updating it has no durability effect.
223+
/// To persist progress and allow WAL cleanup, pass the cursor to
224+
/// [`WalWriter::checkpoint_cursor()`].
225+
///
226+
/// Usage:
194227
/// 1. Start with `WalConsumerCheckpoint::default()` (beginning of WAL)
195-
/// 2. Call `cursor.advance(&bundle)` after processing each entry
196-
/// 3. Pass the cursor to `advance_consumer_checkpoint()` to persist progress
228+
/// 2. Call `cursor.increment(&bundle)` after processing each entry (in-memory)
229+
/// 3. Call `writer.checkpoint_cursor(&cursor)` to persist and enable cleanup
197230
///
198231
/// The writer validates that checkpoints land on entry boundaries and rejects
199232
/// stale or regressed values.
@@ -209,7 +242,7 @@ impl WalConsumerCheckpoint {
209242
/// Creates a checkpoint positioned immediately after the given bundle.
210243
///
211244
/// This is equivalent to `WalConsumerCheckpoint::default()` followed by
212-
/// `advance(bundle)`, but clearer when you only need to checkpoint a
245+
/// `increment(bundle)`, but clearer when you only need to checkpoint a
213246
/// single entry.
214247
pub fn after(bundle: &WalRecordBundle) -> Self {
215248
Self {
@@ -218,18 +251,21 @@ impl WalConsumerCheckpoint {
218251
}
219252
}
220253

221-
/// Advances the cursor to cover the provided bundle.
254+
/// Moves the cursor past the provided bundle (in-memory only).
255+
///
256+
/// This does **not** persist the checkpoint or trigger WAL cleanup.
257+
/// Call [`WalWriter::checkpoint_cursor()`] to make progress durable.
222258
///
223259
/// Typical usage in a replay loop:
224260
/// ```ignore
225261
/// let mut cursor = WalConsumerCheckpoint::default();
226262
/// for bundle in reader.iter_from(0)? {
227263
/// process(&bundle?);
228-
/// cursor.advance(&bundle);
264+
/// cursor.increment(&bundle); // in-memory only
229265
/// }
230-
/// writer.advance_consumer_checkpoint(&cursor)?;
266+
/// writer.checkpoint_cursor(&cursor)?; // persist + cleanup
231267
/// ```
232-
pub fn advance(&mut self, bundle: &WalRecordBundle) {
268+
pub fn increment(&mut self, bundle: &WalRecordBundle) {
233269
self.safe_offset = bundle.next_offset;
234270
self.safe_sequence = bundle.sequence;
235271
}

rust/otap-dataflow/crates/quiver/src/wal/tests.rs

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -542,9 +542,7 @@ fn wal_writer_records_cursor_without_truncating() {
542542
safe_offset: first_entry.next_offset,
543543
..WalConsumerCheckpoint::default()
544544
};
545-
writer
546-
.advance_consumer_checkpoint(&cursor)
547-
.expect("record cursor");
545+
writer.checkpoint_cursor(&cursor).expect("record cursor");
548546
drop(writer);
549547

550548
let len_after = std::fs::metadata(&wal_path).expect("metadata").len();
@@ -602,16 +600,16 @@ fn wal_writer_enforces_safe_offset_boundaries() {
602600
safe_sequence: first_entry.sequence,
603601
};
604602

605-
match writer.advance_consumer_checkpoint(&cursor) {
603+
match writer.checkpoint_cursor(&cursor) {
606604
Err(WalError::InvalidConsumerCheckpoint(message)) => {
607605
assert_eq!(message, "safe offset splits entry boundary")
608606
}
609607
other => panic!("expected invalid cursor error, got {other:?}"),
610608
}
611609

612-
cursor.advance(&first_entry);
610+
cursor.increment(&first_entry);
613611
writer
614-
.advance_consumer_checkpoint(&cursor)
612+
.checkpoint_cursor(&cursor)
615613
.expect("record succeeds with aligned cursor");
616614
drop(writer);
617615

@@ -639,9 +637,7 @@ fn wal_writer_persists_consumer_checkpoint_sidecar() {
639637
safe_offset: file_len,
640638
..WalConsumerCheckpoint::default()
641639
};
642-
writer
643-
.advance_consumer_checkpoint(&cursor)
644-
.expect("record cursor");
640+
writer.checkpoint_cursor(&cursor).expect("record cursor");
645641
drop(writer);
646642

647643
let sidecar_path = wal_path.parent().expect("dir").join("checkpoint.offset");
@@ -803,7 +799,7 @@ fn wal_writer_enforces_size_cap_and_purges_rotations() {
803799
..WalConsumerCheckpoint::default()
804800
};
805801
writer
806-
.advance_consumer_checkpoint(&cursor)
802+
.checkpoint_cursor(&cursor)
807803
.expect("record cursor purges rotated chunks");
808804

809805
assert!(
@@ -857,9 +853,7 @@ fn wal_writer_ignores_invalid_checkpoint_sidecar() {
857853
safe_offset: file_len,
858854
..WalConsumerCheckpoint::default()
859855
};
860-
writer
861-
.advance_consumer_checkpoint(&cursor)
862-
.expect("record cursor");
856+
writer.checkpoint_cursor(&cursor).expect("record cursor");
863857
drop(writer);
864858

865859
let state = CheckpointSidecar::read_from(&sidecar_path).expect("sidecar");
@@ -1407,7 +1401,7 @@ fn wal_reader_iter_from_respects_offsets() {
14071401
assert_eq!(entry_one.next_offset, entry_two.offset.position);
14081402

14091403
let mut cursor = WalConsumerCheckpoint::default();
1410-
cursor.advance(&entry_one);
1404+
cursor.increment(&entry_one);
14111405
assert_eq!(cursor.safe_offset, entry_one.next_offset);
14121406
assert_eq!(cursor.safe_sequence, entry_one.sequence);
14131407

@@ -1607,7 +1601,7 @@ fn wal_consumer_checkpoint_recovers_after_partial_entry() {
16071601
let mut iter = reader.iter_from(0).expect("iterator");
16081602
let first_entry = iter.next().expect("first entry").expect("ok");
16091603
let mut cursor = WalConsumerCheckpoint::default();
1610-
cursor.advance(&first_entry);
1604+
cursor.increment(&first_entry);
16111605
drop(reader);
16121606

16131607
// Simulate a crash that truncates the file in the middle of the second
@@ -1811,7 +1805,7 @@ fn run_crash_case(case: CrashCase) {
18111805
case.name
18121806
);
18131807
writer_test_support::inject_crash(case.injection);
1814-
let err = match writer.advance_consumer_checkpoint(&cursor) {
1808+
let err = match writer.checkpoint_cursor(&cursor) {
18151809
Ok(_) => panic!("{}: crash injection did not trigger", case.name),
18161810
Err(err) => err,
18171811
};
@@ -1841,7 +1835,7 @@ fn wal_cursor_after_entries(path: &Path, entry_count: usize) -> WalConsumerCheck
18411835
)
18421836
})
18431837
.expect("entry ok while building cursor");
1844-
cursor.advance(&bundle);
1838+
cursor.increment(&bundle);
18451839
}
18461840
cursor
18471841
}
@@ -2061,7 +2055,7 @@ fn wal_writer_rejects_checkpoint_sequence_regression() {
20612055
safe_sequence: entries[1].sequence,
20622056
};
20632057
writer
2064-
.advance_consumer_checkpoint(&cursor_at_second)
2058+
.checkpoint_cursor(&cursor_at_second)
20652059
.expect("advance to second entry");
20662060

20672061
// Now try to regress to the first entry (sequence=0)
@@ -2070,7 +2064,7 @@ fn wal_writer_rejects_checkpoint_sequence_regression() {
20702064
safe_sequence: entries[0].sequence, // sequence=0, which is less than 1
20712065
};
20722066

2073-
match writer.advance_consumer_checkpoint(&cursor_at_first) {
2067+
match writer.checkpoint_cursor(&cursor_at_first) {
20742068
Err(WalError::InvalidConsumerCheckpoint(msg)) => {
20752069
assert!(
20762070
msg.contains("regressed"),
@@ -2105,7 +2099,7 @@ fn wal_writer_rejects_checkpoint_offset_regression() {
21052099
safe_sequence: entries[1].sequence,
21062100
};
21072101
writer
2108-
.advance_consumer_checkpoint(&cursor_at_second)
2102+
.checkpoint_cursor(&cursor_at_second)
21092103
.expect("advance to second entry");
21102104

21112105
// Now try to advance with a higher sequence but lower offset
@@ -2115,7 +2109,7 @@ fn wal_writer_rejects_checkpoint_offset_regression() {
21152109
safe_sequence: entries[1].sequence + 1, // higher sequence to pass that check
21162110
};
21172111

2118-
match writer.advance_consumer_checkpoint(&bad_cursor) {
2112+
match writer.checkpoint_cursor(&bad_cursor) {
21192113
Err(WalError::InvalidConsumerCheckpoint(msg)) => {
21202114
assert!(
21212115
msg.contains("regressed"),
@@ -2218,7 +2212,7 @@ fn wal_recovery_clamps_stale_sidecar_offset() {
22182212
safe_sequence: offset.sequence,
22192213
};
22202214
writer
2221-
.advance_consumer_checkpoint(&cursor)
2215+
.checkpoint_cursor(&cursor)
22222216
.expect("advance checkpoint to end of WAL");
22232217
drop(writer);
22242218

0 commit comments

Comments
 (0)