Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
3404624
Remove stray README.md
AaronRM Dec 2, 2025
047f7a2
Add initial WAL framing and writer
AaronRM Dec 2, 2025
6f1f630
Refactor WAL contants to mod level; initial WAL reader
AaronRM Dec 2, 2025
4018dbd
Initial engine + WAL integration
AaronRM Dec 2, 2025
86c9ea0
Add additional test coverage, cross-cutting WAL tests
AaronRM Dec 2, 2025
5157167
Address clippy errors
AaronRM Dec 2, 2025
d20b541
Add copyright headers
AaronRM Dec 2, 2025
b9a90a5
Add missing copyright header line
AaronRM Dec 2, 2025
ff109fa
Add error injection plumbing to reader tests
AaronRM Dec 2, 2025
48c90da
Add header tests for additional line coverage
AaronRM Dec 2, 2025
febe725
Add tests for cases around header/config mismatch.
AaronRM Dec 2, 2025
da7fc58
Add truncate_to to WalWriter; tests for recovery and error condition.
AaronRM Dec 2, 2025
2ebdd88
Add flush on drop and when bytes have exceeded max_unflushed_bytes.
AaronRM Dec 2, 2025
25825d5
Add cross segment iteration tests
AaronRM Dec 2, 2025
38ec8f2
Add tests for 64 bitmap slots, large RecordBundles
AaronRM Dec 2, 2025
7347361
Update ARCHITECTURE.md to clarify the CRC algorithm.
AaronRM Dec 2, 2025
aeecbbb
Add test + implementation for flush syncing data for durability
AaronRM Dec 2, 2025
948d465
Add truncate.offset implementation
AaronRM Dec 2, 2025
2ca8053
Add tests and implementation for Prefix Reclamation (Hole Punching)
AaronRM Dec 2, 2025
fe9a7b0
Add rotation_target_bytes; additional rotation & cap tests
AaronRM Dec 3, 2025
7fbb0c0
Fix doctest to write to tempdir
AaronRM Dec 3, 2025
80c8c8b
Add support for safe offset boundaries
AaronRM Dec 3, 2025
91a28e6
Add additional comments to key methods in the WAL reader/writer imple…
AaronRM Dec 3, 2025
699dddd
Add tests for crash/resume scenarios
AaronRM Dec 3, 2025
9206451
Fix warning about unused variable
AaronRM Dec 3, 2025
5e00da6
Formatting
AaronRM Dec 3, 2025
67c01ed
Address clippy errors, formatting
AaronRM Dec 3, 2025
31f8065
Reload rotated chunks on WalWriter restart
AaronRM Dec 3, 2025
32892b4
Refactor to include WalCoordinator, WalSegment. Reduce duplication in…
AaronRM Dec 4, 2025
6290235
Remove hole-punching for initial implementation; fix sidecar state to…
AaronRM Dec 5, 2025
9c48b86
Remove reclaim_prefix
AaronRM Dec 5, 2025
1e49492
Update terminology used in WAL implementation for consistency & reada…
AaronRM Dec 5, 2025
83cd09b
Remove some extraneous functionality from prior edits
AaronRM Dec 5, 2025
8260d85
Update README.md
AaronRM Dec 5, 2025
fa93cfd
Simplify checkpoint sidecar format, file rotation, flush policy
AaronRM Dec 5, 2025
e2d6e7d
Remove rotation_generation mention from ARCHITECTURE.md
AaronRM Dec 5, 2025
fe96508
Resolve clippy warnings
AaronRM Dec 5, 2025
4d1b3a0
Formatting
AaronRM Dec 5, 2025
ada76af
Add additional testcases, reduce boilerplate.
AaronRM Dec 5, 2025
515c953
Encapsulate WAL checkpoint details
AaronRM Dec 5, 2025
575fd25
Ensure parent directory sync after rename
AaronRM Dec 5, 2025
61b4d9c
Add additional doc comments; rename checkpoint API names for clarity.
AaronRM Dec 5, 2025
d693425
Remove non-ASCII chars
AaronRM Dec 5, 2025
d6096d2
Add BSD-2-Clause to allowed licenses
AaronRM Dec 5, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 0 additions & 36 deletions rust/experimental/quiver/README.md

This file was deleted.

4 changes: 3 additions & 1 deletion rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ base64 = "0.22.1"
bitflags = "2.10"
bytes = "1.11"
bytemuck = "1.24"
blake3 = "1.5.5"
chrono = { version = "0.4", features = ["serde"] }
ciborium = "0.2.2"
clap = { version = "4.5.42", features = ["derive"] }
Expand All @@ -83,7 +84,7 @@ local-sync = "0.1.1"
log = "0.4"
miette = { version="7.6.0", features = ["fancy"] }
mimalloc-rust = "0.2.1"
nix = { version = "0.30.1", features = ["process", "signal"] }
nix = { version = "0.30.1", features = ["process", "signal", "fs"] }
num_enum = "0.7"
object_store = "0.12.3"
once_cell = "1.20.2"
Expand Down Expand Up @@ -111,6 +112,7 @@ serde_cbor = "0.11.2"
serde_json = { version = "1.0.145" }
serde_with = { version = "3.16.0", features = ["std", "macros", "json"] }
serde_yaml = "0.9.34-deprecated" # Deprecated, but no good alternative yet
crc32fast = "1.4.2"
replace_with = "0.1.8"
simdutf8 = "0.1.5"
slotmap = "1.0.7"
Expand Down
104 changes: 48 additions & 56 deletions rust/otap-dataflow/crates/quiver/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ WAL entries belonging to:
}
[u8;payload_len] arrow_payload; // streaming IPC bytes for the slot
}
u32 crc32c; // trailer; covers EntryHeader..payloads
u32 crc32; // trailer; covers EntryHeader..payloads
```

- Arrow payload blobs are serialized in the **streaming** IPC format. For each
Expand All @@ -150,8 +150,9 @@ WAL entries belonging to:
[metadata1][payload1]...` Absent slots (bitmap bit cleared) contribute neither
metadata nor bytes; they are implicitly `None` when reconstructing the
bundle.
- Every entry ends with a 4-byte little-endian CRC32C checksum that covers the
entry header, bitmap, metadata blocks, and payload bytes (everything except
- Every entry ends with a 4-byte little-endian CRC32 (IEEE polynomial)
checksum that covers the entry header, bitmap, metadata blocks, and payload
bytes (everything except
the leading length field and the checksum itself). Replay verifies the CRC
before decoding Arrow IPC bytes; a mismatch marks the WAL as corrupted and
triggers truncation back to the last known-good offset.
Expand All @@ -171,78 +172,69 @@ WAL entries belonging to:
introduce new entry types (e.g., periodic checkpoints) or swap serialization
without breaking older data; unknown entry types are skipped using the recorded
length. A checkpoint entry (`entry_type = 1`) would embed the current
open-segment manifest, `truncate_offset`, and high-water sequence numbers so
open-segment manifest, `logical_offset`, and high-water sequence numbers so
recovery can jump directly to the latest checkpoint instead of replaying the
entire log.

##### Truncation & rotation mechanics

- **Track truncate progress**: After a segment finalizes and its metadata + file
are flushed, we advance a `truncate_offset` pointer to the first byte belonging
to the next entry in the open segment. Think of `truncate_offset` as "the
earliest WAL byte still needed for crash recovery." We persist that `u64`
(plus a monotonically increasing rotation generation) into a tiny sidecar file
(e.g., `wal/truncate.offset`) immediately after advancing it and fsync the
sidecar so crash recovery can seek straight to that logical offset without
rescanning finalized entries.
- **Truncate sidecar format**: The sidecar is a fixed 32-byte struct written in
##### Checkpointing & rotation mechanics

- **Track checkpoint progress**: After a segment finalizes and its metadata + file
are flushed, we advance a *logical* cursor that counts data bytes (headers
excluded) across the entire WAL stream. This `global_data_offset` represents "the
earliest WAL data still needed for crash recovery." We persist that `u64` into
a tiny sidecar file (e.g., `wal/checkpoint.offset`) immediately after advancing
it and fsync the sidecar so crash recovery can resume from that logical offset
without rescanning finalized entries.
- **Checkpoint sidecar format**: The sidecar is a fixed 24-byte struct written in
little-endian order:

```text
TruncateSidecar {
CheckpointSidecar {
[u8; 8] magic = b"QUIVER\0T"; // distinguishes from WAL proper
u16 version = 1; // bump if layout changes
u16 reserved = 0;
u64 truncate_offset; // first byte still needed in wal/quiver.wal
u64 rotation_generation; // increments each WAL rotation
u32 crc32c; // covers magic..rotation_generation
u64 global_data_offset; // logical data bytes still required
u32 crc32; // covers magic..global_data_offset
}
```

We write updates via `truncate.offset.tmp`: encode the struct, compute the CRC,
We write updates via `checkpoint.offset.tmp`: encode the struct, compute the CRC,
`pwrite`+`fdatasync`, then `renameat` over the live file so readers see either
the old or new offset. On startup we verify the magic, version, and checksum
before trusting the recorded offsets; failure falls back to scanning from the
beginning of `quiver.wal`.
- **Probe prefix reclamation**: On startup we test whether the active filesystem
supports punching holes out of the current WAL file. Linux builds attempt
`fallocate(FALLOC_FL_PUNCH_HOLE)` against a temporary WAL stub while Windows
builds issue `FSCTL_SET_ZERO_DATA` on a sparse scratch file. If the probe
fails with `EOPNOTSUPP`/`ERROR_INVALID_FUNCTION` we mark the capability as
disabled and fall back to rewriting until the process restarts.
- **Drop reclaimed prefixes**: When support exists, or when the pointer crosses a
configurable threshold, we invoke `fallocate(FALLOC_FL_PUNCH_HOLE)` in-place to
discard bytes `[header_len, truncate_offset)`, leaving the fixed header (30
bytes: magic + version + reserved + cfg hash) intact. On filesystems where hole
punching is not supported, we instead perform a rewrite: copy every byte from
`truncate_offset` through the current end-of-file into `quiver.wal.new` (using
`copy_file_range`/`CopyFile2` where available) while the writer stays paused,
then reopen the new file and stream the fixed header back to the front so the
layout matches a freshly created WAL. Once the copy completes we fsync the new
file, atomically rename it over the original, and resume appends at offset
`(header_len) + (old_len - truncate_offset)`. Windows builds rely on
`CopyFile2` plus `SetFileInformationByHandle` for the same sequence. This
rewrite path preserves the header bytes verbatim, so replay cannot distinguish
a rewritten WAL from one that was never hole-punched.
- **Record prefix reclamation**: When higher layers report a safe cursor, we
translate it into the logical coordinate space and persist it via the sidecar
while leaving the active WAL file unchanged. The pointer simply marks how far
readers have progressed through the concatenated stream.
- **Drop reclaimed prefixes**: Once the active file grows beyond
`rotation_target_bytes` we rotate it to `quiver.wal.N` (where N is a
monotonically increasing rotation id), start a fresh WAL, and remember the
byte span covered by the retired file. When the persisted consumer checkpoint
fully covers a rotated file we delete the file outright. Until a rotation
occurs the reclaimed bytes remain in the active file even though they are
logically safe to discard.
*Note:* We may revisit direct hole punching in the future as a disk-space
optimization if production workloads show that waiting for rotation leaves too
much reclaimed data stranded in the active file.
- **Rotate on size**: `wal.max_size` caps the *aggregate* footprint of the active
WAL plus every still-referenced rotated sibling. We keep a running total of the
active file and the byte spans tracked for `quiver.wal.N`; when the next append
would push the aggregate over the configured cap we rotate immediately: shift
older suffixes up, close and rename `wal/quiver.wal` to `quiver.wal.1`, and
reopen a fresh `quiver.wal`. Each rotation records the byte span covered by the
retired chunk so cleanup can later delete `quiver.wal.N` only after the
persisted `truncate_offset` exceeds that span's upper bound. We never
hole-punch rotated files; they are deleted wholesale once fully covered by the
durability pointer, avoiding rewrites of large historical blobs while keeping
the total WAL footprint bounded by `wal.max_size`. To keep rename churn and
per-core directory fan-out predictable we retain at most `wal.max_chunks`
files (default `10`, counting the active WAL plus rotated siblings). Operators
can override the default. Hitting the chunk cap is treated like hitting the
byte cap: the next append that *would* require a rotation instead trips
backpressure (or `drop_oldest`, if selected) until either truncation reclaims
an older chunk or the limit is raised. We never create an eleventh file in the
background because doing so would undermine the predictive bound the knob is
meant to provide.
would push the aggregate over the configured cap we rotate immediately: rename
`wal/quiver.wal` to `quiver.wal.{next_rotation_id}` and reopen a fresh
`quiver.wal`. Each rotation records the byte span covered by the retired file
so cleanup can later delete `quiver.wal.N` only after the persisted logical
cursor exceeds that span's upper bound. We never rewrite rotated files; they
are deleted wholesale once fully covered by the durability pointer, avoiding
rewrites of large historical blobs while keeping the total WAL footprint
bounded by `wal.max_size`. To keep per-core directory fan-out predictable we
retain at most `wal.max_rotated_files` files (default `10`, counting the
active WAL plus rotated siblings). Operators can override the default. Hitting
the rotated file cap is treated like hitting the byte cap: the next append
that *would* require a rotation instead trips backpressure (or `drop_oldest`,
if selected) until either compaction reclaims an older rotated file or the
limit is raised. We never create more files in the background because doing so
would undermine the predictive bound the knob is meant to provide.
- **Durability-only dependency**: Because WAL truncation depends solely on segment
durability, exporter ACK lag never blocks WAL cleanup; segments themselves
remain on disk until subscribers advance, but the WAL only needs to cover the
Expand Down
8 changes: 8 additions & 0 deletions rust/otap-dataflow/crates/quiver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,19 @@ serde = ["dep:serde"]
[dependencies]
arrow-array.workspace = true
arrow-schema.workspace = true
arrow-ipc.workspace = true
crc32fast.workspace = true
blake3.workspace = true
parking_lot.workspace = true
serde = { workspace = true, optional = true }
thiserror.workspace = true

[target.'cfg(unix)'.dependencies]
nix.workspace = true

[dev-dependencies]
criterion.workspace = true
tempfile.workspace = true

[lints]
workspace = true
Expand Down
7 changes: 4 additions & 3 deletions rust/otap-dataflow/crates/quiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ explicitly enabled.

## Status

**Not production-ready** This crate is being prototyped based on the
specifications in `ARCHITECTURE.md` (which may be updated as development proceeds).
*It is not yet complete, stable or suitable for taking a dependency on.*
**Under Development, Not production-ready** This crate is
being actively developed based on the specifications in `ARCHITECTURE.md`
(which may be updated as development proceeds). *It is not yet complete,
stable or suitable for taking a dependency on.*

## Quick start

Expand Down
40 changes: 31 additions & 9 deletions rust/otap-dataflow/crates/quiver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,19 +123,26 @@ impl QuiverConfigBuilder {
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct WalConfig {
/// Maximum on-disk footprint (across active + rotated chunks).
/// Maximum on-disk footprint (across active + rotated files).
pub max_size_bytes: NonZeroU64,
/// Maximum number of chunk files retained during rotation.
pub max_chunk_count: u16,
/// Maximum number of rotated WAL files retained during rotation.
pub max_rotated_files: u16,
/// Target data size to keep in the active WAL file before rotating.
pub rotation_target_bytes: NonZeroU64,
/// Preferred fsync cadence for durability vs. latency.
pub flush_interval: Duration,
}

impl WalConfig {
fn validate(&self) -> Result<()> {
if self.max_chunk_count == 0 {
if self.max_rotated_files == 0 {
return Err(QuiverError::invalid_config(
"max_chunk_count must be at least 1",
"max_rotated_files must be at least 1",
));
}
if self.rotation_target_bytes > self.max_size_bytes {
return Err(QuiverError::invalid_config(
"rotation_target_bytes must not exceed max_size_bytes",
));
}
Ok(())
Expand All @@ -146,7 +153,8 @@ impl Default for WalConfig {
fn default() -> Self {
Self {
max_size_bytes: NonZeroU64::new(4 * 1024 * 1024 * 1024).expect("non-zero"),
max_chunk_count: 10,
max_rotated_files: 10,
rotation_target_bytes: NonZeroU64::new(64 * 1024 * 1024).expect("non-zero"),
flush_interval: Duration::from_millis(25),
}
}
Expand Down Expand Up @@ -279,7 +287,8 @@ mod tests {
fn builder_overrides_sub_configs() {
let wal = WalConfig {
max_size_bytes: NonZeroU64::new(1).unwrap(),
max_chunk_count: 1,
max_rotated_files: 1,
rotation_target_bytes: NonZeroU64::new(1).unwrap(),
flush_interval: Duration::from_millis(1),
};
let segment = SegmentConfig {
Expand Down Expand Up @@ -316,10 +325,23 @@ mod tests {
}

#[test]
fn wal_validate_rejects_zero_chunk_count() {
fn wal_validate_rejects_zero_rotated_files() {
let wal = WalConfig {
max_size_bytes: NonZeroU64::new(1).unwrap(),
max_chunk_count: 0,
max_rotated_files: 0,
rotation_target_bytes: NonZeroU64::new(1).unwrap(),
flush_interval: Duration::from_millis(1),
};

assert!(wal.validate().is_err());
}

#[test]
fn wal_validate_rejects_rotation_target_exceeding_cap() {
let wal = WalConfig {
max_size_bytes: NonZeroU64::new(1024).unwrap(),
max_rotated_files: 1,
rotation_target_bytes: NonZeroU64::new(2048).unwrap(),
flush_interval: Duration::from_millis(1),
};

Expand Down
Loading
Loading