diff --git a/Cargo.toml b/Cargo.toml index 67cb3aa..56c6cb2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,11 +10,13 @@ description = "kafka inspired rumqtt's mqtt commitlog" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -fnv = "1" -byteorder = "1.3" -memmap = "0.7" -log = "0.4" +bytes = "1.0.1" +fnv = "1.0.7" +log = "0.4.14" +sha2 = "0.9.5" [dev-dependencies] -tempfile = "3.1" -pretty_assertions = "0.6" +mqttbytes = "0.3.0" +pretty_assertions = "0.7.2" +simplelog = "0.10.0" +tempfile = "3.2.0" diff --git a/src/disk/chunk.rs b/src/disk/chunk.rs new file mode 100644 index 0000000..b9528db --- /dev/null +++ b/src/disk/chunk.rs @@ -0,0 +1,292 @@ +use std::{io, path::Path}; + +use bytes::Bytes; +use sha2::Digest; + +use super::{index::Index, segment::Segment}; + +/// The handler for a segment file which is on the disk, and it's corresponding index file. +#[derive(Debug)] +pub(super) struct Chunk { + /// The handle for index file. + index: Index, + /// The handle for segment file. + segment: Segment, +} + +impl Chunk { + /// Opens an existing segment-index pair from the disk. Will throw error if either does not + /// exist. Note that this does not verify the checksum. Call [`Chunk::verify`] to do so + /// manually. + /// + /// This only opens them immutably. + #[inline] + pub(super) fn open>(dir: P, index: u64) -> io::Result<(Self, u64, u64)> { + let index_path = dir.as_ref().join(&format!("{:020}.index", index)); + let segment_path = dir.as_ref().join(&format!("{:020}.segment", index)); + + let (index, start_time, end_time) = Index::open(index_path)?; + let segment = Segment::open(segment_path)?; + + Ok((Self { index, segment }, start_time, end_time)) + } + + /// Creates a new segment-index pair onto the disk, and throws error if they already exist. The + /// given hasher is used to calculate the the checksum of the given bytes. The given bytes are + /// stored as 1 single segment. + /// + /// This only opens them immutably, after writing the given data. + pub(super) fn new>( + dir: P, + index: u64, + bytes: Vec<(Bytes, u64)>, + hasher: &mut impl Digest, + ) -> io::Result { + let index_path = dir.as_ref().join(&format!("{:020}.index", index)); + let segment_path = dir.as_ref().join(&format!("{:020}.segment", index)); + + let mut lens = Vec::with_capacity(bytes.len()); + for (byte, timestamp) in &bytes { + lens.push((byte.len() as u64, *timestamp)); + } + + let bytes: Vec = bytes.into_iter().map(|x| x.0).flatten().collect(); + let bytes = Bytes::from(bytes); + hasher.update(&bytes); + let hash = hasher.finalize_reset(); + + let segment = Segment::new(segment_path, bytes)?; + // SAFETY: the length is already this, but AsRef for this length not implemented. + let index = Index::new(index_path, hash.as_ref(), lens)?; + + Ok(Self { index, segment }) + } + + /// Get the size of the segment. + #[allow(dead_code)] + #[inline] + pub(super) fn segment_size(&self) -> u64 { + self.segment.size() + } + + /// Get the timestamp of the first entry. + #[inline] + pub(super) fn head_time(&self) -> u64 { + self.index.head_time() + } + + /// Get the timestamp of the last entry. + #[inline] + pub(super) fn tail_time(&self) -> u64 { + self.index.tail_time() + } + + /// Verify the checksum by reading the checksum from the start of the index file, calcuating + /// the checksum of segment file and then comparing those two. + pub(super) fn verify(&self, hasher: &mut impl Digest) -> io::Result { + let read_hash = self.index.read_hash()?; + // unwrap fine as we reading the exactly the len starting from 0. + let read_segment = self.segment.read(0, self.segment.size())?.unwrap(); + hasher.update(&read_segment); + let calculated_hash = hasher.finalize_reset(); + Ok(calculated_hash.len() == read_hash.len() + && read_hash + .iter() + .enumerate() + .all(|(i, x)| *x == calculated_hash[i])) + } + + /// Read a packet from the disk segment at the particular index. + #[inline] + pub(super) fn read(&self, index: u64) -> io::Result> { + if index >= self.index.entries() { + return Ok(None); + } + let [offset, len] = self.index.read(index)?; + self.segment.read(offset, len) + } + + /// Read a packet from the disk segment at the particular index, and also retreive it's + /// timestamp. + #[inline] + pub(super) fn read_with_timestamps(&self, index: u64) -> io::Result> { + if index >= self.index.entries() { + return Ok(None) + } + let [timestamp, offset, len] = self.index.read_with_timestamps(index)?; + Ok(Some((match self.segment.read(offset, len)? { + Some(ret) => ret, + // TODO: maybe we should let users know that index file is wrong, or maybe we should + // optimize this as unreachable. + None => return Ok(None), + }, timestamp))) + } + + /// Read `len` packets from disk starting at `index`. If it is not possible to read `len`, it + /// returns the number of bytes still left to read. + #[inline] + pub(super) fn readv(&self, index: u64, len: u64, out: &mut Vec) -> io::Result> { + if index >= self.index.entries() { + return Ok(None) + } + let (offsets, left) = self.index.readv(index, len)?; + self.segment.readv(offsets, out)?; + Ok(Some(left)) + } + + /// Read `len` packets from disk starting at `index` as well as their timestamps. If it is not + /// possible to read `len`, it returns the number of bytes still left to read. + #[inline] + pub(super) fn readv_with_timestamps( + &self, + index: u64, + len: u64, + out: &mut Vec<(Bytes, u64)>, + ) -> io::Result> { + if index >= self.index.entries() { + return Ok(None) + } + let (offsets, left) = self.index.readv_with_timestamps(index, len)?; + self.segment.readv_with_timestamps(offsets, out)?; + Ok(Some(left)) + } + + /// Get the index that corresponds to the given timestamp, and if exact match is not found then + /// the entry with immediate next timestamp is returned. + #[inline] + pub(super) fn index_from_timestamp(&self, timestamp: u64) -> io::Result { + self.index.index_from_timestamp(timestamp) + } + + /// Checks whether the timestamp given is contained within the smallest and the largest + /// timestamps of the entries. Does **not** checks for exact match. + #[inline] + pub(super) fn is_timestamp_contained(&self, timestamp: u64) -> bool { + self.index.is_timestamp_contained(timestamp) + } + + /// Total number of packet appended. + #[inline(always)] + pub(super) fn entries(&self) -> u64 { + self.index.entries() + } +} + +#[cfg(test)] +mod test { + use bytes::Bytes; + use pretty_assertions::assert_eq; + use sha2::Sha256; + use tempfile::tempdir; + + use super::*; + + #[test] + fn new_and_read_chunk() { + let dir = tempdir().unwrap(); + let mut hasher = Sha256::new(); + + let mut v = Vec::with_capacity(20); + for i in 0..20u8 { + v.push((Bytes::from(vec![i; 1024]), i as u64 * 100)); + } + + let chunk = Chunk::new(dir.path(), 0, v, &mut hasher).unwrap(); + assert!(chunk.verify(&mut hasher).unwrap()); + + for i in 0..20u8 { + let byte = chunk.read(i as u64).unwrap().unwrap(); + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i); + assert_eq!(byte[1023], i); + } + assert_eq!(chunk.read(20).unwrap(), None); + + for i in 0..20u8 { + let (byte, timestamp) = chunk.read_with_timestamps(i as u64).unwrap().unwrap(); + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i); + assert_eq!(byte[1023], i); + assert_eq!(timestamp, i as u64 * 100); + } + assert_eq!(chunk.read_with_timestamps(20).unwrap(), None); + + let mut out = Vec::with_capacity(chunk.entries() as usize); + chunk.readv(0, chunk.entries(), &mut out).unwrap().unwrap(); + assert_eq!(chunk.readv(20, 1, &mut out).unwrap(), None); + + for (i, byte) in out.into_iter().enumerate() { + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i as u8); + assert_eq!(byte[1023], i as u8); + } + + let mut out = Vec::with_capacity(chunk.entries() as usize); + chunk.readv_with_timestamps(0, chunk.entries(), &mut out).unwrap(); + assert_eq!(chunk.readv_with_timestamps(20, 1, &mut out).unwrap(), None); + + for (i, (byte, timestamp)) in out.into_iter().enumerate() { + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i as u8); + assert_eq!(byte[1023], i as u8); + assert_eq!(timestamp, i as u64 * 100); + } + } + + #[test] + fn open_and_read_chunk() { + let dir = tempdir().unwrap(); + let mut hasher = Sha256::new(); + + let mut v = Vec::with_capacity(20); + for i in 0..20u8 { + v.push((Bytes::from(vec![i; 1024]), i as u64 * 100)); + } + + let chunk = Chunk::new(dir.path(), 0, v, &mut hasher).unwrap(); + assert!(chunk.verify(&mut hasher).unwrap()); + + drop(chunk); + + let (chunk, _, _) = Chunk::open(dir.path(), 0).unwrap(); + assert!(chunk.verify(&mut hasher).unwrap()); + + for i in 0..20u8 { + let byte = chunk.read(i as u64).unwrap().unwrap(); + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i); + assert_eq!(byte[1023], i); + } + assert_eq!(chunk.read(20).unwrap(), None); + + for i in 0..20u8 { + let (byte, timestamp) = chunk.read_with_timestamps(i as u64).unwrap().unwrap(); + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i); + assert_eq!(byte[1023], i); + assert_eq!(timestamp, i as u64 * 100); + } + assert_eq!(chunk.read_with_timestamps(20).unwrap(), None); + + let mut out = Vec::with_capacity(chunk.entries() as usize); + chunk.readv(0, chunk.entries(), &mut out).unwrap(); + assert_eq!(chunk.readv(20, 1, &mut out).unwrap(), None); + + for (i, byte) in out.into_iter().enumerate() { + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i as u8); + assert_eq!(byte[1023], i as u8); + } + + let mut out = Vec::with_capacity(chunk.entries() as usize); + chunk.readv_with_timestamps(0, chunk.entries(), &mut out).unwrap(); + assert_eq!(chunk.readv_with_timestamps(20, 1, &mut out).unwrap(), None); + + for (i, (byte, timestamp)) in out.into_iter().enumerate() { + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i as u8); + assert_eq!(byte[1023], i as u8); + assert_eq!(timestamp, i as u64 * 100); + } + } +} diff --git a/src/disk/index.rs b/src/disk/index.rs index 3eba6c0..24688df 100644 --- a/src/disk/index.rs +++ b/src/disk/index.rs @@ -1,263 +1,424 @@ -use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; -use memmap::MmapMut; -use std::fs::{File, OpenOptions}; -use std::io::{self, Write}; -use std::path::{Path, PathBuf}; - -const POS_WIDTH: u64 = 8; -const LEN_WIDTH: u64 = 8; -const ENTRY_WIDTH: u64 = POS_WIDTH + LEN_WIDTH; - -pub struct Index { - base_offset: u64, +use std::{ + fs::{File, OpenOptions}, + io::{self, Write}, + mem::{transmute, MaybeUninit}, + path::Path, +}; + +use log::warn; + +/// Size of the offset of packet, in bytes. +const OFFSET_SIZE: u64 = 8; +/// Size of the len of packet, in bytes. +const LEN_SIZE: u64 = 8; +/// Size of timestamp appended to each entry, in bytes. +const TIMESTAMP_SIZE: u64 = 8; +/// Size of the hash of segment file, stored at the start of index file. +const HASH_SIZE: u64 = 32; +/// Size of entry, in bytes. +const ENTRY_SIZE: u64 = TIMESTAMP_SIZE + OFFSET_SIZE + LEN_SIZE; + +/// Wrapper around a index file for convenient reading of bytes sizes. +/// +/// Does **not** check any of the constraint enforced by user, or that the index being read from/ +/// written to is valid. Simply performs what asked. +/// +/// +///### Index file format +/// +///The index file starts with the 32-bytes hash of the segment file, followed by entries. Each +///entry consists of 3 u64s, [ timestamp | offset | len ]. +/// +/// #### Note +/// It is the duty of the handler of this struct to ensure index file's size does not exceed the +/// specified limit. +#[derive(Debug)] +pub(super) struct Index { + /// The opened index file. file: File, - mmap: MmapMut, - pub(crate) size: u64, - pub(crate) max_size: u64, + /// Number of entries in the index file. + entries: u64, + /// The timestamp at which the index file starts. + start_time: u64, + /// The timestamp at which the index file starts. + end_time: u64, } impl Index { - pub fn new>( - dir: P, - base_offset: u64, - max_size: u64, - active: bool, - ) -> io::Result { - let file_name = format!("{:020}.index", base_offset); - let file_path: PathBuf = dir.as_ref().join(file_name); - let verify = file_path.exists(); - - let file = OpenOptions::new() - .read(true) - .append(true) - .create(true) - .open(&file_path)?; - let metadata = file.metadata()?; - let size = metadata.len() as u64; - - // truncate and mmap the file - // Old segment indexes are properly closed which shrinks the size of index file form maximum size - // Old segment indexes are immutable and hence we freeze the size to file size. - // For active segments, we set the size to max to be able to append more segment information - if active { - file.set_len(max_size)?; + /// Open a new index file. Does not create a new one, and throws error if does not exist. If + /// the open file does not have any entries, the timestamps will be assumed to be 0 (measured + /// since `UNIX_EPOCH`) + /// + /// Note that index file is opened immutably. + #[inline] + pub(super) fn open>(path: P) -> io::Result<(Self, u64, u64)> { + let file = OpenOptions::new().read(true).open(path)?; + let len = file.metadata()?.len(); + + let entries = if len <= HASH_SIZE { + 0 } else { - file.set_len(size)?; - } + (file.metadata()?.len() - HASH_SIZE) / ENTRY_SIZE + }; - let mmap = unsafe { MmapMut::map_mut(&file)? }; - let index = Index { - base_offset, + let mut index = Self { file, - mmap, - size, - max_size, + entries, + start_time: 0, + end_time: 0, }; - if verify { - index.verify()?; + if entries == 0 { + warn!("empty index file opened"); + Ok((index, 0, 0)) + } else { + let [start_time, _, _] = index.read_with_timestamps(0)?; + let [end_time, _, _] = index.read_with_timestamps(entries - 1)?; + index.start_time = start_time; + index.end_time = end_time; + Ok((index, start_time, end_time)) } - - Ok(index) } - pub fn base_offset(&self) -> u64 { - self.base_offset - } + /// Create a new index file. Throws error if does not exist. The `info` vector has 2-tuples as + /// elements, whose 1st element is the length of the packet inserted in segment file, and 2nd + /// element is timestamp in format of time since epoch. The hash may be of any len, but only + /// starting 32 bytes will be taken. + /// + /// Note that index file is opened immutably, after writing the given data. + pub(super) fn new>( + path: P, + hash: &[u8], + info: Vec<(u64, u64)>, + ) -> io::Result { + let mut file = OpenOptions::new() + .read(true) + .write(true) + .create_new(true) + .open(path)?; + let tail = info.len() as u64; + let mut offset = 0; + + let (start_time, end_time) = if let Some((_, end_time)) = info.last() { + (info.first().unwrap().1, *end_time) + } else { + warn!("empty index file created"); + (0, 0) + }; - /// Number of entries - pub fn count(&self) -> u64 { - self.size / ENTRY_WIDTH + let entries: Vec = info + .into_iter() + .map(|(len, timestamp)| { + let ret = [timestamp, offset, len]; + offset += len; + // SAFETY: we will read back from file in exact same manner. as representation will + // remain same, we don't need to change the length of vec either. + unsafe { transmute::<[u64; 3], [u8; 24]>(ret) } + }) + .flatten() + .collect(); + file.write_all(&hash[..32])?; + file.write_all(&entries[..])?; + + Ok(Self { + file, + entries: tail, + start_time, + end_time, + }) } - /// Index files which aren't closed will contains zeros as the mmap file wouldn't be truncated - /// Treating these files as corrupted will free a lot of special case code in index and segment - /// Facilitates easier intuition of logic & segment appends won't return wrong offset due to - /// incorrect size. We can just do size based segment jumps and use ? for error handling - fn verify(&self) -> io::Result<()> { - let count = self.count(); - if count == 0 { - let e = format!("Index {} should contain some data", self.base_offset); - return Err(io::Error::new(io::ErrorKind::InvalidData, e)); - } + /// Return the number of entries in the index. + #[inline] + pub(super) fn entries(&self) -> u64 { + self.entries + } - let (position, len) = self.read(count - 1)?; - if position == 0 || len == 0 { - let e = format!( - "Index {} has trailing 0s. Index corrupted", - self.base_offset - ); - return Err(io::Error::new(io::ErrorKind::InvalidData, e)); - } + /// Get the timestamp of the first entry. + #[inline] + pub(super) fn head_time(&self) -> u64 { + self.start_time + } - Ok(()) + /// Get the timestamp of the last entry. + #[inline] + pub(super) fn tail_time(&self) -> u64 { + self.end_time } - pub fn write(&mut self, pos: u64, len: u64) -> io::Result<()> { - if self.size + ENTRY_WIDTH > self.max_size { - return Err(io::Error::new(io::ErrorKind::Other, "Index full")); - } + /// Read the hash stored in the index file, which is the starting 32 bytes of the file. + #[inline] + pub(super) fn read_hash(&self) -> io::Result<[u8; 32]> { + let mut buf: [u8; 32] = unsafe { MaybeUninit::uninit().assume_init() }; + self.read_at(&mut buf, 0)?; + Ok(buf) + } - let start = self.size as usize; - let end = start + POS_WIDTH as usize; - let mut buf = &mut self.mmap.as_mut()[start..end]; - buf.write_u64::(pos)?; + /// Get the offset, size of packet at the given index, using the index file. + #[inline] + pub(super) fn read(&self, index: u64) -> io::Result<[u64; 2]> { + // NOTE: out of length reads are handled by `Chunks::read`. + let mut buf: [u8; 16] = unsafe { MaybeUninit::uninit().assume_init() }; + self.read_at(&mut buf, HASH_SIZE + ENTRY_SIZE * index + TIMESTAMP_SIZE)?; + // SAFETY: we are reading the same number of bytes, and we write in exact same manner. + Ok(unsafe { transmute::<[u8; 16], [u64; 2]>(buf) }) + } - let start = end; - let end = start + LEN_WIDTH as usize; - let mut buf = &mut self.mmap.as_mut()[start..end]; - buf.write_u64::(len)?; + /// Get the timestamp, offset and the size of the packet at the given index, found using the + /// index file. + #[inline] + pub(super) fn read_with_timestamps(&self, index: u64) -> io::Result<[u64; 3]> { + // NOTE: out of length reads are handled by `Chunks::read_with_timestamps`. + let mut buf: [u8; 24] = unsafe { MaybeUninit::uninit().assume_init() }; + self.read_at(&mut buf, HASH_SIZE + ENTRY_SIZE * index)?; + // SAFETY: we are reading the same number of bytes, and we write in exact same manner. + Ok(unsafe { transmute::<[u8; 24], [u64; 3]>(buf) }) + } - self.size += ENTRY_WIDTH; - Ok(()) + /// Get a vector of 2-arrays which have the offset and the size of the `len` packets, starting + /// at the `index`. If `len` is larger than number of packets stored in segment, it will return + /// as the 2nd element of the return tuple the number of packets still left to read. + + #[inline] + pub(super) fn readv(&self, index: u64, len: u64) -> io::Result<(Vec<[u64; 2]>, u64)> { + // NOTE: out of length reads are handled by `Chunks::readv`. + self.readv_with_timestamps(index, len).map(|(v, left)| { + ( + v.into_iter() + .map(|reads| [reads[1], reads[2]]) + .collect::>(), + left, + ) + }) } - /// Reads an offset from the index and returns segment record's position and size - pub fn read(&self, offset: u64) -> io::Result<(u64, u64)> { - if self.size == 0 { - return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "No entries in index", - )); + /// Get a vector of 3-arrays which have the timestamp, offset and size of the `len` packets, + /// starting at the `index`. If `len` is larger than number of packets stored in segment, it + /// will return as the 2nd element of the return tuple the number of packets still left to + /// read. + #[inline] + pub(super) fn readv_with_timestamps( + &self, + index: u64, + len: u64, + ) -> io::Result<(Vec<[u64; 3]>, u64)> { + // NOTE: out of length reads are handled by `Chunks::readv_with_timestamps`. + let limit = index + len; + let (left, len) = if limit > self.entries { + ( + limit - self.entries, + ((self.entries - index) * ENTRY_SIZE) as usize, + ) + } else { + (0, (len * ENTRY_SIZE) as usize) + }; + + let mut buf = Vec::with_capacity(len); + // SAFETY: we have already preallocated the capacity. needed so that `read_at` fills it + // completely with u8. + unsafe { + buf.set_len(len); } - // entry of the target offset - let entry_position = offset * ENTRY_WIDTH; + self.read_at(buf.as_mut(), HASH_SIZE + ENTRY_SIZE * index)?; - // reading at invalid postion from a file is implementation dependent. handle this explicitly - // https://doc.rust-lang.org/std/io/trait.Seek.html#tymethod.seek - if self.size < entry_position + ENTRY_WIDTH { - return Err(io::Error::new( - io::ErrorKind::UnexpectedEof, - "Reading at an invalid offset", - )); + // SAFETY: needed beacuse of transmute. As new transmuted type is of different length, we + // need to make sure the length stored in vec also matches. + unsafe { + buf.set_len(len / ENTRY_SIZE as usize); } - // read position - let start = entry_position as usize; - let end = start + POS_WIDTH as usize; - let mut buf = &self.mmap[start..end]; - let position = buf.read_u64::()?; + // SAFETY: we have written to disk in exact same manner. + Ok((unsafe { transmute::, Vec<[u64; 3]>>(buf) }, left)) + } - // read len - let start = end; - let end = start + LEN_WIDTH as usize; - let mut buf = &self.mmap[start..end]; - let len = buf.read_u64::()?; + /// Get the index that corresponds to the given timestamp, and if exact match is not found then + /// the entry with immediate next timestamp is returned. + #[inline] + pub(super) fn index_from_timestamp(&self, timestamp: u64) -> io::Result { + let file_contents: Vec = self + .readv_with_timestamps(0, self.entries())? + .0 + .into_iter() + .map(|entry| entry[0]) + .collect(); + + Ok(match file_contents.binary_search(×tamp) { + Ok(idx) => idx as u64, + Err(idx) => idx as u64, + }) + } - Ok((position, len)) + /// Checks whether the timestamp given is contained within the smallest and the largest + /// timestamps of the entries. Does **not** checks for exact match. + #[inline] + pub(super) fn is_timestamp_contained(&self, timestamp: u64) -> bool { + self.start_time <= timestamp && timestamp <= self.end_time } - /// Returns starting position, size required to fit 'n' records, n (count) - /// Total size of records might cross the provided boundary. Use returned size - /// for allocating the buffer - pub fn readv(&self, offset: u64, size: u64) -> io::Result<(u64, u64, u64)> { - let mut count = 0; - let mut current_size = 0; - let mut next_offset = offset; - - // read the first record and fail if there is a problem - let (pos, len) = self.read(offset)?; - let mut last_record_size = len; - let start_position = pos; - - // all the errors here are soft failures which are just used to break the loop - loop { - // size reached. include the last record even though it crosses boundary - current_size += last_record_size; - next_offset += 1; - count += 1; - if current_size >= size { - break; + #[allow(unused_mut)] + #[inline] + fn read_at(&self, mut buf: &mut [u8], mut offset: u64) -> io::Result<()> { + #[cfg(target_family = "unix")] + { + use std::os::unix::prelude::FileExt; + self.file.read_exact_at(buf, offset) + } + #[cfg(target_family = "windows")] + { + use std::os::windows::fs::FileExt; + while !buf.is_empty() { + match self.seek_read(buf, offset) { + Ok(0) => return Ok(()), + Ok(n) => { + buf = &mut buf[n..]; + offset += n as u64; + } + Err(e) => return Err(e), + } + } + if !buf.is_empty() { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )) + } else { + Ok(()) } - - // estimate size till the previous record by reading current offset - let (_, len) = match self.read(next_offset) { - Ok(v) => v, - Err(_e) => break, - }; - - last_record_size = len; } - - Ok((start_position, current_size, count)) - } - - pub fn close(&mut self) -> io::Result<()> { - self.mmap.flush()?; - self.file.flush()?; - self.file.set_len(self.size)?; - Ok(()) } } #[cfg(test)] mod test { - use super::Index; + use pretty_assertions::assert_eq; use tempfile::tempdir; - fn write_entries(index: &mut Index, entries: Vec<(u64, u64)>) { - for (offset, (position, len)) in entries.into_iter().enumerate() { - let offset = offset as u64; - index.write(position, len).unwrap(); - let (p, l) = index.read(offset).unwrap(); - assert_eq!(len, l); - assert_eq!(position, p); + use super::*; + + #[test] + fn new_and_read_index() { + let dir = tempdir().unwrap(); + + #[rustfmt::skip] + let index = Index::new( + dir.path().join(format!("{:020}", 2).as_str()), + &[2; 32], + vec![(100, 1), (100, 2), (100, 3), (100, 4), (100, 5), (100, 6), (100, 7), (100, 8), (100, 9), (100, 10), + (200, 11), (200, 12), (200, 13), (200, 14), (200, 15), (200, 16), (200, 17), (200, 18), (200, 19), (200, 20),] + ).unwrap(); + + assert_eq!(index.entries(), 20); + assert_eq!(index.read(9).unwrap(), [900, 100]); + assert_eq!(index.read(19).unwrap(), [2800, 200]); + assert_eq!(index.read_hash().unwrap(), [2; 32]); + + let (v, _) = index.readv_with_timestamps(0, 20).unwrap(); + for i in 0..10 { + assert_eq!(v[i][0] as usize, (i + 1)); // timestamp + assert_eq!(v[i][1] as usize, 100 * i); // offset + assert_eq!(v[i][2], 100); // len + } + for i in 10..20 { + assert_eq!(v[i][0] as usize, (i + 1)); // timestamp + assert_eq!(v[i][1] as usize, 1000 + 200 * (i - 10)); // offset + assert_eq!(v[i][2], 200); // len } } #[test] - fn write_and_read_works_as_expected() { - let path = tempdir().unwrap(); + fn open_and_read_index() { + let dir = tempdir().unwrap(); + + #[rustfmt::skip] + let index = Index::new( + dir.path().join(format!("{:020}", 2).as_str()), + &[2; 32], + vec![(100, 1), (100, 2), (100, 3), (100, 4), (100, 5), (100, 6), (100, 7), (100, 8), (100, 9), (100, 10), + (200, 11), (200, 12), (200, 13), (200, 14), (200, 15), (200, 16), (200, 17), (200, 18), (200, 19), (200, 20),] + ).unwrap(); + + assert_eq!(index.entries(), 20); + assert_eq!(index.read_hash().unwrap(), [2; 32]); + + drop(index); + + let (index, start_time, end_time) = + Index::open(dir.path().join(format!("{:020}", 2).as_str())).unwrap(); + assert_eq!(index.read(19).unwrap(), [2800, 200]); + assert_eq!(index.read_hash().unwrap(), [2; 32]); + assert_eq!(start_time, 1); + assert_eq!(end_time, 20); + + let (v, _) = index.readv_with_timestamps(0, 20).unwrap(); + for i in 0..10 { + assert_eq!(v[i][0] as usize, (i + 1)); // timestamp + assert_eq!(v[i][1] as usize, 100 * i); // offset + assert_eq!(v[i][2], 100); // len + } + for i in 10..20 { + assert_eq!(v[i][0] as usize, (i + 1)); // timestamp + assert_eq!(v[i][1] as usize, 1000 + 200 * (i - 10)); // offset + assert_eq!(v[i][2], 200); // len + } + } - // create a new index - let mut index = Index::new(&path, 0, 1024, true).unwrap(); + #[test] + fn open_empty_index() { + let dir = tempdir().unwrap(); + + // creating an empty index file. + let index_path = dir.path().join(format!("{:020}", 10).as_str()); + std::fs::OpenOptions::new() + .write(true) + .create_new(true) + .open(index_path.clone()) + .unwrap(); + + let (index, start_time, end_time) = Index::open(index_path).unwrap(); + assert_eq!(start_time, 0); + assert_eq!(end_time, 0); assert!(index.read(0).is_err()); - - let entries = vec![(0, 100), (100, 100), (200, 600), (800, 200), (1000, 100)]; - - // write and read index - write_entries(&mut index, entries.clone()); - assert!(index.read(5).is_err()); - index.close().unwrap(); - - // try reading from an offset which is out of boundary - let out_of_boundary_offset = entries.len() as u64; - let o = index.read(out_of_boundary_offset); - assert!(o.is_err()); - - // reconstruct the index from file - let index = Index::new(path, 0, 1024, true).unwrap(); - let offset = index.count() - 1; - assert_eq!(offset, 4); + assert!(index.read(1).is_err()); } #[test] - fn bulk_reads_work_as_expected() { - let path = tempdir().unwrap(); - - // create a new index - let mut index = Index::new(&path, 0, 1024, true).unwrap(); + fn new_and_open_index() { + let dir = tempdir().unwrap(); + + let index_path = dir.path().join(format!("{:020}", 10).as_str()); + let index = Index::new( + index_path.clone(), + b"12345678123456781234567812345678", + vec![], + ) + .unwrap(); + assert_eq!(index.start_time, 0); + assert_eq!(index.end_time, 0); + drop(index); + + let (index, _, _) = Index::open(index_path).unwrap(); + assert_eq!(index.head_time(), 0); + assert_eq!(index.tail_time(), 0); + assert_eq!(&index.read_hash().unwrap(), b"12345678123456781234567812345678"); assert!(index.read(0).is_err()); + assert!(index.read(1).is_err()); + } - // vectors (position, size of the record) - let entries = vec![(0, 100), (100, 100), (200, 600), (800, 200), (1000, 100)]; - write_entries(&mut index, entries); - index.close().unwrap(); - - let (position, size, count) = index.readv(1, 1024).unwrap(); - assert_eq!(position, 100); - assert_eq!(size, 1000); - assert_eq!(count, 4); - - let entries = vec![(0, 100), (100, 100), (200, 600), (800, 200), (1000, 100)]; - write_entries(&mut index, entries); - index.close().unwrap(); - - // read less than size of a single record - let (offset, size, count) = index.readv(2, 100).unwrap(); - assert_eq!(offset, 200); - assert_eq!(size, 600); - assert_eq!(count, 1); + #[test] + fn test_index_from_timestamps() { + let dir = tempdir().unwrap(); + + #[rustfmt::skip] + let index = Index::new( + dir.path().join(format!("{:020}", 2).as_str()), + &[2; 32], + vec![(100, 10), (100, 20), (100, 30), (100, 40), (100, 50), (100, 60), (100, 70), (100, 80), (100, 90), (100, 100), + (200, 110), (200, 120), (200, 130), (200, 140), (200, 150), (200, 160), (200, 170), (200, 180), (200, 190), (200, 200),] + ).unwrap(); + + for i in 0..20 { + assert_eq!(index.index_from_timestamp(i * 10 + 5).unwrap(), i); + } } } diff --git a/src/disk/mod.rs b/src/disk/mod.rs index 812c099..a46ca31 100644 --- a/src/disk/mod.rs +++ b/src/disk/mod.rs @@ -1,565 +1,628 @@ -pub mod index; -pub mod segment; - -use index::Index; -use segment::Segment; - -use std::collections::HashMap; -use std::fs; -use std::io; -use std::path::PathBuf; - -struct Chunk { - index: Index, - segment: Segment, +use std::{ + fs, io, + path::{Path, PathBuf}, +}; + +use bytes::Bytes; +use fnv::FnvHashMap; +use sha2::{Digest, Sha256}; + +mod chunk; +mod index; +mod segment; + +use chunk::Chunk; + +/// A wrapper around all index and segment files on the disk. +#[allow(dead_code)] +pub(super) struct DiskHandler { + /// Hashmap for file handlers of index and segment files. + chunks: FnvHashMap, + /// Directory in which to store files in. + dir: PathBuf, + /// Starting index of segment files. + head: u64, + /// Ending index of segment files. + tail: u64, + /// Starting timestamp of files. + head_time: u64, + /// Ending timestamp of files. + tail_time: u64, + /// Invalid files. + invalid_files: Vec, + /// The hasher for segment files + hasher: Sha256, } -pub struct DiskLog { - dir: PathBuf, - max_segment_size: u64, - max_index_size: u64, - base_offsets: Vec, - max_segments: usize, - active_chunk: u64, - chunks: HashMap, +/// Enum which specifies all sort of invalid cases that can occur when reading segment-index pair +/// from the directory provided. +#[allow(dead_code)] +#[derive(Debug, Clone)] +pub enum InvalidType { + /// The name of the file is invalid. The file can be an index file or segment file, or maybe we + /// can not parse it's `file_stem` as u64. + InvalidName(PathBuf), + /// There is no index for the given index, but there is a segment file. + NoIndex(u64), + /// There is no segment file for the given index, but there is an index file. + NoSegment(u64), + /// The hash from index file does not match that which we get after hashing the segment file. + InvalidChecksum(u64), } -impl DiskLog { - pub fn new>( - dir: P, - max_index_size: u64, - max_segment_size: u64, - max_segments: usize, - ) -> io::Result { - let dir = dir.into(); - let _ = fs::create_dir_all(&dir); - if max_segment_size < 1024 || max_index_size < 100 { - panic!("size should be at least 1KB") +//TODO: Review all unwraps +impl DiskHandler { + /// Create a new disk handler. Reads the given directory for previously existing index-segment + /// pairs, and stores all the invalid files (see [`crate::disk::InvalidType`]) in a vector + /// which can be retrieved via [`DiskHandler::invalid_files`]. It also returns the index at + /// which the next segment should be inserted onto the disk, which also corresponds to index at + /// which segments should start from in the memory. + pub(super) fn new>(dir: P) -> io::Result<(u64, Self)> { + struct FileStatus { + index_found: bool, + segment_found: bool, } + // creating and reading given dir + let _ = fs::create_dir_all(&dir)?; let files = fs::read_dir(&dir)?; - let mut base_offsets = Vec::new(); + + let mut indices = Vec::new(); + let mut statuses: FnvHashMap = FnvHashMap::default(); + let mut invalid_files = Vec::new(); + let mut hasher = Sha256::new(); + for file in files { let path = file?.path(); - let offset = path.file_stem().unwrap().to_str().unwrap(); - let offset = offset.parse::().unwrap(); - base_offsets.push(offset); - } - base_offsets.sort(); - let mut chunks = HashMap::new(); + let file_index = match path.file_stem() { + // TODO: is this unwrap fine? + Some(s) => s.to_str().unwrap(), + None => { + invalid_files.push(InvalidType::InvalidName(path)); + continue; + } + }; - let active_segment = if let Some((last_offset, offsets)) = base_offsets.split_last() { - // Initialized filled segments - for base_offset in offsets.iter() { - let index = Index::new(&dir, *base_offset, max_index_size, false)?; - let segment = Segment::new(&dir, *base_offset)?; - let chunk = Chunk { index, segment }; - chunks.insert(*base_offset, chunk); + let offset = match file_index.parse::() { + Ok(n) => n, + Err(_) => { + invalid_files.push(InvalidType::InvalidName(path)); + continue; + } + }; + + // TODO: is this unwrap fine? + match path.extension().map(|s| s.to_str().unwrap()) { + Some("index") => { + if let Some(status) = statuses.get_mut(&offset) { + status.index_found = true; + } else { + statuses.insert( + offset, + FileStatus { + index_found: true, + segment_found: false, + }, + ); + } + } + Some("segment") => { + if let Some(status) = statuses.get_mut(&offset) { + status.segment_found = true; + } else { + statuses.insert( + offset, + FileStatus { + index_found: false, + segment_found: true, + }, + ); + } + } + _ => invalid_files.push(InvalidType::InvalidName(path)), } - // Initialize active segment - let index = Index::new(&dir, *last_offset, max_index_size, true)?; - let segment = Segment::new(&dir, *last_offset)?; - let mut chunk = Chunk { index, segment }; - - // Wrong counts due to unclosed segments are handled during initialization. We can just assume - // count is always right from here on - let next_offset = chunk.index.count(); - chunk.segment.set_next_offset(next_offset); - chunks.insert(*last_offset, chunk); - *last_offset + indices.push(offset); + } + + // getting the head and tail + indices.sort_unstable(); + let (inmemory_head, head, tail) = if let Some(tail) = indices.last() { + // unwrap fine as if last exists then first exists as well, even if they are the same + (*tail + 1, *indices.first().unwrap(), *tail) } else { - let index = Index::new(&dir, 0, max_index_size, true)?; - let segment = Segment::new(&dir, 0)?; - let chunk = Chunk { index, segment }; - chunks.insert(0, chunk); - base_offsets.push(0); - 0 + (0, 0, 0) }; - let log = DiskLog { - dir, - max_segment_size, - max_index_size, - max_segments, - base_offsets, - chunks, - active_chunk: active_segment, - }; + let mut start_time = 0; + let mut end_time = 0; + + // opening valid files, sorting the invalid ones + let mut chunks = FnvHashMap::default(); + for ( + index, + FileStatus { + index_found, + segment_found, + }, + ) in statuses.into_iter() + { + if !index_found { + invalid_files.push(InvalidType::NoIndex(index)); + } else if !segment_found { + invalid_files.push(InvalidType::NoSegment(index)); + } else { + let (chunk, chunk_start_time, chunk_end_time) = Chunk::open(&dir, index)?; + if !chunk.verify(&mut hasher)? { + invalid_files.push(InvalidType::InvalidChecksum(index)) + } else { + chunks.insert(index, chunk); + } + + if chunk_start_time < start_time { + start_time = chunk_start_time; + } + if chunk_end_time < end_time { + end_time = chunk_end_time; + } + } + } + + Ok(( + inmemory_head, + Self { + chunks, + dir: dir.as_ref().into(), + head, + tail, + head_time: start_time, + tail_time: end_time, + invalid_files, + hasher, + }, + )) + } + + /// Get the index of segment-index pair on the disk with lowest index. + #[allow(dead_code)] + #[inline] + pub(super) fn head(&self) -> u64 { + self.head + } - Ok(log) + /// Get the index of segment-index pair on the disk with highest index. + #[allow(dead_code)] + #[inline] + pub(super) fn tail(&self) -> u64 { + self.tail } - pub fn append(&mut self, record: &[u8]) -> io::Result<()> { - let active_chunk = if let Some(v) = self.chunks.get_mut(&self.active_chunk) { - v + /// Returns the total number of segments. + #[inline] + pub(super) fn len(&self) -> u64 { + self.chunks.len() as u64 + } + + /// Retrieve the invalid files (see [`crate::disk::InvalidType`]). + #[allow(dead_code)] + #[inline] + pub(super) fn invalid_files(&self) -> &Vec { + &self.invalid_files + } + + // /// Returns the number of entries for a particular segment. + // #[inline] + // pub(super) fn len_at(&self, index: u64) -> io::Result { + // Ok(self.chunks.get(&index).ok_or(io::Error::new( + // io::ErrorKind::Other, + // "No elemt at the given index", + // ))?.entries()) + // } + + /// Read a single packet from given offset in segment at given index. + #[inline] + pub(super) fn read(&self, index: u64, offset: u64) -> io::Result> { + if let Some(chunk) = self.chunks.get(&index) { + chunk.read(offset) } else { - return Err(io::Error::new(io::ErrorKind::Other, "No active segment")); - }; + Ok(None) + } + } - if active_chunk.segment.size() >= self.max_segment_size { - active_chunk.segment.close()?; - active_chunk.index.close()?; - - // update active chunk - let base_offset = active_chunk.index.base_offset() + active_chunk.index.count(); - let index = Index::new(&self.dir, base_offset, self.max_index_size, true)?; - let segment = Segment::new(&self.dir, base_offset)?; - let chunk = Chunk { index, segment }; - self.chunks.insert(base_offset, chunk); - self.base_offsets.push(base_offset); - self.active_chunk = base_offset; - - if self.base_offsets.len() > self.max_segments { - let remove_offset = self.base_offsets.remove(0); - self.remove(remove_offset)?; + #[inline] + pub(super) fn read_with_timestamps( + &self, + index: u64, + offset: u64, + ) -> io::Result> { + if let Some(chunk) = self.chunks.get(&index) { + chunk.read_with_timestamps(offset) + } else { + Ok(None) + } + } + + #[inline] + pub(super) fn index_from_timestamp(&self, timestamp: u64) -> io::Result<(u64, u64)> { + for (idx, chunk) in self.chunks.iter() { + if chunk.is_timestamp_contained(timestamp) { + return Ok((*idx, chunk.index_from_timestamp(timestamp)?)); } } + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("timestamp {} not contained by any segment", timestamp).as_str(), + )); + } - // write record to segment and index - let active_chunk = self.chunks.get_mut(&self.active_chunk).unwrap(); - let (_, position) = active_chunk.segment.append(record)?; - active_chunk.index.write(position, record.len() as u64)?; - Ok(()) + #[inline] + pub(super) fn is_timestamp_contained(&self, timestamp: u64) -> bool { + self.head_time <= timestamp && timestamp <= self.tail_time } - /// Read a record from correct segment - /// Returns data, next base offset and relative offset - pub fn read(&mut self, base_offset: u64, offset: u64) -> io::Result> { - let chunk = match self.chunks.get_mut(&base_offset) { - Some(segment) => segment, - None => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid segment", - )) - } - }; + #[allow(dead_code)] + #[inline] + pub(super) fn head_time(&self) -> u64 { + self.head_time + } - let (position, len) = chunk.index.read(offset)?; - let mut payload = vec![0; len as usize]; - chunk.segment.read(position, &mut payload)?; - Ok(payload) + #[allow(dead_code)] + #[cfg(test)] + #[inline] + pub(super) fn tail_time(&self) -> u64 { + self.tail_time } - /// Goes through index and returns chunks which tell how to sweep segments to collect - /// necessary amount on data asked by the user - /// Corner cases: - /// When there is more data (in other segments) current eof should move to next segment - /// Empty segments are possible after moving to next segment - /// EOFs after some data is collected are not errors - fn indexv(&self, base_offset: u64, relative_offset: u64, size: u64) -> io::Result { - let mut chunks = Chunks { - base_offset, - relative_offset, - count: 0, - size: 0, - chunks: Vec::new(), + /// Read `len` packets, starting from the given offset in segment at given index. Does not care + /// about segment boundaries, and will keep on reading until length is met or we run out of + /// packets. Returns the number of packets left to read (which can be 0), but were not found, + /// and the index of next segment if exists. + #[inline] + pub(super) fn readv( + &self, + index: u64, + offset: u64, + len: u64, + out: &mut Vec, + ) -> io::Result)>> { + let chunk = if let Some(disk_segment) = self.chunks.get(&index) { + disk_segment + } else { + return Ok(None); + }; + let mut left = match chunk.readv(offset, len, out)? { + Some(ret) => ret, + None => return Ok(None), }; - loop { - // Get the chunk with given base offset - let chunk = match self.chunks.get(&chunks.base_offset) { - Some(c) => c, - None if chunks.count == 0 => { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "Invalid segment", - )) - } - None => break, - }; - - // If next relative offset is equal to index count => We've crossed the boundary - // NOTE: We are assuming the index file was closed properly. `index.count()` will - // count `unfilled zeros` due to mmap `set_len` if it was not closed properly - // FIXME for chunks with index which isn't closed properly, relative offset will - // FIXME be less than count but `readv` is going to return EOF - // Reads on indexes which aren't closed properly result in `EOF` when they encounter 0 length record as the mmaped - // segment isn't truncated. Index read goes past the actual size as the size calculation of the next boot is wrong. - // This block covers both usual EOFs during normal operations as well as EOFs due to unclosed index - // EOF due to unclosed index is a warning though - if chunks.relative_offset >= chunk.index.count() { - // break if we are already at the tail segment - if chunks.base_offset == *self.base_offsets.last().unwrap() { - chunks.relative_offset -= 1; - break; + let mut segment_idx = index; + + if left == 0 { + // if no more packets left in `chunk`, move onto next + if offset + len >= chunk.entries() { + segment_idx += 1; + while self.chunks.get(&segment_idx).is_none() { + segment_idx += 1; + if segment_idx > self.tail { + return Ok(Some((left, None))); + } } - - // we use 'total offsets' to go next segment. this remains same during subsequent - // tail reads if there are no appends. hence the above early return - chunks.base_offset = chunk.index.base_offset() + chunk.index.count(); - chunks.relative_offset = 0; - continue; } - // Get what to read from the segment and fill the buffer. Covers the case where the logic has just moved to next - // segment and the segment is empty - let read_size = size - chunks.size; - let (position, payload_size, count) = - chunk.index.readv(chunks.relative_offset, read_size)?; - chunks.relative_offset += count; - chunks.count += count; - chunks.size += payload_size; - chunks - .chunks - .push((chunks.base_offset, position, payload_size, count)); - if chunks.size >= size { - chunks.relative_offset -= 1; - break; - } + return Ok(Some((0, Some(segment_idx as u64)))); } - Ok(chunks) - } + while left > 0 { + segment_idx += 1; + while self.chunks.get(&segment_idx).is_none() { + segment_idx += 1; + if segment_idx > self.tail { + return Ok(Some((left, None))); + } + } - /// Reads multiple packets from the disk and return base offset and relative offset of the - /// Returns base offset, relative offset of the last record along with number of messages and count - /// Goes to next segment when relative off set crosses boundary - pub fn readv( - &mut self, - base_offset: u64, - relative_offset: u64, - size: u64, - ) -> io::Result<(u64, u64, u64, Vec)> { - let chunks = self.indexv(base_offset, relative_offset, size)?; - - // Fill the pre-allocated buffer - let mut out = vec![0; chunks.size as usize]; - let mut start = 0; - for c in chunks.chunks { - let chunk = match self.chunks.get_mut(&c.0) { - Some(c) => c, - None => break, + // unwrap fine as we already validated the index in the while loop + left = match self.chunks.get(&segment_idx).unwrap().readv(0, left, out)? { + Some(ret) => ret, + None => return Ok(None), }; - - let position = c.1; - let payload_size = c.2; - chunk - .segment - .read(position, &mut out[start..start + payload_size as usize])?; - start += payload_size as usize; } - Ok(( - chunks.base_offset, - chunks.relative_offset, - chunks.count, - out, - )) + Ok(Some((0, Some(segment_idx)))) + + // There are three possible cases for return of Ok(Some(_)): + // 1.) len = 0, next = Some(_) + // => we still have segment left to read, but len reached + // 2.) len = 0, next = None + // => len reached but no more segments, we were just able to fill it + // 3.) len > 0, next = None + // => let left, but we ran out of segments } - pub fn close(&mut self, base_offset: u64) -> io::Result<()> { - if let Some(chunk) = self.chunks.get_mut(&base_offset) { - chunk.index.close()?; - chunk.segment.close()?; - } + /// Read `len` packets, starting from the given offset in segment at given index. Does not care + /// about segment boundaries, and will keep on reading until length is met or we run out of + /// packets. Returns the number of packets left to read (which can be 0), but were not found, + /// and the index of next segment if exists. + #[inline] + pub(super) fn readv_with_timestamps( + &self, + index: u64, + offset: u64, + len: u64, + out: &mut Vec<(Bytes, u64)>, + ) -> io::Result)>> { + let chunk = if let Some(chunk) = self.chunks.get(&index) { + chunk + } else { + return Ok(None); + }; + let mut left = match chunk.readv_with_timestamps(offset, len, out)? { + Some(ret) => ret, + None => return Ok(None), + }; - Ok(()) - } + let mut segment_idx = index; + + if left == 0 { + // if no more packets left in `chunk`, move onto next + if offset + len >= chunk.entries() { + segment_idx += 1; + while self.chunks.get(&segment_idx).is_none() { + segment_idx += 1; + if segment_idx > self.tail { + return Ok(Some((left, None))); + } + } + } - // Removes segment with given base offset from the disk and the system - pub fn remove(&mut self, base_offset: u64) -> io::Result<()> { - if let Some(mut chunk) = self.chunks.remove(&base_offset) { - chunk.segment.close()?; + return Ok(Some((0, Some(segment_idx as u64)))); + } - let file: PathBuf = self.dir.clone(); - let index_file_name = format!("{:020}.index", base_offset); - let segment_file_name = format!("{:020}.segment", base_offset); + while left > 0 { + segment_idx += 1; + while self.chunks.get(&segment_idx).is_none() { + segment_idx += 1; + if segment_idx > self.tail { + return Ok(Some((left, None))); + } + } - // dbg!(file.join(&index_file_name)); - fs::remove_file(file.join(index_file_name))?; - fs::remove_file(file.join(segment_file_name))?; + // unwrap fine as we already validated the index in the while loop + left = match self + .chunks + .get(&segment_idx) + .unwrap() + .readv_with_timestamps(0, left, out)? + { + Some(ret) => ret, + None => return Ok(None), + }; } - Ok(()) + Ok(Some((0, Some(segment_idx)))) + + // There are three possible cases for return of Ok(Some(_)): + // 1.) len = 0, next = Some(_) + // => we still have segment left to read, but len reached + // 2.) len = 0, next = None + // => len reached but no more segments, we were just able to fill it + // 3.) len > 0, next = None + // => let left, but we ran out of segments } - pub fn close_all(&mut self) -> io::Result<()> { - for (_, chunk) in self.chunks.iter_mut() { - chunk.index.close()?; - chunk.segment.close()?; + /// Store a vector of bytes to the disk. Returns offset at which bytes were appended to the + /// segment at the given index. + #[inline] + pub(super) fn insert(&mut self, index: u64, data: Vec<(Bytes, u64)>) -> io::Result<()> { + let chunk = Chunk::new(&self.dir, index, data, &mut self.hasher)?; + + if chunk.tail_time() > self.tail_time { + self.tail_time = chunk.tail_time(); + } + if chunk.head_time() < self.head_time { + self.head_time = chunk.head_time(); } - Ok(()) - } + self.chunks.insert(index, chunk); - pub fn remove_all(&mut self) -> io::Result<()> { - self.close_all()?; - fs::remove_dir(&self.dir)?; + if index > self.tail { + self.tail = index; + } Ok(()) } } -/// Captured state while sweeping indexes collect a bulk of records -/// from segment/segments -/// TODO: 'chunks' vector arguments aren't readable -struct Chunks { - base_offset: u64, - relative_offset: u64, - count: u64, - size: u64, - chunks: Vec<(u64, u64, u64, u64)>, -} - #[cfg(test)] mod test { - use super::DiskLog; + use bytes::Bytes; use pretty_assertions::assert_eq; - use std::io; + use tempfile::tempdir; - #[test] - fn append_creates_and_deletes_segments_correctly() { - let dir = tempfile::tempdir().unwrap(); - let dir = dir.path(); - - let record_count = 100; - let max_index_size = record_count * 16; - let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap(); - let mut payload = vec![0u8; 1024]; - - // 200 1K iterations. 20 files ignoring deletes. 0.segment, 10.segment .... 199.segment - // considering deletes -> 110.segment .. 200.segment - for i in 0..200 { - payload[0] = i; - log.append(&payload).unwrap(); - } - - // Semi fill 200.segment - for i in 200..205 { - payload[0] = i; - log.append(&payload).unwrap(); - } + use super::*; + use crate::test::{random_packets_as_bytes, verify_bytes_as_random_packets}; - let data = log.read(10, 0); - match data { - Err(e) if e.kind() == io::ErrorKind::InvalidInput => (), - _ => panic!("Expecting an invalid input error"), - }; - - // read segment with base offset 110 - let base_offset = 110; - for i in 0..10 { - let data = log.read(base_offset, i).unwrap(); - let d = (base_offset + i) as u8; - assert_eq!(data[0], d); + #[test] + fn push_and_read_handler() { + let dir = tempdir().unwrap(); + let (_, mut handler) = DiskHandler::new(dir.path()).unwrap(); + let (ranpack_bytes, _) = random_packets_as_bytes(); + + // results in: + // - ( 0, [len] * 1 packets) + // - ( 1, [len] * 2 packets) + // ... + // - (19, [len] * 20 packets) + // + // where [len] = ranpack_bytes.len() + for i in 0..20 { + let mut v = Vec::with_capacity((i + 1) * ranpack_bytes.len()); + for _ in 0..=i { + v.extend( + ranpack_bytes + .clone() + .into_iter() + .map(|x| (x, i as u64 * 100)), + ); + } + handler.insert(i as u64, v).unwrap(); } - // read segment with base offset 190 - let base_offset = 110; - for i in 0..10 { - let data = log.read(base_offset, i).unwrap(); - let d = (base_offset + i) as u8; - assert_eq!(data[0], d); + for i in 0..20 { + let mut v = Vec::new(); + handler + .readv(i, 0, ranpack_bytes.len() as u64 * (i + 1), &mut v) + .unwrap(); + for _ in 0..=i { + let u = v.split_off(ranpack_bytes.len()); + verify_bytes_as_random_packets(u, ranpack_bytes.len()); + } } - // read 200.segment which is semi filled with 5 records - let base_offset = 200; - for i in 0..5 { - let data = log.read(base_offset, i).unwrap(); - let d = (base_offset + i) as u8; - assert_eq!(data[0], d); + for i in 0..20 { + let mut v = Vec::new(); + handler + .readv_with_timestamps(i, 0, ranpack_bytes.len() as u64 * (i + 1), &mut v) + .unwrap(); + for elem in v { + assert_eq!(elem.1, i * 100); + } } - let data = log.read(base_offset, 5); - match data { - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => (), - _ => panic!("Expecting end of file error"), - }; + let mut v = Vec::new(); + assert_eq!(handler.readv(20, 0, 1, &mut v).unwrap(), None); + let mut v = Vec::new(); + assert_eq!(handler.readv_with_timestamps(20, 0, 1, &mut v).unwrap(), None); } #[test] - fn multi_segment_reads_work_as_expected() { - let dir = tempfile::tempdir().unwrap(); - let dir = dir.path(); - - // 100K bytes - let record_count = 100; - let record_size = 1 * 1024; - - // 10 records per segment. 10 segments (0.segment - 90.segment) - let max_segment_size = 10 * 1024; - let max_index_size = record_count * 16; - let mut log = DiskLog::new(dir, max_index_size, max_segment_size, 100).unwrap(); - - // 100 1K iterations. 10 files ignoring deletes. - // 0.segment (data with 0 - 9), 10.segment (10 - 19) .... 90.segment (0 size) - // 10K per file - let mut payload = vec![0u8; record_size]; - for i in 0..record_count { - payload[0] = i as u8; - log.append(&payload).unwrap(); + fn push_and_read_handler_after_drop() { + let dir = tempdir().unwrap(); + let (_, mut handler) = DiskHandler::new(dir.path()).unwrap(); + let (ranpack_bytes, _) = random_packets_as_bytes(); + + // results in: + // - ( 0, [len] * 1 packets) + // - ( 1, [len] * 2 packets) + // ... + // - (19, [len] * 20 packets) + // + // where [len] = ranpack_bytes.len() + for i in 0..20 { + let mut v = Vec::with_capacity((i + 1) * ranpack_bytes.len()); + for _ in 0..=i { + v.extend( + ranpack_bytes + .clone() + .into_iter() + .map(|x| (x, i as u64 * 100)), + ); + } + handler.insert(i as u64, v).unwrap(); } - // Read all the segments - let base_offset = 0; - for i in 0..10 { - let data = log.read(base_offset, i).unwrap(); - let d = (base_offset + i) as u8; - assert_eq!(data[0], d); + drop(handler); + + let (_, handler) = DiskHandler::new(dir.path()).unwrap(); + for i in 0..20 { + let mut v = Vec::new(); + handler + .readv(i, 0, ranpack_bytes.len() as u64 * (i + 1), &mut v) + .unwrap(); + for _ in 0..=i { + let u = v.split_off(ranpack_bytes.len()); + verify_bytes_as_random_packets(u, ranpack_bytes.len()); + } } - } - #[test] - fn vectored_read_works_as_expected() { - let dir = tempfile::tempdir().unwrap(); - let dir = dir.path(); - - let record_count = 100; - let max_index_size = record_count * 16; - let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap(); - - // 90 1K iterations. 10 files ignoring deletes. - // 0.segment (data with 0 - 9), 10.segment (10 - 19) .... 90.segment (0 size) - // 10K per file - let mut payload = vec![0u8; 1024]; - for i in 0..90 { - payload[0] = i; - log.append(&payload).unwrap(); + for i in 0..20 { + let mut v = Vec::new(); + handler + .readv_with_timestamps(i, 0, ranpack_bytes.len() as u64 * (i + 1), &mut v) + .unwrap(); + for elem in v { + assert_eq!(elem.1, i * 100); + } } - - // Read 50K. Reads 0.segment - 4.segment - let (base_offset, relative_offset, count, data) = log.readv(0, 0, 50 * 1024).unwrap(); - assert_eq!(base_offset, 40); - assert_eq!(relative_offset, 9); - assert_eq!(count, 50); - - let total_size = data.len(); - assert_eq!(total_size, 50 * 1024); - - // Read 50.segment offset 0 - let data = log.read(50, 0).unwrap(); - assert_eq!(data[0], 50); } #[test] - fn vectored_reads_in_different_boots_works_as_expected() { - let dir = tempfile::tempdir().unwrap(); - let dir = dir.path(); - - let record_count = 100; - let max_index_size = record_count * 16; - let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap(); - - // 100 1K iterations. 10 files - // 0.segment (data with 0 - 9), 10.segment (10 - 19) .... 90.segment (90 - 99) - // 10K per file - let mut payload: Vec = vec![0u8; 1024]; - for i in 0..100 { - payload[0] = i; - log.append(&payload).unwrap(); + fn read_handler_from_returned_index() { + let dir = tempdir().unwrap(); + let (_, mut handler) = DiskHandler::new(dir.path()).unwrap(); + let (ranpack_bytes, _) = random_packets_as_bytes(); + + // results in: + // - ( 0, [len] * 1 packets) + // - ( 1, [len] * 2 packets) + // ... + // - (14, [len] * 15 packets) + // + // where [len] = ranpack_bytes.len() + for i in 0..15 { + let mut v = Vec::with_capacity((i + 1) * ranpack_bytes.len()); + for _ in 0..=i { + v.extend( + ranpack_bytes + .clone() + .into_iter() + .map(|x| (x, i as u64 * 100)), + ); + } + handler.insert(i as u64, v).unwrap(); } - log.close_all().unwrap(); - - // Boot 2. Read 50K. Reads 0.segment - 4.segment - let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap(); - let (base_offset, relative_offset, count, data) = log.readv(0, 0, 50 * 1024).unwrap(); - assert_eq!(base_offset, 40); - assert_eq!(relative_offset, 9); - assert_eq!(count, 50); - - for i in 0..count { - let start = i as usize * 1024; - let end = start + 1024; - let record = &data[start..end]; - assert_eq!(record[0], i as u8); + let mut v = Vec::new(); + let (mut left, mut ret) = handler.readv(0, 0, 10, &mut v).unwrap().unwrap(); + verify_bytes_as_random_packets(v, 10); + let mut offset = 0; + let mut v: Vec = Vec::new(); + + while let Some(seg) = ret { + v.clear(); + offset = if left > 0 { 0 } else { offset + 10 }; + let (new_left, new_ret) = handler.readv(seg, offset, 10, &mut v).unwrap().unwrap(); + left = new_left; + ret = new_ret; } - let total_size = data.len(); - assert_eq!(total_size, 50 * 1024); - - // Read 50.segment offset 0 - let data = log.read(50, 0).unwrap(); - assert_eq!(data[0], 50); - } - - #[test] - fn vectored_reads_on_unclosed_index_and_segment_works_as_expected() { - let dir = tempfile::tempdir().unwrap(); - let dir = dir.path(); - - // 15K bytes - let record_count = 15; - let record_size = 1 * 1024; - - let max_segment_size = 10 * 1024; - let max_index_size = record_count * 16; - let mut log = DiskLog::new(dir, max_index_size, max_segment_size, 100).unwrap(); - - // 10 records per segment. 2 segments. 0.segment, 10.segment (partially filled and unclosed) - let mut payload = vec![0u8; record_size]; - for i in 0..record_count { - payload[0] = i as u8; - log.append(&payload).unwrap(); - } + assert_eq!(left, 0); - // Last disk not closed. Index will be filled with zeros and segment entries in index are not flushed from buffer yet - // Trailing zero indexes are considered as corrupted indexes - if let Ok(_l) = DiskLog::new(dir, max_index_size, max_segment_size, 100) { - panic!("Expecting a corrupted index error due to trailing zeros in the index") - } + let mut v = Vec::new(); + assert_eq!(handler.readv(15, 0, 1, &mut v).unwrap(), None); + let mut v = Vec::new(); + assert_eq!(handler.readv_with_timestamps(15, 0, 1, &mut v).unwrap(), None); } #[test] - fn vectored_reads_crosses_boundary_correctly() { - let dir = tempfile::tempdir().unwrap(); - let dir = dir.path(); - - let record_count = 100; - let max_index_size = record_count * 16; - let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap(); - - // 25 1K iterations. 3 segments - // 0.segment (10K, data with 0 - 9), 10.segment (5K, data with 10 - 14) - let mut payload = vec![0u8; 1024]; - for i in 0..25 { - payload[0] = i; - log.append(&payload).unwrap(); + fn read_using_timestamps() { + let dir = tempdir().unwrap(); + let (_, mut handler) = DiskHandler::new(dir.path()).unwrap(); + let (ranpack_bytes, _) = random_packets_as_bytes(); + + // results in: + // - ( 0, [len] * 1 packets, 0 timestamp) + // - ( 1, [len] * 2 packets, 100 timestamp) + // ... + // - (14, [len] * 15 packets, 1400 timestamp) + // + // where [len] = ranpack_bytes.len() + for i in 0..15 { + let mut v = Vec::with_capacity((i + 1) * ranpack_bytes.len()); + for _ in 0..=i { + v.extend( + ranpack_bytes + .clone() + .into_iter() + .map(|x| (x, i as u64 * 100)), + ); + } + handler.insert(i as u64, v).unwrap(); } - // Read 15K. Crosses boundaries of the segment and offset will be in the middle of 2nd segment - let (base_offset, relative_offset, count, data) = log.readv(0, 0, 15 * 1024).unwrap(); - assert_eq!(base_offset, 10); - assert_eq!(relative_offset, 4); - assert_eq!(count, 15); - assert_eq!(data.len(), 15 * 1024); - - // Read 15K. Crosses boundaries of the segment and offset will be at last record of 3rd segment - let (base_offset, relative_offset, count, data) = log - .readv(base_offset, relative_offset + 1, 15 * 1024) - .unwrap(); - assert_eq!(base_offset, 20); - assert_eq!(relative_offset, 4); - assert_eq!(count, 10); - assert_eq!(data.len(), 10 * 1024); - } - - #[test] - fn vectored_read_more_than_full_chomp_works_as_expected() { - let dir = tempfile::tempdir().unwrap(); - let dir = dir.path(); - - let record_count = 100; - let max_index_size = record_count * 16; - let mut log = DiskLog::new(dir, max_index_size, 10 * 1024, 10).unwrap(); - - // 90 1K iterations. 10 files - // 0.segment (data with 0 - 9), 10.segment (10 - 19) .... 80.segment - // 10K per file. 90K in total - let mut payload = vec![0u8; 1024]; - for i in 0..90 { - payload[0] = i; - log.append(&payload).unwrap(); + for i in 0..15 { + assert_eq!(handler.index_from_timestamp(i * 100).unwrap().0, i) } - - // Read 200K. Crosses boundaries of all the segments - let (base_offset, relative_offset, count, data) = log.readv(0, 0, 200 * 1024).unwrap(); - assert_eq!(base_offset, 80); - assert_eq!(relative_offset, 9); - assert_eq!(count, 90); - assert_eq!(data.len(), 90 * 1024); } } diff --git a/src/disk/segment.rs b/src/disk/segment.rs index 127f81b..20e5f24 100644 --- a/src/disk/segment.rs +++ b/src/disk/segment.rs @@ -1,197 +1,277 @@ -use std::fs::{File, OpenOptions}; -use std::io::{self, BufWriter, Write}; -use std::path::Path; -use std::path::PathBuf; - -/// Segment of a disk. Writes go through a buffer writers to -/// reduce number of system calls. Reads are directly read from -/// the file as seek on buffer reader will dump the buffer anyway -/// Also multiple readers might be operating on a given segment -/// which makes the cursor movement very dynamic -pub struct Segment { +use std::{ + fs::{File, OpenOptions}, + io, + path::Path, +}; + +use bytes::{Bytes, BytesMut}; + +/// Wrapper around the segment file. +#[derive(Debug)] +pub(super) struct Segment { + /// A buffered reader for the segment file. file: File, - writer: BufWriter, + /// The total size of segment file in bytes. size: u64, - next_offset: u64, - max_record_size: u64, } +/// A wrapper around a single segment file for convenient reading of bytes. Does **not** enforce +/// any contraints and simply does what asked. Handler should enforce the contraints. +/// +/// #### Note +/// It is the duty of the handler of this struct to ensure index file's size does not exceed the +/// specified limit. impl Segment { - // TODO next offset should be initialized correctly for segments which are reconstructed. `append` - // TODO without this will result in wrong offsets in the return position - pub fn new>(dir: P, base_offset: u64) -> io::Result { - let file_name = format!("{:020}.segment", base_offset); - let file_path: PathBuf = dir.as_ref().join(file_name); + /// Open a new segment file. Will throw an error if file does not exist. + #[inline] + pub(super) fn open>(path: P) -> io::Result { + let file = OpenOptions::new().read(true).open(path)?; + let size = file.metadata()?.len(); + Ok(Self { file, size }) + } + + /// Create a new segment file. Will throw an error if file already exists. + #[inline] + pub(super) fn new>(path: P, bytes: Bytes) -> io::Result { let file = OpenOptions::new() .read(true) - .append(true) - .create(true) - .open(&file_path)?; - let metadata = file.metadata()?; - - // 1MB buffer size - // NOTE write perf is only increasing till a certain buffer size. bigger sizes after that is causing a degrade - let buf = BufWriter::with_capacity(1024 * 1024, file.try_clone()?); - let size = metadata.len(); - - let segment = Segment { - file, - writer: buf, - size, - next_offset: 0, - max_record_size: 10 * 1024, - }; - - Ok(segment) + .write(true) + .create_new(true) + .open(path)?; + let size = bytes.len() as u64; + let mut ret = Self { file, size }; + ret.write_at(&bytes, 0)?; + Ok(ret) } - pub fn size(&self) -> u64 { + #[inline] + /// Returns the size of the file the segment is holding. + pub(super) fn size(&self) -> u64 { self.size } - pub fn set_next_offset(&mut self, next_offset: u64) { - self.next_offset = next_offset; - } - - /// Appends record to the file and return its offset - pub fn append(&mut self, record: &[u8]) -> io::Result<(u64, u64)> { - let record_size = record.len() as u64; - if record_size > self.max_record_size { - return Err(io::Error::new( - io::ErrorKind::Other, - "Max record size exceeded", - )); + /// Reads `len` bytes from given `offset` in the file. + #[inline] + pub(super) fn read(&self, offset: u64, len: u64) -> io::Result> { + if offset + len > self.size { + return Ok(None); } + let len = len as usize; + let mut bytes = BytesMut::with_capacity(len); + // SAFETY: We fill it with the contents later on, and has already been allocated. + unsafe { bytes.set_len(len) }; + self.read_at(&mut bytes, offset)?; - // append record and increment size. cursor is moved to the end as per the docs - // so we probably don't have to worry about reading and writing simultaneously - self.writer.write_all(record)?; - let position = self.size; - self.size += record.len() as u64; + Ok(Some(bytes.freeze())) + } - // return current offset after incrementing next offset - let offset = self.next_offset; - self.next_offset += 1; + /// Get packets from given vector of indices and corresponding lens. + #[inline] + pub(super) fn readv(&self, offsets: Vec<[u64; 2]>, out: &mut Vec) -> io::Result<()> { + let total = if let Some(first) = offsets.first() { + let mut total = first[1]; + for offset in offsets.iter().skip(1) { + total += offset[1]; + } + total + } else { + // return empty if offsets given is empty. + return Ok(()); + }; - Ok((offset, position)) - } + let mut buf = match self.read(offsets[0][0], total)? { + Some(buf) => buf, + None => return Ok(()), + }; + + for offset in offsets.into_iter() { + out.push(buf.split_to(offset[1] as usize)); + } - /// Reads to fill the complete buffer. Returns number of bytes read - pub fn read(&mut self, position: u64, buf: &mut [u8]) -> io::Result { - // TODO: No need to flush segments which are already filled. Make this conditional and check perf - self.writer.flush()?; - self.read_at(position, buf) + Ok(()) } + /// Takes in the vector of 3-arrays, whose elements are timestamp, offset, len in this order. + /// Returns a vector of 2-tuples containing `(packet_data, timestamp)` #[inline] - #[cfg(target_family = "unix")] - fn read_at(&mut self, position: u64, mut buf: &mut [u8]) -> io::Result { - use std::os::unix::fs::FileExt; + pub(super) fn readv_with_timestamps( + &self, + offsets: Vec<[u64; 3]>, + out: &mut Vec<(Bytes, u64)>, + ) -> io::Result<()> { + let total = if let Some(first) = offsets.first() { + let mut total = first[2]; + for offset in offsets.iter().skip(1) { + total += offset[2]; + } + total + } else { + return Ok(()); + }; - self.file.read_exact_at(&mut buf, position)?; + let mut buf = match self.read(offsets[0][1], total)? { + Some(ret) => ret, + None => return Ok(()), + }; - Ok(buf.len() as u64) + for offset in offsets.into_iter() { + out.push((buf.split_to(offset[2] as usize), offset[0])); + } + + Ok(()) } + /// Get the actual size of the file by reading it's metadata. Used only for testing. + #[cfg(test)] #[inline] - #[cfg(target_family = "windows")] - fn read_at(&mut self, position: u64, mut buf: &mut [u8]) -> io::Result { - use std::io::{Read, Seek, SeekFrom}; - - self.file.seek(SeekFrom::Start(position))?; - self.file.read_exact(&mut buf)?; + fn actual_size(&self) -> io::Result { + Ok(self.file.metadata()?.len()) + } - Ok(buf.len() as u64) + #[allow(unused_mut)] + #[inline] + fn read_at(&self, mut buf: &mut [u8], mut offset: u64) -> io::Result<()> { + #[cfg(target_family = "unix")] + { + use std::os::unix::prelude::FileExt; + self.file.read_exact_at(buf, offset) + } + #[cfg(target_family = "windows")] + { + use std::os::windows::fs::FileExt; + while !buf.is_empty() { + match self.seek_write(buf, offset) { + Ok(0) => return Ok(()), + Ok(n) => { + buf = &buf[n..]; + offset += n as u64; + } + Err(e) => return Err(e), + } + } + if !buf.is_empty() { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to write whole buffer", + )) + } else { + Ok(()) + } + } } - pub fn close(&mut self) -> io::Result<()> { - self.writer.flush()?; - Ok(()) + #[allow(unused_mut)] + #[inline] + fn write_at(&mut self, mut buf: &[u8], mut offset: u64) -> io::Result<()> { + #[cfg(target_family = "unix")] + { + use std::os::unix::prelude::FileExt; + self.file.write_all_at(buf, offset) + } + #[cfg(target_family = "windows")] + { + use std::os::windows::fs::FileExt; + while !buf.is_empty() { + match self.seek_read(buf, offset) { + Ok(0) => return Ok(()), + Ok(n) => { + buf = &mut buf[n..]; + offset += n as u64; + } + Err(e) => return Err(e), + } + } + if !buf.is_empty() { + Err(io::Error::new( + io::ErrorKind::UnexpectedEof, + "failed to fill whole buffer", + )) + } else { + Ok(()) + } + } } } #[cfg(test)] mod test { - use super::Segment; + use bytes::{BufMut, Bytes, BytesMut}; use pretty_assertions::assert_eq; + use tempfile::tempdir; - #[test] - fn second_time_initialization_happens_correctly() { - let record = b"hello timestone commitlog"; - let len = record.len(); - let dir = tempfile::tempdir().unwrap(); - let base_offset = 10; + use super::*; - // 1st boot - { - let mut segment = Segment::new(&dir, base_offset).unwrap(); - for i in 0..10 { - let (offset, _pos) = segment.append(record).unwrap(); - assert_eq!(offset, i) - } - - let mut next_pos = 0; - for _i in 0..10 { - let mut data = vec![0; len as usize]; - segment.read(next_pos, &mut data).unwrap(); - assert_eq!(&data, record); - next_pos += len as u64; - } + #[test] + fn new_and_read_segment() { + let dir = tempdir().unwrap(); - segment.close().unwrap(); + let mut buf = BytesMut::new(); + // 1024 * 20 bytes stored. + for i in 0..20u8 { + buf.put(Bytes::from(vec![i; 1024])); + } + let segment = Segment::new(dir.path().join(&format!("{:020}", 1)), buf.freeze()).unwrap(); + assert_eq!(segment.size(), 20 * 1024); + + assert_eq!(segment.actual_size().unwrap(), 20 * 1024); + for i in 0..20u8 { + let byte = segment.read(i as u64 * 1024, 1024).unwrap().unwrap(); + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i); + assert_eq!(byte[1023], i); } - let record = b"iello timestone commitlog"; - - // 2nd boot - { - let mut segment = Segment::new(&dir, base_offset).unwrap(); - // we usually get this from index - segment.set_next_offset(10); - let mut position = 0; - for i in 10..20 { - let (offset, pos) = segment.append(record).unwrap(); - position = pos; - assert_eq!(offset, i) - } - - let mut data = vec![0; len]; - segment.read(position, &mut data).unwrap(); - assert_eq!(&data, record); - - segment.close().unwrap(); + let mut offsets = Vec::with_capacity(20); + for i in 0..20 { + offsets.push([i * 1024, 1024]); } + let mut out = Vec::with_capacity(20); + segment.readv(offsets, &mut out).unwrap(); + for (i, byte) in out.into_iter().enumerate() { + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i as u8); + assert_eq!(byte[1023], i as u8); + } + + assert_eq!(segment.read(1024 * 20, 1024).unwrap(), None); + assert_eq!(segment.read(1024 * 21, 1024).unwrap(), None); } - /* #[test] - fn vectored_reads_works_as_expected() { - let dir = tempfile::tempdir().unwrap(); - let mut segment = Segment::new(&dir, 10, 10 * 1024 * 1024).unwrap(); - - // 100 1K appends - for i in 0..100 { - let record = vec![i; 1024]; - let (offset, _pos) = segment.append(&record).unwrap(); - assert_eq!(offset as u8, i) - } + fn open_and_read_segment() { + let dir = tempdir().unwrap(); - for i in 0..100 { - let data = segment.read(i).unwrap(); - assert_eq!(data[i as usize], i as u8); + let mut buf = BytesMut::new(); + for i in 0..20u8 { + buf.put(Bytes::from(vec![i; 1024])); + } + let segment = Segment::new(dir.path().join(&format!("{:020}", 1)), buf.freeze()).unwrap(); + assert_eq!(segment.size(), 20 * 1024); + + assert_eq!(segment.actual_size().unwrap(), 20 * 1024); + for i in 0..20u8 { + let byte = segment.read(i as u64 * 1024, 1024).unwrap().unwrap(); + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i); + assert_eq!(byte[1023], i); } - // read 90K. leaves 10 elements - let (count, _data) = segment.read_vectored(0, 90 * 1024).unwrap(); - assert_eq!(count, 90); + drop(segment); - // read remaining elements individually - for i in 90..100 { - let data = segment.read(i).unwrap(); - assert_eq!(data[i as usize], i as u8); + let segment = Segment::open(dir.path().join(&format!("{:020}", 1))).unwrap(); + let mut offsets = Vec::with_capacity(20); + for i in 0..20 { + offsets.push([i * 1024, 1024]); + } + let mut out = Vec::with_capacity(20); + segment.readv(offsets, &mut out).unwrap(); + for (i, byte) in out.into_iter().enumerate() { + assert_eq!(byte.len(), 1024); + assert_eq!(byte[0], i as u8); + assert_eq!(byte[1023], i as u8); } - segment.close().unwrap(); + assert_eq!(segment.read(1024 * 20, 1024).unwrap(), None); + assert_eq!(segment.read(1024 * 21, 1024).unwrap(), None); } - */ } diff --git a/src/lib.rs b/src/lib.rs index acb9b4f..305de5c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,660 @@ -#[macro_use] -extern crate log; +use std::{collections::VecDeque, io, path::PathBuf}; + +use bytes::Bytes; mod disk; -mod memory; +mod segment; +use disk::DiskHandler; +use segment::Segment; + +// TODO: write tests for cases when we read stuff that does not exist yet, and thus Ok(None) should +// be written. Write such tests for each level of abstraction. + +/// The log which can store commits in memory, and push them onto disk when needed, as well as read +/// from disk any valid segment. See [`Self::new`] for more information on how exactly log is +/// stored onto disk. +/// +/// ### Invariants +/// - The active segment should have index `tail`. +/// - The segments in memory should have contiguous indices, though this need not be the case for +/// segment stored on disk. +/// - The total size in bytes for each segment in memory should not increase beyond the +/// max_segment_size by more than the overflowing bytes of the last packet. +pub struct CommitLog { + /// The index at which segments of memory start. + head: u64, + /// The index at which the current active segment is, and also marks the last valid segment as + /// well as last segment in memory. + tail: u64, + /// Maximum size of any segment in memory. + max_segment_size: usize, + /// Maximum number of segments in memory, apart from the active segment. + max_segments: usize, + /// The active segment, to which incoming [`Bytes`] are appended to. Note that the bytes are + /// themselves not mutable. + active_segment: Segment, + /// Total size of active segment, used for enforcing the contraints. + segments: VecDeque, + /// Total size of segments in memory apart from active_segment, used for enforcing the + /// contraints. + segments_size: usize, + /// A set of opened file handles to all the segments stored onto the disk. This is optional. + disk_handler: Option, +} + +impl CommitLog { + /// Create a new `CommitLog` with given contraints. If `None` is passed in for `dir` argument, + /// there will be no logs on the disk, and when memory limit is reached the segment at + /// `self.head` will be removed. If a valid path is passed, the directory will be created if + /// does not exist, and the segment at `self.head` will be stored onto disk instead of simply + /// being deleted. + pub fn new( + max_segment_size: usize, + max_segments: usize, + dir: Option, + ) -> io::Result { + if max_segment_size < 1024 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!( + "minimum 'max_segment_size' should be 1KB, {} given", + max_segment_size, + ) + .as_str(), + )); + } + + if let Some(dir) = dir { + let (head, files) = DiskHandler::new(dir)?; + + return Ok(Self { + head, + tail: head, + max_segment_size, + max_segments, + active_segment: Segment::with_capacity(max_segment_size), + segments: VecDeque::with_capacity(max_segments as usize), + segments_size: 0, + disk_handler: Some(files), + }); + } + + Ok(Self { + head: 0, + tail: 0, + max_segment_size, + max_segments, + active_segment: Segment::with_capacity(max_segment_size), + segments: VecDeque::with_capacity(max_segments as usize), + segments_size: 0, + disk_handler: None, + }) + } + + #[inline] + pub fn next_offset(&self) -> (u64, u64) { + if self.active_segment.len() >= self.max_segment_size as u64 { + (self.tail + 1, 0) + } else { + (self.tail, self.active_segment.len()) + } + } + + /// Get the number of segment on the disk. + #[inline] + pub fn disk_len(&self) -> io::Result { + Ok(self + .disk_handler + .as_ref() + .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "disk log was never opened"))? + .len()) + } + + #[inline] + pub fn head_and_tail(&self) -> (u64, u64) { + ( + match &self.disk_handler { + Some(handler) => handler.head(), + None => self.head, + }, + self.tail, + ) + } + + /// Append a new [`Bytes`] to the active segment. + #[inline] + pub fn append(&mut self, bytes: Bytes) -> io::Result<(u64, u64)> { + self.apply_retention()?; + self.active_segment.push(bytes); + Ok((self.tail, self.active_segment.len() as u64)) + } + + /// Append a new [`Bytes`] to the active segment, with timestamp as given. + #[inline] + pub fn append_with_timestamp( + &mut self, + bytes: Bytes, + timestamp: u64, + ) -> io::Result<(u64, u64)> { + self.apply_retention()?; + self.active_segment.push_with_timestamp(bytes, timestamp); + Ok((self.tail, self.active_segment.len() as u64)) + } + + fn apply_retention(&mut self) -> io::Result<()> { + if self.active_segment.size() >= self.max_segment_size { + if self.segments.len() >= self.max_segments { + // TODO: unwrap might cause error if self.max_segments == 0 + let removed_segment = self.segments.pop_front().unwrap(); + self.segments_size -= removed_segment.size(); + + if let Some(files) = self.disk_handler.as_mut() { + files.insert(self.head, removed_segment.into_data())?; + } + + self.head += 1; + } + + // this replace is cheap as we only swap the 3 pointer that are held by Vec + let old_segment = std::mem::replace( + &mut self.active_segment, + Segment::with_capacity(self.max_segment_size), + ); + self.segments_size += old_segment.size(); + self.segments.push_back(old_segment); + self.tail += 1; + } + + Ok(()) + } + + /// Read a single [`Bytes`] from the logs. + pub fn read(&self, index: u64, offset: u64) -> io::Result> { + if index > self.tail { + return Ok(None); + } + + // in disk + if index < self.head { + if let Some(handler) = self.disk_handler.as_ref() { + return handler.read(index, offset); + } + + return Ok(None); + } + + // in memory segment + if index < self.tail { + let segment = &self.segments[(index - self.head) as usize]; + return Ok(segment.at(index)); + } + + // in active segment + Ok(self.active_segment.at(index)) + } + + pub fn read_with_timestamps( + &self, + index: u64, + offset: u64, + ) -> io::Result> { + if index > self.tail { + return Ok(None); + } + + // in disk + if index < self.head { + if let Some(handler) = self.disk_handler.as_ref() { + return handler.read_with_timestamps(index, offset); + } + + return Ok(None); + } + + // in memory segment + if index < self.tail { + let segment = &self.segments[(index - self.head) as usize]; + return Ok(segment.at_with_timestamp(index)); + } + + // in active segment + Ok(self.active_segment.at_with_timestamp(index)) + } + + /// Read vector of [`Bytes`] from the logs. Returns a tuple as follows: + /// + /// `(data, remaining_len, index, offset)` + /// + /// - `data` is the vector of `Bytes` which were read. + /// - `remaining_len` is the length left from the provided length which we were not able to + /// read. + /// - `index` is the next segment to start reading at. + /// - `offset` is the next offset within that segment to start reading at. + /// + /// Note that `index` and `offset` might currently not exist, and might be corresponding to + /// future data which will be put in log. + pub fn readv( + &self, + mut index: u64, + mut offset: u64, + len: u64, + ) -> io::Result, u64, u64, u64)>> { + if index > self.tail { + return Ok(None); + } + + let mut remaining_len = len; + let mut out = Vec::with_capacity(remaining_len as usize); + + if index < self.head { + if let Some(handler) = self.disk_handler.as_ref() { + let (new_len, next_index) = + match handler.readv(index, offset, remaining_len, &mut out)? { + Some(ret) => ret, + None => return Ok(None), + }; + + remaining_len = new_len; + // start reading from memory in next iteration if no segment left to read on + // disk + index = next_index.unwrap_or(self.head); + // start from beginning of next segment + offset = 0; + } else { + return Ok(None); + } + } + + if remaining_len == 0 { + return Ok(Some((out, remaining_len, index, offset))); + } + + if index < self.tail { + let segment = &self.segments[index as usize]; + remaining_len = match segment.readv(offset, remaining_len, &mut out) { + Some(rem_len) => rem_len, + None => return Ok(None), + }; + // read the next segment, or move onto the active segment + index += 1; + // start from beginning of next segment + offset = 0; + } + + if remaining_len == 0 { + return Ok(Some((out, remaining_len, index, offset))); + } + + remaining_len = match self.active_segment.readv(offset, remaining_len, &mut out) { + Some(rem_len) => rem_len, + None => return Ok(None), + }; + + Ok(Some((out, remaining_len, index, offset))) + } + + pub fn readv_with_timestamps( + &self, + mut index: u64, + mut offset: u64, + len: u64, + ) -> io::Result, u64, u64, u64)>> { + if index > self.tail { + return Ok(None); + } + + let mut remaining_len = len; + let mut out = Vec::with_capacity(remaining_len as usize); + + if index < self.head { + if let Some(handler) = self.disk_handler.as_ref() { + let (new_len, next_index) = + match handler.readv_with_timestamps(index, offset, remaining_len, &mut out)? { + Some(ret) => ret, + None => return Ok(None), + }; + + remaining_len = new_len; + // start reading from memory in next iteration if no segment left to read on + // disk + index = next_index.unwrap_or(self.head); + // start from beginning of next segment + offset = 0; + } else { + return Ok(None); + } + } + + if remaining_len == 0 { + return Ok(Some((out, remaining_len, index, offset))); + } + + if index < self.tail { + let segment = &self.segments[index as usize]; + remaining_len = match segment.readv_with_timestamps(offset, remaining_len, &mut out) { + Some(ret) => ret, + None => return Ok(None), + }; + // read the next segment, or move onto the active segment + index += 1; + // start from beginning of next segment + offset = 0; + } + + if remaining_len == 0 { + return Ok(Some((out, remaining_len, index, offset))); + } + + remaining_len = + match self.active_segment + .readv_with_timestamps(offset, remaining_len, &mut out) { + Some(ret) => ret, + None => return Ok(None) + }; + + Ok(Some((out, remaining_len, index, offset))) + } + + pub fn index_from_timestamp(&self, timestamp: u64) -> io::Result> { + // beyond even active segment + if self.active_segment.end_time() < timestamp { + return Ok(None); + } + + if self.active_segment.start_time() <= timestamp { + // found within active segment + return Ok(Some(( + self.tail, + self.active_segment.index_from_timestamp(timestamp), + ))); + } + + if self.segments.len() > 0 && self.segments.front().unwrap().start_time() <= timestamp { + for (i, segment) in self.segments.iter().enumerate() { + if segment.start_time() <= timestamp && timestamp <= segment.end_time() { + // found within segment in memory + return Ok(Some(( + i as u64 + self.head, + segment.index_from_timestamp(timestamp), + ))); + } + } + return Ok(None); + } + + let disk_handler = match self.disk_handler.as_ref() { + Some(disk_handler) => disk_handler, + None => return Ok(None), + }; + + if !disk_handler.is_timestamp_contained(timestamp) { + return Ok(None); + } + + disk_handler + .index_from_timestamp(timestamp) + .map(|ret| Some(ret)) + } + + pub fn read_from_timestamp(&self, timestamp: u64) -> io::Result> { + // beyond even active segment + if self.active_segment.end_time() < timestamp { + return Ok(None); + } + + if self.active_segment.start_time() <= timestamp { + // found within active segment + let idx = self.active_segment.index_from_timestamp(timestamp); + return Ok(self.active_segment.at_with_timestamp(idx)); + } + + if self.segments.len() > 0 && self.segments[0].start_time() >= timestamp { + for segment in self.segments.iter() { + if segment.start_time() <= timestamp && timestamp <= segment.end_time() { + // found within segment in memory + let idx = segment.index_from_timestamp(timestamp); + return Ok(segment.at_with_timestamp(idx)); + } + } + } + + let disk_handler = match self.disk_handler.as_ref() { + Some(disk_handler) => disk_handler, + None => return Ok(None), + }; + + if !disk_handler.is_timestamp_contained(timestamp) { + return Ok(None); + } + + let (segment_idx, offset) = disk_handler.index_from_timestamp(timestamp)?; + disk_handler.read_with_timestamps(segment_idx, offset) + } +} + +#[cfg(test)] +mod test { + use bytes::{Bytes, BytesMut}; + use mqttbytes::{ + v4::{read, ConnAck, ConnectReturnCode::Success, Packet, Publish, Subscribe}, + QoS, + }; + use pretty_assertions::assert_eq; + use tempfile::tempdir; + + use super::*; + + #[allow(dead_code)] + #[inline] + pub(crate) fn init_logging() { + use simplelog::{ + ColorChoice, CombinedLogger, Config, LevelFilter, TermLogger, TerminalMode, + }; + // Error ignored as will be called in multiple functions, and this causes error if called + // multiple times. + let _ = CombinedLogger::init(vec![TermLogger::new( + LevelFilter::Trace, + Config::default(), + TerminalMode::Mixed, + ColorChoice::Auto, + )]); + } + + // Total size of all packets = 197 bytes + #[rustfmt::skip] + #[inline] + pub(crate) fn random_packets() -> Vec { + vec![ + Packet::Publish (Publish::new ("broker1", QoS::AtMostOnce , "ayoad1" )), + Packet::Publish (Publish::new ("brker2" , QoS::AtMostOnce , "pyload2")), + Packet::Subscribe(Subscribe::new("broker1", QoS::AtMostOnce )), + Packet::Publish (Publish::new ("brr3" , QoS::AtMostOnce , "pyload3")), + Packet::Publish (Publish::new ("bruuuu4", QoS::AtMostOnce , "pyload4")), + Packet::ConnAck (ConnAck::new (Success , true )), + Packet::ConnAck (ConnAck::new (Success , true )), + Packet::Publish (Publish::new ("brrrr5" , QoS::AtMostOnce , "paylad5")), + Packet::ConnAck (ConnAck::new (Success , true )), + Packet::Publish (Publish::new ("bro44r6", QoS::AtMostOnce , "aylad6" )), + Packet::Subscribe(Subscribe::new("broker7", QoS::AtMostOnce )), + Packet::Publish (Publish::new ("broker7", QoS::AtMostOnce , "paylad7")), + Packet::Publish (Publish::new ("b8" , QoS::AtMostOnce , "payl8" )), + Packet::Subscribe(Subscribe::new("b8" , QoS::AtMostOnce )), + Packet::Subscribe(Subscribe::new("bro44r6", QoS::AtMostOnce )), + Packet::ConnAck (ConnAck::new (Success , true )), + ] + } + + pub(crate) fn random_packets_as_bytes() -> (Vec, usize) { + let ranpacks = random_packets(); + let mut bytes = Vec::with_capacity(ranpacks.len()); + let mut total_len = 0; + for packet in ranpacks.into_iter() { + let mut byte = BytesMut::default(); + match packet { + Packet::Publish(p) => { + p.write(&mut byte).unwrap(); + } + Packet::Subscribe(p) => { + p.write(&mut byte).unwrap(); + } + Packet::ConnAck(p) => { + p.write(&mut byte).unwrap(); + } + _ => panic!("unexpected packet type"), + } + total_len += byte.len(); + bytes.push(byte.freeze()); + } + (bytes, total_len) + } + + pub(crate) fn verify_bytes_as_random_packets(bytes: Vec, take: usize) { + let ranpacks = random_packets(); + for (ranpack, byte) in ranpacks.into_iter().zip(bytes.into_iter()).take(take) { + let readpack = read(&mut BytesMut::from(byte.as_ref()), byte.len()).unwrap(); + assert_eq!(readpack, ranpack); + } + } + + #[test] + fn active_segment() { + let (ranpack_bytes, len) = random_packets_as_bytes(); + let mut log = CommitLog::new(len * 10, 10, None).unwrap(); + + for _ in 0..5 { + for byte in ranpack_bytes.clone() { + log.append(byte).unwrap(); + } + } + + assert_eq!(log.active_segment.len() as usize, ranpack_bytes.len() * 5); + assert_eq!(log.active_segment.size() as usize, len * 5); + + for _ in 0..5 { + for byte in ranpack_bytes.clone() { + log.append(byte).unwrap(); + } + } + + assert_eq!(log.active_segment.len() as usize, ranpack_bytes.len() * 10); + assert_eq!(log.active_segment.size() as usize, len * 10); + } + + #[test] + fn memory_segment() { + let (ranpack_bytes, len) = random_packets_as_bytes(); + let mut log = CommitLog::new(len * 10, 10, None).unwrap(); + + for _ in 0..7 { + for byte in ranpack_bytes.clone() { + log.append(byte).unwrap(); + } + } + + assert_eq!(log.active_segment.len() as usize, ranpack_bytes.len() * 7); + assert_eq!(log.active_segment.size() as usize, len * 7); + + for _ in 0..70 { + for byte in ranpack_bytes.clone() { + log.append(byte).unwrap(); + } + } + + assert_eq!(log.active_segment.len() as usize, ranpack_bytes.len() * 7); + assert_eq!(log.active_segment.size() as usize, len * 7); + assert_eq!(log.segments[0].size() as usize, len * 10); + assert_eq!(log.segments[0].len() as usize, ranpack_bytes.len() * 10); + assert_eq!(log.segments.len(), 7); + } + + #[test] + fn disk_segment() { + let (ranpack_bytes, len) = random_packets_as_bytes(); + let dir = tempdir().unwrap(); + let mut log = CommitLog::new(len * 10, 5, Some(dir.path().into())).unwrap(); + + for _ in 0..5 { + for byte in ranpack_bytes.clone() { + log.append(byte).unwrap(); + } + } + + assert_eq!(log.active_segment.len() as usize, ranpack_bytes.len() * 5); + assert_eq!(log.active_segment.size() as usize, len * 5); + + for _ in 0..70 { + for byte in ranpack_bytes.clone() { + log.append(byte).unwrap(); + } + } + + assert_eq!(log.active_segment.size() as usize, 5 * len); + assert_eq!(log.active_segment.len() as usize, 5 * ranpack_bytes.len()); + assert_eq!(log.segments_size, len * 10 * 5); + assert_eq!(log.disk_handler.unwrap().len(), 2); + } + + #[test] + fn read_from_everywhere() { + let (ranpack_bytes, len) = random_packets_as_bytes(); + let dir = tempdir().unwrap(); + let mut log = CommitLog::new(len * 10, 5, Some(dir.path().into())).unwrap(); + + // 160 packets in active_segment, 800 packets in segment, 640 packets in disk + for _ in 0..100 { + for byte in ranpack_bytes.clone() { + log.append(byte).unwrap(); + } + } + + assert_eq!(log.active_segment.len() as usize, ranpack_bytes.len() * 10); + assert_eq!(log.segments.len(), 5); + assert_eq!(log.disk_handler.as_ref().unwrap().len(), 4); + assert_eq!(log.head_and_tail(), (0, 9)); + + let mut offset = 0; + let mut index = 0; + for _ in 0..100 { + let v = log.readv(index, offset, 16).unwrap().unwrap(); + index = v.1; + offset = v.2; + verify_bytes_as_random_packets(v.0, 16); + } + } + + #[test] + fn read_and_append_with_timestamps() { + let (ranpack_bytes, len) = random_packets_as_bytes(); + let dir = tempdir().unwrap(); + let mut log = CommitLog::new(len * 10, 5, Some(dir.path().into())).unwrap(); + + // 160 packets in active_segment, 800 packets in segment, 640 packets in disk = total of + // 1600 packes. + // timestamps = segment_id * 1000 + offset * 10; + for i in 0..100 { + for (j, byte) in ranpack_bytes.clone().into_iter().enumerate() { + log.append_with_timestamp(byte, i * 1000 + j as u64 * 10) + .unwrap(); + } + } + + assert_eq!(log.active_segment.len() as usize, ranpack_bytes.len() * 10); + assert_eq!(log.segments.len(), 5); + let disk_handler = log.disk_handler.as_ref().unwrap(); + assert_eq!(disk_handler.len(), 4); -pub use disk::DiskLog; -pub use memory::MemoryLog; + // the segment + for i in 0..10 { + for j in 0..10 { + for k in 0..ranpack_bytes.len() as u64 - 1 { + let idx = log + .index_from_timestamp(i * 10000 + j * 1000 + k * 10 + 5) + .unwrap() + .unwrap(); + assert_eq!(idx.0, i); + assert_eq!(idx.1, j * 16 + k + 1); + } + } + } + } +} diff --git a/src/memory/mod.rs b/src/memory/mod.rs deleted file mode 100644 index 8d7b16e..0000000 --- a/src/memory/mod.rs +++ /dev/null @@ -1,358 +0,0 @@ -mod segment; - -use fnv::FnvHashMap; -use segment::Segment; -use std::fmt::Debug; -use std::mem; - -/// Log is an inmemory commitlog (per topic) which splits data in segments. -/// It drops the oldest segment when retention policies are crossed. -/// Each segment is identified by base offset and a new segment is created -/// when ever current segment crosses disk limit -#[derive(Debug)] -pub struct MemoryLog { - /// Offset of the first segment - head_offset: u64, - /// Offset of the last segment - tail_offset: u64, - /// Maximum size of a segment - max_segment_size: usize, - /// Maximum number of segments - max_segments: usize, - /// Current active chunk to append - active_segment: Segment, - /// All the segments in a ringbuffer - segments: FnvHashMap>, -} - -impl MemoryLog { - /// Create a new log - pub fn new(max_segment_size: usize, max_segments: usize) -> MemoryLog { - if max_segment_size < 1024 { - panic!("size should be at least 1KB") - } - - MemoryLog { - head_offset: 0, - tail_offset: 0, - max_segment_size, - max_segments, - segments: FnvHashMap::default(), - active_segment: Segment::new(0), - } - } - - pub fn head_and_tail(&self) -> (u64, u64) { - (self.head_offset, self.tail_offset) - } - - /// Appends this record to the tail and returns the offset of this append. - /// When the current segment is full, this also create a new segment and - /// writes the record to it. - /// This function also handles retention by removing head segment - pub fn append(&mut self, size: usize, record: T) -> (u64, u64) { - let switch = self.apply_retention(); - let segment_id = self.tail_offset; - let offset = self.active_segment.append(record, size); - - // For debugging during flux. Will be removed later - if switch { - // println!("swch. segment = {}, next_offset = {}", segment_id, offset); - } - - (segment_id, offset) - } - - fn apply_retention(&mut self) -> bool { - if self.active_segment.size() >= self.max_segment_size { - let next_offset = self.active_segment.base_offset() + self.active_segment.len() as u64; - let last_active = mem::replace(&mut self.active_segment, Segment::new(next_offset)); - self.segments.insert(self.tail_offset, last_active); - self.tail_offset += 1; - - // if backlog + active segment count is greater than max segments, - // delete first segment and update head - if self.segments.len() + 1 > self.max_segments { - if self.segments.remove(&self.head_offset).is_some() { - self.head_offset += 1; - } - } - - return true; - } - - false - } - - pub fn next_offset(&self) -> (u64, u64) { - let segment_id = self.tail_offset; - let next_offset = self.active_segment.base_offset() + self.active_segment.len() as u64; - (segment_id, next_offset) - } - - /// Read a record from correct segment - pub fn read(&mut self, cursor: (u64, u64)) -> Option { - if cursor.0 == self.tail_offset { - return self.active_segment.read(cursor.1); - } - - match self.segments.get(&cursor.0) { - Some(segment) => segment.read(cursor.1), - None => None, - } - } - - /// Reads multiple packets from the disk and returns base offset and - /// offset of the next log. - /// When data of deleted segment is asked, returns data of the current head - /// **Note**: segment id is used to be able to pull directly from correct segment - /// **Note**: This method also returns full segment data when requested - /// data is not of active segment. Set your max_segment size keeping tail - /// latencies of all the concurrent connections mind - /// (some runtimes support internal preemption using await points) - pub fn readv(&mut self, mut cursor: (u64, u64), out: &mut Vec) -> Option<(u64, u64)> { - // TODO Fix usize to u64 conversions - // jump to head if the caller is trying to read deleted segment - if cursor.0 < self.head_offset { - warn!("Trying to read a deleted segment. Jumping"); - cursor = (self.head_offset, self.head_offset) - } - - loop { - // read from active segment if base offset matches active segment's base offset - if cursor.0 == self.tail_offset { - self.active_segment.readv(cursor.1, out); - // Return None if there is no data. Router will park the data request with - // this cursor to resume when there is more data - if out.is_empty() { - break None; - } - - cursor.1 += out.len() as u64; - break Some(cursor); - } - - // read from backlog segments - if let Some(segment) = self.segments.get(&cursor.0) { - segment.readv(cursor.1, out); - - if !out.is_empty() { - // We always read full segment. So we can always jump to next segment - cursor.0 += 1; - cursor.1 += out.len() as u64; - break (Some(cursor)); - } else { - // Jump to the next segment if the above readv return 0 element - // because of just being at the edge before next segment got - // added - // NOTE: This jump is necessary because, readv should always - // return data if there is data. Or else router registers this - // for notification even though there is data (which might - // cause a block) - cursor.0 += 1; - continue; - }; - } - } - } -} - -#[cfg(test)] -mod test { - use super::MemoryLog; - use pretty_assertions::assert_eq; - - #[test] - fn append_creates_and_deletes_segments_correctly() { - let mut log = MemoryLog::new(10 * 1024, 10); - - // 200 1K iterations. 10 1K records per file. 20 files ignoring deletes. - // segments: 0.segment (0 - 9), 1.segment (10 -19) .... 19.segment (190 - 200) - // considering deletes: 10.segment, 11.segment .. 19.segment - for i in 0..200 { - let payload = vec![i; 1024]; - log.append(payload.len(), payload); - } - - // Semi fill 200.segment. Deletes 100.segment - // considering deletes: 110.segment .. 190.segment - for i in 200..205 { - let payload = vec![i; 1024]; - log.append(payload.len(), payload); - } - - let data = log.read((9, 0)); - assert!(data.is_none()); - - // considering: 10.segment (100-109) .. 19.segment (190-199) - // read segment with base offset 11 - let segment_id = 11; - let base_offset = 110; - for i in 0..10 { - let data = log.read((segment_id, base_offset + i)).unwrap(); - let d = base_offset as u8 + i as u8; - assert_eq!(data[0], d); - } - - // read segment with base offset 190 (1 last segment before - // semi filled segment) - let segment_id = 19; - let base_offset = 190; - for i in 0..10 { - let data = log.read((segment_id, base_offset + i)).unwrap(); - let d = base_offset as u8 + i as u8; - assert_eq!(data[0], d); - } - - // read 200.segment which is semi filled with 5 records - let segment_id = 20; - let base_offset = 200; - for i in 0..5 { - let data = log.read((segment_id, base_offset + i)).unwrap(); - let d = base_offset as u8 + i as u8; - assert_eq!(data[0], d); - } - - let data = log.read((20, base_offset + 5)); - assert!(data.is_none()); - } - - #[test] - fn vectored_read_works_as_expected() { - let mut log = MemoryLog::new(10 * 1024, 10); - - // 90 1K iterations. 10 files - // 0.segment (data with 0 - 9), 1.segment (10 - 19) .... 8.segment (80 - 89) - // 10K per segment = 10 records per segment - for i in 0..90 { - let payload = vec![i; 1024]; - log.append(payload.len(), payload); - } - - let mut data = Vec::new(); - // Read a segment from start. This returns full segment - let next = log.readv((0, 0), &mut data); - assert_eq!(data.len(), 10); - assert_eq!(next, Some((1, 10))); - assert_eq!(data[0][0], 0); - assert_eq!(data[data.len() - 1][0], 9); - - // Read 5.segment - let data = log.read((5, 50)).unwrap(); - assert_eq!(data[0], 50); - - // Read a segment from the middle. This returns all the remaining elements - let mut data = Vec::new(); - let next = log.readv((1, 15), &mut data); - assert_eq!(data.len(), 5); - assert_eq!(next, Some((2, 20))); - assert_eq!(data[0][0], 15); - assert_eq!(data[data.len() - 1][0], 19); - - // Read a segment from scratch. gets full segment - let mut data = Vec::new(); - let next = log.readv((1, 10), &mut data); - assert_eq!(data.len(), 10); - assert_eq!(next, Some((2, 20))); - } - - #[test] - fn vectored_reads_from_active_segment_works_as_expected() { - let mut log = MemoryLog::new(10 * 1024, 10); - - // 200 1K iterations. 10 1K records per file. 20 files ignoring deletes. - // segments: 0.segment, 1.segment .... 19.segment - // considering deletes: 10.segment .. 19.segment - for i in 0..200 { - let payload = vec![i; 1024]; - log.append(payload.len(), payload); - } - - // Read active segment. Next shouldn't jump to next segment - let mut data = Vec::new(); - let next = log.readv((19, 190), &mut data); - assert_eq!(data.len(), 10); - assert_eq!(next, Some((19, 200))); - } - - #[test] - fn vectored_reads_from_active_segment_resumes_after_empty_reads_correctly() { - let mut log = MemoryLog::new(10 * 1024, 10); - - // 85 1K iterations. 10 files - // 0.segment (data with 0 - 9), 1.segment (10 - 19) .... 8.segment (80 - 84) - // 10 records per segment (1K each) - // 8.segment is semi filled - for i in 0..85 { - let payload = vec![i; 1024]; - log.append(payload.len(), payload); - } - - // read active segment - let mut data = Vec::new(); - let next = log.readv((8, 80), &mut data); - assert_eq!(data.len(), 5); - assert_eq!(next, Some((8, 85))); - - // fill active segment more - for i in 85..90 { - let payload = vec![i; 1024]; - log.append(payload.len(), payload); - } - - // read active segment - let mut data = Vec::new(); - let next = log.readv(next.unwrap(), &mut data); - assert_eq!(data.len(), 5); - assert_eq!(next, Some((8, 90))); - - let mut data = Vec::new(); - let next = log.readv(next.unwrap(), &mut data); - assert_eq!(data.len(), 0); - assert!(next.is_none()); - } - - #[test] - fn last_active_segment_read_jumps_to_next_segment_read_correctly() { - let mut log = MemoryLog::new(10 * 1024, 10); - - // 90 1K iterations. 9 files ignoring deletes. - // 0.segment (data with 0 - 9), 1.segment .... 8.segment (80 - 89) - // 10K per segment = 10 records per segment - for i in 0..90 { - let payload = vec![i; 1024]; - log.append(payload.len(), payload); - } - - // read active segment. there's no next segment. so active segment - // is not done yet - let mut data = Vec::new(); - let next = log.readv((8, 80), &mut data); - assert_eq!(data.len(), 10); - assert_eq!(next, Some((8, 90))); - - // append more which also changes active segment to 100.segment - for i in 90..110 { - let payload = vec![i; 1024]; - log.append(payload.len(), payload); - } - - // read from the next offset of previous active segment - let mut data = Vec::new(); - let next = log.readv(next.unwrap(), &mut data); - assert_eq!(data.len(), 10); - assert_eq!(next, Some((10, 100))); - - // read active segment again - let mut data = Vec::new(); - let next = log.readv(next.unwrap(), &mut data); - assert_eq!(data.len(), 10); - assert_eq!(next, Some((10, 110))); - - // read again when there is no more data - let mut data = Vec::new(); - let next = log.readv(next.unwrap(), &mut data); - assert_eq!(data.len(), 0); - assert!(next.is_none()); - } -} diff --git a/src/memory/segment.rs b/src/memory/segment.rs deleted file mode 100644 index 42c60a7..0000000 --- a/src/memory/segment.rs +++ /dev/null @@ -1,69 +0,0 @@ -use std::fmt::Debug; - -/// Segment of a disk. Writes go through a buffer writers to -/// reduce number of system calls. Reads are directly read from -/// the file as seek on buffer reader will dump the buffer anyway -/// Also multiple readers might be operating on a given segment -/// which makes the cursor movement very dynamic -#[derive(Debug)] -pub struct Segment { - base_offset: u64, - size: usize, - pub(crate) file: Vec, -} - -impl Segment { - pub fn new(base_offset: u64) -> Segment { - let file = Vec::with_capacity(10000); - - Segment { - base_offset, - file, - size: 0, - } - } - - pub fn base_offset(&self) -> u64 { - self.base_offset - } - - pub fn size(&self) -> usize { - self.size - } - - pub fn len(&self) -> usize { - self.file.len() - } - - /// Appends record to the file and return next offset - pub fn append(&mut self, record: T, len: usize) -> u64 { - self.file.push(record); - self.size += len; - - // return current offset after incrementing next offset - self.base_offset + self.file.len() as u64 - } - - /// Reads at an absolute offset - pub fn read(&self, offset: u64) -> Option { - if offset < self.base_offset { - return None; - } - - let offset = offset - self.base_offset; - match self.file.get(offset as usize) { - Some(record) => Some(record.clone()), - None => None, - } - } - - /// Reads multiple data from an offset to the end of segment - pub fn readv(&self, offset: u64, out: &mut Vec) { - if offset < self.base_offset { - return; - } - - let offset = offset - self.base_offset; - out.extend_from_slice(&self.file[offset as usize..]) - } -} diff --git a/src/segment.rs b/src/segment.rs new file mode 100644 index 0000000..cfc8473 --- /dev/null +++ b/src/segment.rs @@ -0,0 +1,173 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use bytes::Bytes; + +/// A struct for keeping Bytes in memory. +#[derive(Debug)] +pub(super) struct Segment { + data: Vec<(Bytes, u64)>, + size: usize, + start_time: u64, + end_time: u64, +} + +// TODO: verify that unwraps for system time are fine. +impl Segment { + /// Create a new segment with given capacity. + #[inline] + pub(super) fn with_capacity(capacity: usize) -> Self { + Self { + data: Vec::with_capacity(capacity), + size: 0, + start_time: 0, + end_time: 0, + } + } + + /// Create a new segment with given capacity, and the given `Bytes` and `timestamp` as the + /// first element. + #[allow(dead_code)] + #[inline] + pub(super) fn new(capacity: u64, byte: Bytes, timestamp: u64) -> Self { + let size = byte.len(); + let mut data = Vec::with_capacity(capacity as usize); + data.push((byte, timestamp)); + + Self { + data, + size, + start_time: timestamp, + end_time: timestamp, + } + } + + /// Push a new `Bytes` in the segment. + #[inline] + pub(super) fn push(&mut self, byte: Bytes) { + let now = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as u64; + + // ASSUMPTION: we don't do any inserts at 1970 according to system time. + if self.start_time == 0 { + self.start_time = now; + } + + self.end_time = now; + self.size += byte.len(); + self.data.push((byte, now)); + } + + /// Push a new element with the given timestamp. + #[allow(dead_code)] + #[inline] + pub(super) fn push_with_timestamp(&mut self, byte: Bytes, timestamp: u64) { + // ASSUMPTION: we don't do any inserts at 1970 according to system time. + if self.start_time == 0 { + self.start_time = timestamp; + } + + self.end_time = timestamp; + self.size += byte.len(); + self.data.push((byte, timestamp)); + } + + /// Get `Bytes` at the given index. + #[inline] + pub(super) fn at(&self, index: u64) -> Option { + if index > self.len() { + None + } else { + Some(self.data[index as usize].0.clone()) + } + } + + /// Get `Bytes` and the timestamp at the given index. + #[inline] + pub(super) fn at_with_timestamp(&self, index: u64) -> Option<(Bytes, u64)> { + if index > self.len() { + None + } else { + Some(self.data[index as usize].clone()) + } + } + + /// Retrieve the index which either matches the given timestamp or is the immediate next one. + #[inline] + pub(super) fn index_from_timestamp(&self, timestamp: u64) -> u64 { + match self.data.binary_search_by(|a| a.1.cmp(×tamp)) { + Ok(idx) => idx as u64, + Err(idx) => idx as u64, + } + } + + /// Get the number of `Bytes` in the segment. + #[inline] + pub(super) fn len(&self) -> u64 { + self.data.len() as u64 + } + + /// Get the total size in bytes of the segment. + #[inline] + pub(super) fn size(&self) -> usize { + self.size + } + + /// Convert the segment into `Vec`, consuming `self`. + #[inline] + pub(super) fn into_data(self) -> Vec<(Bytes, u64)> { + self.data + } + + /// Get the smallest timestamp of any packet in the segment. + #[inline] + pub(super) fn start_time(&self) -> u64 { + self.start_time + } + + /// Get the largest timestamp of any packet in the segment. + #[inline] + pub(super) fn end_time(&self) -> u64 { + self.end_time + } + + /// Read a range of data into `out`, doesn't add timestamp. + #[inline] + pub(super) fn readv(&self, index: u64, len: u64, out: &mut Vec) -> Option { + if index >= self.len() { + return None + } + + let mut limit = (index + len) as usize; + let mut left = 0; + if limit > self.data.len() { + left = limit - self.data.len(); + limit = self.data.len(); + } + out.extend(self.data[index as usize..limit].iter().map(|x| x.0.clone())); + Some(left as u64) + } + + /// Read a range of data into `out`, along with timestamp. + #[inline] + pub(super) fn readv_with_timestamps( + &self, + index: u64, + len: u64, + out: &mut Vec<(Bytes, u64)>, + ) -> Option { + if index >= self.len() { + return None + } + + let mut limit = (index + len) as usize; + let mut left = 0; + if limit > self.data.len() { + left = limit - self.data.len(); + limit = self.data.len(); + } + out.extend_from_slice(&self.data[index as usize..limit]); + Some(left as u64) + } +}