Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
3331edb
Initial commit
abhikjain360 Jun 8, 2021
340e05d
disk/segment.rs completed
abhikjain360 Jun 8, 2021
2a91e9a
Complete disk
abhikjain360 Jun 8, 2021
36da178
Log implementation started
abhikjain360 Jun 8, 2021
b98e08f
fix Cargo.toml
abhikjain360 Jun 8, 2021
08888c2
fixes and tests
abhikjain360 Jun 9, 2021
7e68265
fixes and tests #2
abhikjain360 Jun 9, 2021
2ea4564
Attempt at completion
abhikjain360 Jun 9, 2021
d5a6822
fixes and tests #3
abhikjain360 Jun 10, 2021
305a682
fixes and tests #4
abhikjain360 Jun 10, 2021
ef9bf17
fixes and tests #5
abhikjain360 Jun 10, 2021
349af1e
visibility fix and docs
abhikjain360 Jun 10, 2021
d5011f0
fix clippy warnings
abhikjain360 Jun 10, 2021
1028ad3
Add review comments
tekjar Jun 10, 2021
68d12c4
de-duplication of file openings
abhikjain360 Jun 10, 2021
ded9c71
Cleanup and fixes
abhikjain360 Jun 10, 2021
d11c554
add hashing and fix disk reading
abhikjain360 Jun 11, 2021
e7c65ae
improve code readability
abhikjain360 Jun 11, 2021
2a4a08b
docs
abhikjain360 Jun 11, 2021
90720ba
windows support
abhikjain360 Jun 11, 2021
451ee80
remove unnecessary comments
abhikjain360 Jun 11, 2021
1ef01c2
use vecdeq in memory
abhikjain360 Jun 12, 2021
d26c1f5
add timestamps
abhikjain360 Jun 14, 2021
ef06ad3
timestamp tests
abhikjain360 Jun 14, 2021
7392b7d
timestamp tests #2
abhikjain360 Jun 14, 2021
a1492a6
timestamp tests #3
abhikjain360 Jun 14, 2021
912c375
Merge branch 'main' into new-logic2
abhikjain360 Jun 14, 2021
23a9a68
fix usize:u64 conversions
abhikjain360 Jun 16, 2021
34ee41a
add CommitLog::next_offset
abhikjain360 Jun 16, 2021
8eb7a36
add CommitLog::head_and_tail, fix tests
abhikjain360 Jun 19, 2021
5f31aa2
fix docs
abhikjain360 Jun 20, 2021
25d5859
fix return types
abhikjain360 Jun 21, 2021
8088fe9
tests and bug fix
abhikjain360 Jun 22, 2021
4505fac
fixes and tests 6
abhikjain360 Jun 23, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ description = "kafka inspired rumqtt's mqtt commitlog"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
fnv = "1"
byteorder = "1.3"
memmap = "0.7"
log = "0.4"
bytes = "1.0.1"
fnv = "1.0.7"
log = "0.4.14"
sha2 = "0.9.5"

[dev-dependencies]
tempfile = "3.1"
pretty_assertions = "0.6"
mqttbytes = "0.3.0"
pretty_assertions = "0.7.2"
simplelog = "0.10.0"
tempfile = "3.2.0"
292 changes: 292 additions & 0 deletions src/disk/chunk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,292 @@
use std::{io, path::Path};

use bytes::Bytes;
use sha2::Digest;

use super::{index::Index, segment::Segment};

/// The handler for a segment file which is on the disk, and its corresponding index file.
///
/// The segment file holds the packet payloads back-to-back, while the index file holds the
/// per-packet offsets/lengths/timestamps and a checksum of the segment (see [`Chunk::verify`]).
#[derive(Debug)]
pub(super) struct Chunk {
    /// The handle for the index file (offsets, lengths, timestamps, checksum).
    index: Index,
    /// The handle for the segment file (raw packet bytes).
    segment: Segment,
}

impl Chunk {
    /// Opens an existing segment-index pair from the disk. Will throw error if either does not
    /// exist. Note that this does not verify the checksum. Call [`Chunk::verify`] to do so
    /// manually.
    ///
    /// This only opens them immutably. Returns the chunk along with the start and end
    /// timestamps read from the index file.
    #[inline]
    pub(super) fn open<P: AsRef<Path>>(dir: P, index: u64) -> io::Result<(Self, u64, u64)> {
        // `String` implements `AsRef<Path>`, so no borrow of the formatted name is needed.
        let index_path = dir.as_ref().join(format!("{:020}.index", index));
        let segment_path = dir.as_ref().join(format!("{:020}.segment", index));

        let (index, start_time, end_time) = Index::open(index_path)?;
        let segment = Segment::open(segment_path)?;

        Ok((Self { index, segment }, start_time, end_time))
    }

    /// Creates a new segment-index pair onto the disk, and throws error if they already exist.
    /// The given hasher is used to calculate the checksum of the given bytes. The given bytes
    /// are stored as 1 single segment.
    ///
    /// This only opens them immutably, after writing the given data.
    pub(super) fn new<P: AsRef<Path>>(
        dir: P,
        index: u64,
        bytes: Vec<(Bytes, u64)>,
        hasher: &mut impl Digest,
    ) -> io::Result<Self> {
        let index_path = dir.as_ref().join(format!("{:020}.index", index));
        let segment_path = dir.as_ref().join(format!("{:020}.segment", index));

        // Record (len, timestamp) of each packet so the index file can locate packets later.
        let lens: Vec<(u64, u64)> = bytes
            .iter()
            .map(|(byte, timestamp)| (byte.len() as u64, *timestamp))
            .collect();

        // Concatenate all payloads into one contiguous buffer — the whole segment file.
        let bytes: Vec<u8> = bytes.into_iter().flat_map(|x| x.0).collect();
        let bytes = Bytes::from(bytes);
        hasher.update(&bytes);
        let hash = hasher.finalize_reset();

        let segment = Segment::new(segment_path, bytes)?;
        // `hash` is a fixed-size digest array; `Index::new` takes it as a byte slice.
        let index = Index::new(index_path, hash.as_ref(), lens)?;

        Ok(Self { index, segment })
    }

    /// Get the size of the segment.
    #[allow(dead_code)]
    #[inline]
    pub(super) fn segment_size(&self) -> u64 {
        self.segment.size()
    }

    /// Get the timestamp of the first entry.
    #[inline]
    pub(super) fn head_time(&self) -> u64 {
        self.index.head_time()
    }

    /// Get the timestamp of the last entry.
    #[inline]
    pub(super) fn tail_time(&self) -> u64 {
        self.index.tail_time()
    }

    /// Verify the checksum by reading the checksum from the start of the index file,
    /// calculating the checksum of the segment file and then comparing those two.
    pub(super) fn verify(&self, hasher: &mut impl Digest) -> io::Result<bool> {
        let read_hash = self.index.read_hash()?;
        // unwrap is fine: we read exactly `segment.size()` bytes starting from offset 0, which
        // is always in range.
        let read_segment = self.segment.read(0, self.segment.size())?.unwrap();
        hasher.update(&read_segment);
        let calculated_hash = hasher.finalize_reset();
        // Slice comparison covers both the length check and the element-wise check.
        Ok(read_hash[..] == calculated_hash[..])
    }

    /// Read a packet from the disk segment at the particular index.
    #[inline]
    pub(super) fn read(&self, index: u64) -> io::Result<Option<Bytes>> {
        if index >= self.index.entries() {
            return Ok(None);
        }
        let [offset, len] = self.index.read(index)?;
        self.segment.read(offset, len)
    }

    /// Read a packet from the disk segment at the particular index, and also retrieve its
    /// timestamp.
    #[inline]
    pub(super) fn read_with_timestamps(&self, index: u64) -> io::Result<Option<(Bytes, u64)>> {
        if index >= self.index.entries() {
            return Ok(None);
        }
        let [timestamp, offset, len] = self.index.read_with_timestamps(index)?;
        // TODO: a `None` from the segment here means the index and segment files disagree;
        // maybe we should surface that to the caller, or optimize this as unreachable.
        Ok(self
            .segment
            .read(offset, len)?
            .map(|bytes| (bytes, timestamp)))
    }

    /// Read `len` packets from disk starting at `index`. If it is not possible to read `len`,
    /// it returns the count still left to read.
    // NOTE(review): `len` counts packets, so the leftover is presumably a packet count, not
    // bytes as the old doc said — confirm against `Index::readv`.
    #[inline]
    pub(super) fn readv(&self, index: u64, len: u64, out: &mut Vec<Bytes>) -> io::Result<Option<u64>> {
        if index >= self.index.entries() {
            return Ok(None);
        }
        let (offsets, left) = self.index.readv(index, len)?;
        self.segment.readv(offsets, out)?;
        Ok(Some(left))
    }

    /// Read `len` packets from disk starting at `index` as well as their timestamps. If it is
    /// not possible to read `len`, it returns the count still left to read.
    // NOTE(review): same leftover-unit caveat as `readv` above.
    #[inline]
    pub(super) fn readv_with_timestamps(
        &self,
        index: u64,
        len: u64,
        out: &mut Vec<(Bytes, u64)>,
    ) -> io::Result<Option<u64>> {
        if index >= self.index.entries() {
            return Ok(None);
        }
        let (offsets, left) = self.index.readv_with_timestamps(index, len)?;
        self.segment.readv_with_timestamps(offsets, out)?;
        Ok(Some(left))
    }

    /// Get the index that corresponds to the given timestamp, and if exact match is not found
    /// then the entry with immediate next timestamp is returned.
    #[inline]
    pub(super) fn index_from_timestamp(&self, timestamp: u64) -> io::Result<u64> {
        self.index.index_from_timestamp(timestamp)
    }

    /// Checks whether the timestamp given is contained within the smallest and the largest
    /// timestamps of the entries. Does **not** check for exact match.
    #[inline]
    pub(super) fn is_timestamp_contained(&self, timestamp: u64) -> bool {
        self.index.is_timestamp_contained(timestamp)
    }

    /// Total number of packets appended.
    #[inline]
    pub(super) fn entries(&self) -> u64 {
        self.index.entries()
    }
}

#[cfg(test)]
mod test {
    use bytes::Bytes;
    use pretty_assertions::assert_eq;
    use sha2::Sha256;
    use tempfile::tempdir;

    use super::*;

    #[test]
    fn new_and_read_chunk() {
        let dir = tempdir().unwrap();
        let mut hasher = Sha256::new();

        // 20 packets: packet `i` is 1024 copies of byte `i`, stamped at `i * 100`.
        let payloads: Vec<(Bytes, u64)> = (0..20u8)
            .map(|i| (Bytes::from(vec![i; 1024]), i as u64 * 100))
            .collect();

        let chunk = Chunk::new(dir.path(), 0, payloads, &mut hasher).unwrap();
        assert!(chunk.verify(&mut hasher).unwrap());

        // Single reads round-trip every packet; one past the end is `None`.
        for i in 0..20u8 {
            let packet = chunk.read(i as u64).unwrap().unwrap();
            assert_eq!(packet.len(), 1024);
            assert_eq!(packet[0], i);
            assert_eq!(packet[1023], i);
        }
        assert_eq!(chunk.read(20).unwrap(), None);

        // Timestamped reads return the payload plus its timestamp.
        for i in 0..20u8 {
            let (packet, stamp) = chunk.read_with_timestamps(i as u64).unwrap().unwrap();
            assert_eq!(packet.len(), 1024);
            assert_eq!(packet[0], i);
            assert_eq!(packet[1023], i);
            assert_eq!(stamp, i as u64 * 100);
        }
        assert_eq!(chunk.read_with_timestamps(20).unwrap(), None);

        // Vectored read of everything, then an out-of-range vectored read.
        let mut packets = Vec::with_capacity(chunk.entries() as usize);
        chunk.readv(0, chunk.entries(), &mut packets).unwrap().unwrap();
        assert_eq!(chunk.readv(20, 1, &mut packets).unwrap(), None);

        for (i, packet) in packets.into_iter().enumerate() {
            assert_eq!(packet.len(), 1024);
            assert_eq!(packet[0], i as u8);
            assert_eq!(packet[1023], i as u8);
        }

        // Same again, with timestamps.
        let mut packets = Vec::with_capacity(chunk.entries() as usize);
        chunk
            .readv_with_timestamps(0, chunk.entries(), &mut packets)
            .unwrap();
        assert_eq!(chunk.readv_with_timestamps(20, 1, &mut packets).unwrap(), None);

        for (i, (packet, stamp)) in packets.into_iter().enumerate() {
            assert_eq!(packet.len(), 1024);
            assert_eq!(packet[0], i as u8);
            assert_eq!(packet[1023], i as u8);
            assert_eq!(stamp, i as u64 * 100);
        }
    }

    #[test]
    fn open_and_read_chunk() {
        let dir = tempdir().unwrap();
        let mut hasher = Sha256::new();

        let payloads: Vec<(Bytes, u64)> = (0..20u8)
            .map(|i| (Bytes::from(vec![i; 1024]), i as u64 * 100))
            .collect();

        let chunk = Chunk::new(dir.path(), 0, payloads, &mut hasher).unwrap();
        assert!(chunk.verify(&mut hasher).unwrap());

        // Drop the writer handle, then re-open the same pair from disk.
        drop(chunk);

        let (chunk, _, _) = Chunk::open(dir.path(), 0).unwrap();
        assert!(chunk.verify(&mut hasher).unwrap());

        for i in 0..20u8 {
            let packet = chunk.read(i as u64).unwrap().unwrap();
            assert_eq!(packet.len(), 1024);
            assert_eq!(packet[0], i);
            assert_eq!(packet[1023], i);
        }
        assert_eq!(chunk.read(20).unwrap(), None);

        for i in 0..20u8 {
            let (packet, stamp) = chunk.read_with_timestamps(i as u64).unwrap().unwrap();
            assert_eq!(packet.len(), 1024);
            assert_eq!(packet[0], i);
            assert_eq!(packet[1023], i);
            assert_eq!(stamp, i as u64 * 100);
        }
        assert_eq!(chunk.read_with_timestamps(20).unwrap(), None);

        let mut packets = Vec::with_capacity(chunk.entries() as usize);
        chunk.readv(0, chunk.entries(), &mut packets).unwrap();
        assert_eq!(chunk.readv(20, 1, &mut packets).unwrap(), None);

        for (i, packet) in packets.into_iter().enumerate() {
            assert_eq!(packet.len(), 1024);
            assert_eq!(packet[0], i as u8);
            assert_eq!(packet[1023], i as u8);
        }

        let mut packets = Vec::with_capacity(chunk.entries() as usize);
        chunk
            .readv_with_timestamps(0, chunk.entries(), &mut packets)
            .unwrap();
        assert_eq!(chunk.readv_with_timestamps(20, 1, &mut packets).unwrap(), None);

        for (i, (packet, stamp)) in packets.into_iter().enumerate() {
            assert_eq!(packet.len(), 1024);
            assert_eq!(packet[0], i as u8);
            assert_eq!(packet[1023], i as u8);
            assert_eq!(stamp, i as u64 * 100);
        }
    }
}
Loading