From 02b4d28637a210cfc78511da121780e3106aee6e Mon Sep 17 00:00:00 2001
From: Paul Dix
Date: Fri, 12 Jan 2024 11:52:28 -0500
Subject: [PATCH] feat: add basic wal implementation for Edge (#24570)

* feat: add basic wal implementation for Edge

This WAL implementation uses some of the code from the wal crate, but
departs significantly from it in many ways. For now it uses simple
JSON encoding for the serialized ops, but we may want to switch that
to Protobuf at some point in the future.

This version of the wal doesn't have its own buffering. That will be
implemented higher up in the BufferImpl, which will use the wal and
SegmentWriter to make data in the buffer durable.

The write flow will be that writes come into the buffer and are
validated against an in-memory Catalog, updating it as needed. Once
validated, writes get buffered up in memory and are then flushed into
the WAL periodically (likely every 10-20ms). After being flushed to
the wal, the entire batch of writes is put into the in-memory
queryable buffer. After that, responses are sent back to the clients.
This should reduce the write lock pressure on the in-memory buffer
considerably.

In this PR:
- Update the Wal, WalSegmentWriter, and WalSegmentReader traits to
  line up with the new design/understanding
- Implement wal (mainly just a way to identify segment files in a
  directory)
- Implement WalSegmentWriter (write header, op batch with crc, and
  track sequence number in segment, re-open existing file)
- Implement WalSegmentReader

* refactor: make Wal return impl reader/writer

* refactor: clean up wal segment open

* fix: WriteBuffer and Wal usage

Turn wal and write buffer references into a concrete type, rather
than dyn.

* fix: have wal loading ignore invalid files
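For illustration, the intended call pattern for the new WAL API looks
roughly like this (a sketch distilled from the tests in this PR; the
buffering layer that will drive it is not part of this change):

    // Create the WAL directory (fsync'd) if it doesn't already exist.
    let wal = WalImpl::new("/path/to/wal")?;

    // Open (or re-open) a segment and append a batch of ops; each call
    // returns the sequence number assigned to that batch.
    let mut writer = wal.open_segment_writer(SegmentId::new(0))?;
    let seq = writer.write_batch(vec![WalOp::LpWrite(LpWriteOp {
        db_name: "foo".to_string(),
        lp: "cpu host=a val=10i 10".to_string(),
        default_time: 1,
    })])?;

    // On restart, replay every batch in the segment in order.
    let mut reader = wal.open_segment_reader(SegmentId::new(0))?;
    while let Some(batch) = reader.next_batch()? {
        // hand batch.ops back to the in-memory buffer
    }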
---
 Cargo.lock                             |   5 +
 influxdb3/src/commands/serve.rs        |   9 +-
 influxdb3_server/src/lib.rs            |   2 +-
 influxdb3_server/src/query_executor.rs |  44 +-
 influxdb3_write/Cargo.toml             |   8 +
 influxdb3_write/src/lib.rs             |  59 ++-
 influxdb3_write/src/wal.rs             | 663 ++++++++++++++++++++++++-
 influxdb3_write/src/write_buffer.rs    |  16 +-
 8 files changed, 736 insertions(+), 70 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 4569ae8e58..e79a1a549e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2388,7 +2388,9 @@ version = "0.1.0"
 dependencies = [
  "arrow",
  "async-trait",
+ "byteorder",
  "chrono",
+ "crc32fast",
  "data_types",
  "datafusion",
  "influxdb-line-protocol",
@@ -2400,7 +2402,10 @@ dependencies = [
  "schema",
  "serde",
  "serde_json",
+ "snap",
+ "test_helpers",
  "thiserror",
+ "tokio",
  "workspace-hack",
 ]
diff --git a/influxdb3/src/commands/serve.rs b/influxdb3/src/commands/serve.rs
index 702dbf60f2..9dd3f48c8c 100644
--- a/influxdb3/src/commands/serve.rs
+++ b/influxdb3/src/commands/serve.rs
@@ -11,7 +11,6 @@ use influxdb3_server::{query_executor::QueryExecutorImpl, serve, CommonServerSta
 use influxdb3_write::persister::PersisterImpl;
 use influxdb3_write::wal::WalImpl;
 use influxdb3_write::write_buffer::WriteBufferImpl;
-use influxdb3_write::Wal;
 use iox_query::exec::{Executor, ExecutorConfig};
 use ioxd_common::reexport::trace_http::ctx::TraceHeaderParser;
 use object_store::DynObjectStore;
@@ -46,6 +45,9 @@ pub enum Error {
 
     #[error("Server error: {0}")]
     Server(#[from] influxdb3_server::Error),
+
+    #[error("Wal error: {0}")]
+    Wal(#[from] influxdb3_write::wal::Error),
 }
 
 pub type Result<T> = std::result::Result<T, Error>;
@@ -231,9 +233,10 @@ pub async fn command(config: Config) -> Result<()> {
         *config.http_bind_address,
     );
     let catalog = Arc::new(influxdb3_write::catalog::Catalog::new());
-    let wal: Option<Arc<dyn Wal>> = config
+    let wal: Option<Arc<WalImpl>> = config
         .wal_directory
-        .map(|dir| Arc::new(WalImpl::new(dir)) as _);
+        .map(|dir| WalImpl::new(dir).map(Arc::new))
+        .transpose()?;
     let write_buffer = Arc::new(WriteBufferImpl::new(Arc::clone(&catalog), wal));
     let query_executor = QueryExecutorImpl::new(
         catalog,
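A note on the serve.rs change above: mapping the now-fallible
`WalImpl::new` over the `Option` yields an `Option<Result<_, _>>`, and
`transpose` flips it into a `Result<Option<_>, _>` so the error can
propagate with `?`. The same pattern in miniature, with `str::parse`
standing in for the fallible constructor:

    // Sketch: Option<Result<Arc<u32>, _>> flips to Result<Option<Arc<u32>>, _>.
    fn maybe_parse(
        dir: Option<&str>,
    ) -> Result<Option<std::sync::Arc<u32>>, std::num::ParseIntError> {
        dir.map(|d| d.parse::<u32>().map(std::sync::Arc::new)) // Option<Result<..>>
            .transpose() // Result<Option<..>, _>
    }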
 }
diff --git a/influxdb3_server/src/lib.rs b/influxdb3_server/src/lib.rs
index b68dd4dfa9..2a71559cf8 100644
--- a/influxdb3_server/src/lib.rs
+++ b/influxdb3_server/src/lib.rs
@@ -199,7 +199,7 @@ mod tests {
 
         let write_buffer = Arc::new(influxdb3_write::write_buffer::WriteBufferImpl::new(
             Arc::clone(&catalog),
-            None,
+            None::<Arc<influxdb3_write::wal::WalImpl>>,
         ));
         let query_executor = crate::query_executor::QueryExecutorImpl::new(
             catalog,
diff --git a/influxdb3_server/src/query_executor.rs b/influxdb3_server/src/query_executor.rs
index ea04c53afc..1b3301924a 100644
--- a/influxdb3_server/src/query_executor.rs
+++ b/influxdb3_server/src/query_executor.rs
@@ -107,7 +107,7 @@ impl QueryExecutor for QueryExecutorImpl {
 // This implementation is for the Flight service
 #[async_trait]
 impl QueryNamespaceProvider for QueryExecutorImpl {
-    type Db = QueryDatabase;
+    type Db = QueryDatabase<WriteBufferImpl<WalImpl>>;
 
     async fn db(
         &self,
@@ -135,18 +135,18 @@
     }
 }
 
-#[derive(Debug, Clone)]
-pub struct QueryDatabase {
+#[derive(Debug)]
+pub struct QueryDatabase<B> {
     db_schema: Arc<DatabaseSchema>,
-    write_buffer: Arc<dyn WriteBuffer>,
+    write_buffer: Arc<B>,
     exec: Arc<Executor>,
     datafusion_config: Arc<HashMap<String, String>>,
 }
 
-impl QueryDatabase {
+impl<B: WriteBuffer> QueryDatabase<B> {
     pub fn new(
         db_schema: Arc<DatabaseSchema>,
-        write_buffer: Arc<dyn WriteBuffer>,
+        write_buffer: Arc<B>,
         exec: Arc<Executor>,
         datafusion_config: Arc<HashMap<String, String>>,
     ) -> Self {
@@ -160,7 +160,7 @@
 }
 
 #[async_trait]
-impl QueryNamespace for QueryDatabase {
+impl<B: WriteBuffer> QueryNamespace for QueryDatabase<B> {
     async fn chunks(
         &self,
         _table_name: &str,
@@ -189,10 +189,17 @@
     }
 
     fn new_query_context(&self, span_ctx: Option<SpanContext>) -> IOxSessionContext {
+        let qdb = Self::new(
+            Arc::clone(&self.db_schema),
+            Arc::clone(&self.write_buffer),
+            Arc::clone(&self.exec),
+            Arc::clone(&self.datafusion_config),
+        );
+
         let mut cfg = self
             .exec
             .new_execution_config(ExecutorType::Query)
-            .with_default_catalog(Arc::new(self.clone()))
+            .with_default_catalog(Arc::new(qdb))
             .with_span_context(span_ctx);
 
         for (k, v) in self.datafusion_config.as_ref() {
@@ -203,7 +210,7 @@
     }
 }
 
-impl CatalogProvider for QueryDatabase {
+impl<B: WriteBuffer> CatalogProvider for QueryDatabase<B> {
     fn as_any(&self) -> &dyn Any {
         self as &dyn Any
     }
@@ -215,15 +222,22 @@
 
     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
         info!("CatalogProvider schema {}", name);
+        let qdb = Self::new(
+            Arc::clone(&self.db_schema),
+            Arc::clone(&self.write_buffer),
+            Arc::clone(&self.exec),
+            Arc::clone(&self.datafusion_config),
+        );
+
         match name {
-            DEFAULT_SCHEMA => Some(Arc::new(self.clone())),
+            DEFAULT_SCHEMA => Some(Arc::new(qdb)),
             _ => None,
         }
     }
 }
 
 #[async_trait]
-impl SchemaProvider for QueryDatabase {
+impl<B: WriteBuffer> SchemaProvider for QueryDatabase<B> {
     fn as_any(&self) -> &dyn Any {
         self as &dyn Any
     }
@@ -253,14 +267,14 @@
 }
 
 #[derive(Debug)]
-pub struct QueryTable {
+pub struct QueryTable<B> {
     db_schema: Arc<DatabaseSchema>,
     name: Arc<str>,
     schema: Schema,
-    write_buffer: Arc<dyn WriteBuffer>,
+    write_buffer: Arc<B>,
 }
 
-impl QueryTable {
+impl<B: WriteBuffer> QueryTable<B> {
     fn chunks(
         &self,
         ctx: &SessionState,
@@ -279,7 +293,7 @@
 }
 
 #[async_trait]
-impl TableProvider for QueryTable {
+impl<B: WriteBuffer> TableProvider for QueryTable<B> {
     fn as_any(&self) -> &dyn Any {
         self as &dyn Any
     }
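The dropped `Clone` derive is what forces the two `qdb` constructions
above: a derived `Clone` on a generic struct requires every type
parameter to be `Clone`, and `B` is only bounded by `WriteBuffer`.
Rebuilding a fresh instance from the `Arc` fields sidesteps that bound
at the cost of a few reference-count bumps. A minimal model of the
situation (hypothetical names, not code from this PR):

    use std::sync::Arc;

    // A generic handle whose parameter is not Clone. #[derive(Clone)]
    // would demand `B: Clone`; cloning the Arc field directly works
    // for any B.
    struct Db<B> {
        write_buffer: Arc<B>,
    }

    impl<B> Db<B> {
        fn fresh_handle(&self) -> Self {
            Db {
                write_buffer: Arc::clone(&self.write_buffer),
            }
        }
    }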
diff --git a/influxdb3_write/Cargo.toml b/influxdb3_write/Cargo.toml
index ba66362ccf..803b10ef8f 100644
--- a/influxdb3_write/Cargo.toml
+++ b/influxdb3_write/Cargo.toml
@@ -18,9 +18,17 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
 arrow = { workspace = true }
 async-trait = "0.1"
+byteorder = "1.3.4"
 chrono = "0.4"
+crc32fast = "1.2.0"
 datafusion = { workspace = true }
 parking_lot = "0.11.1"
 thiserror = "1.0"
+tokio = { version = "1.35", features = ["macros", "fs", "io-util", "parking_lot", "rt-multi-thread", "sync", "time"] }
 serde = { version = "1.0.188", features = ["derive"] }
 serde_json = "1.0.107"
+snap = "1.0.0"
+
+[dev-dependencies]
+test_helpers = { path = "../test_helpers" }
+
diff --git a/influxdb3_write/src/lib.rs b/influxdb3_write/src/lib.rs
index e320aaa577..c7fd9f56b0 100644
--- a/influxdb3_write/src/lib.rs
+++ b/influxdb3_write/src/lib.rs
@@ -75,7 +75,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
     ) -> Result<Vec<Arc<dyn BufferSegment>>>;
 
     /// Returns the configured WAL, if there is one.
-    fn wal(&self) -> Option<Arc<dyn Wal>>;
+    fn wal(&self) -> Option<Arc<impl Wal>>;
 }
 
 /// A segment in the buffer that corresponds to a single WAL segment file. It contains a catalog with any updates
@@ -109,11 +109,40 @@ pub trait ChunkContainer: Debug + Send + Sync + 'static {
 
 /// The segment identifier, which will be monotonically increasing.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
-pub struct SegmentId(u64);
+pub struct SegmentId(u32);
+pub type SegmentIdBytes = [u8; 4];
+
+impl SegmentId {
+    pub fn new(id: u32) -> Self {
+        Self(id)
+    }
+
+    pub fn as_bytes(&self) -> SegmentIdBytes {
+        self.0.to_be_bytes()
+    }
+
+    pub fn from_bytes(bytes: SegmentIdBytes) -> Self {
+        Self(u32::from_be_bytes(bytes))
+    }
+
+    pub fn next(&self) -> Self {
+        Self(self.0 + 1)
+    }
+}
 
 /// The sequence number of a batch of WAL operations.
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
-pub struct SequenceNumber(u64);
+pub struct SequenceNumber(u32);
+
+impl SequenceNumber {
+    pub fn new(id: u32) -> Self {
+        Self(id)
+    }
+
+    pub fn next(&self) -> Self {
+        Self(self.0 + 1)
+    }
+}
 
 #[async_trait]
 pub trait Persister: Debug + Send + Sync + 'static {
@@ -136,21 +165,18 @@ pub trait Persister: Debug + Send + Sync + 'static {
     fn object_store(&self) -> Arc<dyn object_store::ObjectStore>;
 }
 
-#[async_trait]
 pub trait Wal: Debug + Send + Sync + 'static {
     /// Opens a writer to a segment, either creating a new file or appending to an existing file.
-    async fn open_segment_writer(&self, segment_id: SegmentId)
-        -> Result<Arc<dyn WalSegmentWriter>>;
+    fn open_segment_writer(&self, segment_id: SegmentId) -> wal::Result<impl WalSegmentWriter>;
 
     /// Opens a reader to a segment file.
-    async fn open_segment_reader(&self, segment_id: SegmentId)
-        -> Result<Arc<dyn WalSegmentReader>>;
+    fn open_segment_reader(&self, segment_id: SegmentId) -> wal::Result<impl WalSegmentReader>;
 
     /// Checks the WAL directory for any segment files and returns them.
-    async fn segment_files(&self) -> Result<Vec<SegmentFile>>;
+    fn segment_files(&self) -> wal::Result<Vec<SegmentFile>>;
 
-    /// Drops the WAL segment file from disk.
-    async fn drop_wal_segment(&self, segment_id: SegmentId) -> Result<()>;
+    /// Deletes the WAL segment file from disk.
+    fn delete_wal_segment(&self, segment_id: SegmentId) -> wal::Result<()>;
 }
 
 #[derive(Debug)]
@@ -161,22 +187,21 @@ pub struct SegmentFile {
     pub segment_id: SegmentId,
 }
 
-#[async_trait]
 pub trait WalSegmentWriter: Debug + Send + Sync + 'static {
     fn id(&self) -> SegmentId;
 
-    async fn write(&self, op: WalOp) -> Result<SequenceNumber>;
+    fn write_batch(&mut self, ops: Vec<WalOp>) -> wal::Result<SequenceNumber>;
 }
 
 pub trait WalSegmentReader: Debug + Send + Sync + 'static {
     fn id(&self) -> SegmentId;
 
-    fn next_batch(&mut self) -> Result<Option<WalBatch>>;
+    fn next_batch(&mut self) -> wal::Result<Option<WalOpBatch>>;
 }
 
 /// Individual WalOps get batched into the WAL asynchronously. The batch is then written to the segment file.
 #[derive(Debug, Serialize, Deserialize)]
-pub struct WalBatch {
+pub struct WalOpBatch {
     pub sequence_number: SequenceNumber,
     pub ops: Vec<WalOp>,
 }
@@ -185,14 +210,14 @@
 /// lands in object storage. Things in the WAL are buffered until they are persisted to object storage. The write
 /// is called an `LpWrite` because it is a write of line protocol and we intend to have a new write protocol for
 /// 3.0 that supports a different kind of schema.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
 pub enum WalOp {
     LpWrite(LpWriteOp),
 }
 
 /// A write of 1 or more lines of line protocol to a single database. The default time is set by the server at the
 /// time the write comes in.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
 pub struct LpWriteOp {
     pub db_name: String,
     pub lp: String,
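Since ops are encoded with serde_json for now (per the commit
message), a `WalOpBatch` is just plain JSON inside each segment block.
Roughly, given the derived serde impls (newtype structs serialize as
bare numbers, enums are externally tagged); the exact shape shown is
illustrative, not a format guarantee:

    let batch = WalOpBatch {
        sequence_number: SequenceNumber::new(1),
        ops: vec![WalOp::LpWrite(LpWriteOp {
            db_name: "foo".to_string(),
            lp: "cpu host=a val=10i 10".to_string(),
            default_time: 1,
        })],
    };
    let json = serde_json::to_string(&batch)?;
    // {"sequence_number":1,"ops":[{"LpWrite":{"db_name":"foo",
    //  "lp":"cpu host=a val=10i 10","default_time":1}}]}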
+type FileTypeIdentifier = [u8; 8];
+const FILE_TYPE_IDENTIFIER: &[u8] = b"idb3.001";
+
+/// File extension for segment files
+const SEGMENT_FILE_EXTENSION: &str = "wal";
+
+#[derive(Debug, Error)]
+pub enum Error {
+    #[error("io error: {source}")]
+    Io {
+        #[from]
+        source: io::Error,
+    },
+
+    #[error("error converting wal batch to json: {source}")]
+    Json {
+        #[from]
+        source: serde_json::Error,
+    },
+
+    #[error("converting u64 to u32: {source}")]
+    TryFromU64 {
+        #[from]
+        source: std::num::TryFromIntError,
+    },
+
+    #[error("invalid segment file {segment_id:?} at {path:?}: {reason}")]
+    InvalidSegmentFile {
+        segment_id: SegmentId,
+        path: PathBuf,
+        reason: String,
+    },
+
+    #[error("unable to read crc for segment {segment_id:?}")]
+    UnableToReadCrc { segment_id: SegmentId },
+
+    #[error("unable to read length for segment data {segment_id:?}")]
+    UnableToReadLength { segment_id: SegmentId },
+
+    #[error("length mismatch for segment {segment_id:?}: expected {expected}, got {actual}")]
+    LengthMismatch {
+        segment_id: SegmentId,
+        expected: u32,
+        actual: u32,
+    },
+
+    #[error("checksum mismatch for segment {segment_id:?}: expected {expected}, got {actual}")]
+    ChecksumMismatch {
+        segment_id: SegmentId,
+        expected: u32,
+        actual: u32,
+    },
+
+    #[error("invalid segment file name {0:?}")]
+    InvalidSegmentFileName(String),
+
+    #[error("unable to parse segment id from file: {source}")]
+    UnableToParseSegmentId {
+        file_name: String,
+        source: std::num::ParseIntError,
+    },
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
 
 #[derive(Debug)]
 pub struct WalImpl {
-    #[allow(dead_code)]
-    path: PathBuf,
+    root: PathBuf,
 }
 
 impl WalImpl {
-    pub fn new(path: impl Into<PathBuf>) -> Self {
-        Self { path: path.into() }
+    pub fn new(path: impl Into<PathBuf>) -> Result<Self> {
+        let root = path.into();
+        info!(wal_dir=?root, "Ensuring WAL directory exists");
+        std::fs::create_dir_all(&root)?;
+
+        // ensure the directory creation is actually fsync'd so that when we create files there
+        // we don't lose them (see: https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf)
+        File::open(&root)
+            .expect("should be able to open just-created directory")
+            .sync_all()
+            .expect("fsync failure");
+
+        Ok(Self { root })
+    }
+
+    fn open_segment_writer(&self, segment_id: SegmentId) -> Result<impl WalSegmentWriter> {
+        let writer = WalSegmentWriterImpl::new_or_open(self.root.clone(), segment_id)?;
+        Ok(writer)
+    }
+
+    fn open_segment_reader(&self, segment_id: SegmentId) -> Result<impl WalSegmentReader> {
+        let reader = WalSegmentReaderImpl::new(self.root.clone(), segment_id)?;
+        Ok(reader)
+    }
+
+    fn segment_files(&self) -> Result<Vec<SegmentFile>> {
+        let dir = std::fs::read_dir(&self.root)?;
+
+        let mut segment_files = Vec::new();
+
+        for child in dir.into_iter() {
+            let child = child?;
+            let meta = child.metadata()?;
+            if meta.is_file() {
+                let path = child.path();
+
+                if let Some(file_name) = path.file_stem() {
+                    match file_name.to_str() {
+                        Some(file_name) => {
+                            if let Ok(segment_id) = segment_id_from_file_name(file_name) {
+                                segment_files.push(SegmentFile { segment_id, path });
+                            } else {
+                                warn!(
+                                    file_name=?file_name,
+                                    "File in wal dir has an invalid file stem that doesn't parse to an integer segment id, ignoring"
+                                );
+                            }
+                        }
+                        None => {
+                            warn!(
+                                file_name=?file_name,
+                                "File in wal dir has an invalid file stem that isn't valid UTF-8, ignoring"
+                            );
+                        }
+                    }
+                } else {
+                    warn!(
+                        path=?path,
+                        "File in wal dir doesn't have a file stem, ignoring"
+                    );
+                }
+            }
+        }
+
+        segment_files.sort_by_key(|f| f.segment_id);
+
+        Ok(segment_files)
+    }
+
+    fn delete_wal_segment(&self, segment_id: SegmentId) -> Result<()> {
+        let path = build_segment_path(self.root.clone(), segment_id);
+        std::fs::remove_file(path)?;
+        Ok(())
+    }
+}
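Segment ids map to zero-padded, ten-digit file names (see
`build_segment_path` further down), so lexical order matches numeric
order, and `segment_files` above ignores anything whose stem doesn't
parse back to a `u32`, per the "ignore invalid files" fix. A quick
illustration with a hypothetical directory:

    // "{:010}" pads the u32 id; the extension is always "wal".
    let path = build_segment_path(PathBuf::from("/wal"), SegmentId::new(42));
    assert_eq!(path, PathBuf::from("/wal/0000000042.wal"));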
+impl Wal for WalImpl {
+    fn open_segment_writer(&self, segment_id: SegmentId) -> Result<impl WalSegmentWriter> {
+        self.open_segment_writer(segment_id)
+    }
+
+    fn open_segment_reader(&self, segment_id: SegmentId) -> Result<impl WalSegmentReader> {
+        self.open_segment_reader(segment_id)
+    }
+
+    fn segment_files(&self) -> Result<Vec<SegmentFile>> {
+        self.segment_files()
+    }
+
+    fn delete_wal_segment(&self, segment_id: SegmentId) -> Result<()> {
+        self.delete_wal_segment(segment_id)
+    }
+}
+
+#[derive(Debug)]
+pub struct WalSegmentWriterImpl {
+    segment_id: SegmentId,
+    f: File,
+    bytes_written: usize,
+    sequence_number: SequenceNumber,
+
+    buffer: Vec<u8>,
+}
+
+impl WalSegmentWriterImpl {
+    pub fn new_or_open(root: PathBuf, segment_id: SegmentId) -> Result<Self> {
+        let path = build_segment_path(root, segment_id);
+
+        // if there's already a file there, validate its header and pull the sequence number from the last entry
+        if path.exists() {
+            if let Some(file_info) =
+                WalSegmentReaderImpl::read_segment_file_info_if_exists(path.clone(), segment_id)?
+            {
+                let f = OpenOptions::new().write(true).append(true).open(&path)?;
+
+                return Ok(Self {
+                    segment_id,
+                    f,
+                    bytes_written: file_info
+                        .bytes_written
+                        .try_into()
+                        .expect("file length must fit in usize"),
+                    sequence_number: file_info.last_sequence_number,
+                    buffer: Vec::with_capacity(8 * 1024), // 8kiB initial size
+                });
+            } else {
+                return Err(Error::InvalidSegmentFile {
+                    segment_id,
+                    path,
+                    reason: "file exists but is invalid".to_string(),
+                });
+            }
+        }
+
+        // it's a new file, initialize it with the header and get ready to start writing
+        let mut f = OpenOptions::new().write(true).create(true).open(&path)?;
+
+        f.write_all(FILE_TYPE_IDENTIFIER)?;
+        let file_type_bytes_written = FILE_TYPE_IDENTIFIER.len();
+
+        let id_bytes = segment_id.as_bytes();
+        f.write_all(&id_bytes)?;
+        let id_bytes_written = id_bytes.len();
+
+        f.sync_all().expect("fsync failure");
+
+        let bytes_written = file_type_bytes_written + id_bytes_written;
+
+        Ok(Self {
+            segment_id,
+            f,
+            bytes_written,
+            sequence_number: SequenceNumber::new(0),
+            buffer: Vec::with_capacity(8 * 1024), // 8kiB initial size
+        })
+    }
+
+    fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
+        // Ensure the write buffer is always empty before using it.
+        self.buffer.clear();
+
+        self.sequence_number = self.sequence_number.next();
+
+        let batch = WalOpBatch {
+            sequence_number: self.sequence_number,
+            ops,
+        };
+
+        let data = serde_json::to_vec(&batch)?;
+
+        // Only designed to support chunks up to `u32::max` bytes long.
+        let uncompressed_len = data.len();
+        u32::try_from(uncompressed_len)?;
+
+        // The chunk header is two u32 values, so write a dummy u64 value and
+        // come back to fill them in later.
+        self.buffer
+            .write_u64::<BigEndian>(0)
+            .expect("cannot fail to write to buffer");
+
+        // Compress the payload into the reused buffer, recording the crc hash
+        // as it is written.
+        let mut encoder = snap::write::FrameEncoder::new(HasherWrapper::new(&mut self.buffer));
+        encoder.write_all(&data)?;
+        let (checksum, buf) = encoder
+            .into_inner()
+            .expect("cannot fail to flush to a Vec")
+            .finalize();
+
+        // Adjust the compressed length to take into account the u64 padding
+        // above.
+        let compressed_len = buf.len() - mem::size_of::<u64>();
+        let compressed_len = u32::try_from(compressed_len)?;
+
+        // Go back and write the chunk header values
+        let mut buf = Cursor::new(buf);
+        buf.set_position(0);
+
+        buf.write_u32::<BigEndian>(checksum)?;
+        buf.write_u32::<BigEndian>(compressed_len)?;
+
+        // Write the entire buffer to the file
+        let buf = buf.into_inner();
+        let bytes_written = buf.len();
+        self.f.write_all(buf)?;
+
+        // fsync the fd
+        self.f.sync_all().expect("fsync failure");
+
+        self.bytes_written += bytes_written;
+
+        Ok(self.sequence_number)
+    }
+}
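For reference, the byte layout this writer produces should look like
the following (derived from the code above, not a separate spec):

    // file header (written once by new_or_open):
    //   b"idb3.001"                      8 bytes   file type + version
    //   segment id                       4 bytes   big-endian u32
    // per write_batch entry:
    //   crc32 of the compressed payload  4 bytes   big-endian u32
    //   compressed payload length        4 bytes   big-endian u32
    //   snappy-compressed JSON WalOpBatch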
 
 #[async_trait]
-impl Wal for WalImpl {
-    async fn open_segment_writer(
-        &self,
-        _segment_id: SegmentId,
-    ) -> crate::Result<Arc<dyn WalSegmentWriter>> {
-        todo!()
+impl WalSegmentWriter for WalSegmentWriterImpl {
+    fn id(&self) -> SegmentId {
+        self.segment_id
     }
 
-    async fn open_segment_reader(
-        &self,
-        _segment_id: SegmentId,
-    ) -> crate::Result<Arc<dyn WalSegmentReader>> {
-        todo!()
-    }
-
-    async fn segment_files(&self) -> crate::Result<Vec<SegmentFile>> {
-        todo!()
-    }
-
-    async fn drop_wal_segment(&self, _segment_id: SegmentId) -> crate::Result<()> {
-        todo!()
+    fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
+        self.write_batch(ops)
     }
 }
+
+#[derive(Debug)]
+pub struct WalSegmentReaderImpl {
+    f: BufReader<File>,
+    segment_id: SegmentId,
+}
+
+impl WalSegmentReaderImpl {
+    pub fn new(root: impl Into<PathBuf>, segment_id: SegmentId) -> Result<Self> {
+        let path = build_segment_path(root, segment_id);
+        let f = BufReader::new(File::open(path.clone())?);
+
+        let mut reader = Self { f, segment_id };
+
+        let (file_type, id) = reader.read_header()?;
+
+        if file_type != FILE_TYPE_IDENTIFIER {
+            return Err(Error::InvalidSegmentFile {
+                segment_id,
+                path,
+                reason: format!(
+                    "expected file type identifier {:?}, got {:?}",
+                    FILE_TYPE_IDENTIFIER, file_type
+                ),
+            });
+        }
+
+        if id != segment_id.as_bytes() {
+            return Err(Error::InvalidSegmentFile {
+                segment_id,
+                path,
+                reason: format!(
+                    "expected segment id {:?} in file, got {:?}",
+                    segment_id.as_bytes(),
+                    id
+                ),
+            });
+        }
+
+        Ok(reader)
+    }
+
+    fn read_segment_file_info_if_exists(
+        path: PathBuf,
+        segment_id: SegmentId,
+    ) -> Result<Option<ExistingSegmentFileInfo>> {
+        let f = match File::open(path.clone()) {
+            Ok(f) => f,
+            Err(ref e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
+            Err(e) => return Err(e.into()),
+        };
+
+        let bytes_written = f.len().try_into()?;
+
+        let mut reader = Self {
+            f: BufReader::new(f),
+            segment_id,
+        };
+
+        let (file_type, _id) = reader.read_header()?;
+
+        if file_type != FILE_TYPE_IDENTIFIER {
+            return Err(Error::InvalidSegmentFile {
+                segment_id,
+                path,
+                reason: format!(
+                    "expected file type identifier {:?}, got {:?}",
+                    FILE_TYPE_IDENTIFIER, file_type
+                ),
+            });
+        }
+
+        let mut last_block = None;
+
+        while let Some(block) = reader.next_segment_block()? {
+            last_block = Some(block);
+        }
+
+        if let Some(block) = last_block {
+            let batch: WalOpBatch = serde_json::from_slice(&block)?;
+
+            Ok(Some(ExistingSegmentFileInfo {
+                last_sequence_number: batch.sequence_number,
+                bytes_written,
+            }))
+        } else {
+            Ok(None)
+        }
+    }
+
+    pub fn next_batch(&mut self) -> Result<Option<WalOpBatch>> {
+        if let Some(data) = self.next_segment_block()? {
+            let batch: WalOpBatch = serde_json::from_slice(&data)?;
+
+            Ok(Some(batch))
+        } else {
+            Ok(None)
+        }
+    }
+
+    fn next_segment_block(&mut self) -> Result<Option<Vec<u8>>> {
+        let expected_checksum = match self.f.read_u32::<BigEndian>() {
+            Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
+            other => other?,
+        };
+
+        let expected_len: u32 = self.f.read_u32::<BigEndian>()?;
+
+        let compressed_read = self.f.by_ref().take(expected_len.into());
+        let hashing_read = CrcReader::new(compressed_read);
+        let mut decompressing_read = FrameDecoder::new(hashing_read);
+
+        let mut data = Vec::with_capacity(100);
+        decompressing_read.read_to_end(&mut data)?;
+
+        let (actual_compressed_len, actual_checksum) = decompressing_read.into_inner().checksum();
+        let actual_compressed_len = u32::try_from(actual_compressed_len)
+            .expect("segment blocks are only designed to support chunks up to u32::max bytes long");
+
+        if expected_len != actual_compressed_len {
+            return Err(Error::LengthMismatch {
+                segment_id: self.segment_id,
+                expected: expected_len,
+                actual: actual_compressed_len,
+            });
+        }
+
+        if expected_checksum != actual_checksum {
+            return Err(Error::ChecksumMismatch {
+                segment_id: self.segment_id,
+                expected: expected_checksum,
+                actual: actual_checksum,
+            });
+        }
+
+        Ok(Some(data))
+    }
+
+    fn read_array<const N: usize>(&mut self) -> Result<[u8; N]> {
+        let mut data = [0u8; N];
+        self.f.read_exact(&mut data)?;
+        Ok(data)
+    }
+
+    fn read_header(&mut self) -> Result<(FileTypeIdentifier, SegmentIdBytes)> {
+        Ok((self.read_array()?, self.read_array()?))
+    }
+}
+
+struct ExistingSegmentFileInfo {
+    last_sequence_number: SequenceNumber,
+    bytes_written: u32,
+}
+
+impl WalSegmentReader for WalSegmentReaderImpl {
+    fn id(&self) -> SegmentId {
+        self.segment_id
+    }
+
+    fn next_batch(&mut self) -> Result<Option<WalOpBatch>> {
+        self.next_batch()
+    }
+}
+
+struct CrcReader<R> {
+    inner: R,
+    hasher: Hasher,
+    bytes_seen: u64,
+}
+
+impl<R> CrcReader<R> {
+    fn new(inner: R) -> Self {
+        let hasher = Hasher::default();
+        Self {
+            inner,
+            hasher,
+            bytes_seen: 0,
+        }
+    }
+
+    fn checksum(self) -> (u64, u32) {
+        (self.bytes_seen, self.hasher.finalize())
+    }
+}
+
+impl<R> Read for CrcReader<R>
+where
+    R: Read,
+{
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        let len = self.inner.read(buf)?;
+        let len_u64 =
+            u64::try_from(len).expect("Only designed to run on 32-bit systems or higher");
+
+        self.bytes_seen += len_u64;
+        self.hasher.update(&buf[..len]);
+        Ok(len)
+    }
+}
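Putting the reader pieces together, startup replay of a segment might
look roughly like this; `Ok(None)` is a clean end of segment, while a
torn or corrupt trailing entry surfaces as a `LengthMismatch` or
`ChecksumMismatch` error (`apply` is a hypothetical callback into the
in-memory buffer, not part of this PR):

    fn replay(dir: std::path::PathBuf, apply: impl Fn(WalOpBatch)) -> Result<()> {
        let mut reader = WalSegmentReaderImpl::new(dir, SegmentId::new(0))?;
        // Drain every batch in sequence-number order.
        while let Some(batch) = reader.next_batch()? {
            apply(batch);
        }
        Ok(())
    }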
+
+/// A [`HasherWrapper`] acts as a [`Write`] decorator, recording the crc
+/// checksum of the data written to the inner [`Write`] implementation.
+struct HasherWrapper<W> {
+    inner: W,
+    hasher: Hasher,
+}
+
+impl<W> HasherWrapper<W> {
+    fn new(inner: W) -> Self {
+        Self {
+            inner,
+            hasher: Hasher::default(),
+        }
+    }
+
+    fn finalize(self) -> (u32, W) {
+        (self.hasher.finalize(), self.inner)
+    }
+}
+
+impl<W> Write for HasherWrapper<W>
+where
+    W: Write,
+{
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.hasher.update(buf);
+        self.inner.write(buf)
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.inner.flush()
+    }
+}
+
+fn build_segment_path(dir: impl Into<PathBuf>, id: SegmentId) -> PathBuf {
+    let mut path = dir.into();
+    path.push(format!("{:010}", id.0));
+    path.set_extension(SEGMENT_FILE_EXTENSION);
+    path
+}
+
+fn segment_id_from_file_name(name: &str) -> Result<SegmentId> {
+    let id = name
+        .parse::<u32>()
+        .map_err(|source| Error::UnableToParseSegmentId {
+            file_name: name.to_string(),
+            source,
+        })?;
+    Ok(SegmentId::new(id))
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::LpWriteOp;
+
+    #[test]
+    fn segment_writer_reader() {
+        let dir = test_helpers::tmp_dir().unwrap().into_path();
+
+        let mut writer =
+            WalSegmentWriterImpl::new_or_open(dir.clone(), SegmentId::new(0)).unwrap();
+        let wal_op = WalOp::LpWrite(LpWriteOp {
+            db_name: "foo".to_string(),
+            lp: "cpu host=a val=10i 10".to_string(),
+            default_time: 1,
+        });
+        writer.write_batch(vec![wal_op.clone()]).unwrap();
+
+        let mut reader = WalSegmentReaderImpl::new(dir, SegmentId::new(0)).unwrap();
+        let batch = reader.next_batch().unwrap().unwrap();
+
+        assert_eq!(batch.ops, vec![wal_op]);
+        assert_eq!(batch.sequence_number, SequenceNumber::new(1));
+    }
+
+    #[test]
+    fn segment_writer_can_open_previously_existing_segment() {
+        let dir = test_helpers::tmp_dir().unwrap().into_path();
+        let wal_op = WalOp::LpWrite(LpWriteOp {
+            db_name: "foo".to_string(),
+            lp: "cpu host=a val=10i 10".to_string(),
+            default_time: 1,
+        });
+
+        // open the file, write and close it
+        {
+            let mut writer =
+                WalSegmentWriterImpl::new_or_open(dir.clone(), SegmentId::new(0)).unwrap();
+            writer.write_batch(vec![wal_op.clone()]).unwrap();
+        }
+
+        // open it again, send a new write in and close it
+        {
+            let mut writer =
+                WalSegmentWriterImpl::new_or_open(dir.clone(), SegmentId::new(0)).unwrap();
+            writer.write_batch(vec![wal_op.clone()]).unwrap();
+        }
+
+        // ensure that we have two WalOpBatch in the file with the correct sequence numbers
+        let mut reader = WalSegmentReaderImpl::new(dir, SegmentId::new(0)).unwrap();
+        let batch = reader.next_batch().unwrap().unwrap();
+
+        assert_eq!(batch.ops, vec![wal_op.clone()]);
+        assert_eq!(batch.sequence_number, SequenceNumber::new(1));
+
+        let batch = reader.next_batch().unwrap().unwrap();
+
+        assert_eq!(batch.ops, vec![wal_op]);
+        assert_eq!(batch.sequence_number, SequenceNumber::new(2));
+    }
+
+    #[test]
+    fn wal_can_open_write_and_read_segments() {
+        let dir = test_helpers::tmp_dir().unwrap().into_path();
+        let wal_op = WalOp::LpWrite(LpWriteOp {
+            db_name: "foo".to_string(),
+            lp: "cpu host=a val=10i 10".to_string(),
+            default_time: 1,
+        });
+
+        let wal = WalImpl::new(dir.clone()).unwrap();
+        let mut writer = wal.open_segment_writer(SegmentId::new(0)).unwrap();
+        writer.write_batch(vec![wal_op.clone()]).unwrap();
+
+        let mut writer2 = wal.open_segment_writer(SegmentId::new(1)).unwrap();
+        writer2.write_batch(vec![wal_op.clone()]).unwrap();
+
+        let segments = wal.segment_files().unwrap();
+        assert_eq!(segments.len(), 2);
+        assert_eq!(segments[0].segment_id, SegmentId::new(0));
+        assert_eq!(segments[1].segment_id, SegmentId::new(1));
+
+        let mut reader = wal.open_segment_reader(SegmentId::new(0)).unwrap();
+        let batch = reader.next_batch().unwrap().unwrap();
+        assert_eq!(batch.ops, vec![wal_op.clone()]);
+        assert_eq!(batch.sequence_number, SequenceNumber::new(1));
+    }
+}
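One consequence of making the write buffer generic over its WAL (next
diff): when no WAL is configured, the type parameter can no longer be
inferred, hence the `None::<Arc<WalImpl>>` turbofish in the server
test earlier. A sketch, assuming a `catalog` and `wal` already built
as in serve.rs:

    // W is inferred as WalImpl from the Some(...) argument here,
    let with_wal = WriteBufferImpl::new(Arc::clone(&catalog), Some(Arc::new(wal)));
    // but a bare None carries no type, so it must be spelled out.
    let without_wal = WriteBufferImpl::new(Arc::clone(&catalog), None::<Arc<WalImpl>>);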
diff --git a/influxdb3_write/src/write_buffer.rs b/influxdb3_write/src/write_buffer.rs
index 8e6eec95cf..238589a899 100644
--- a/influxdb3_write/src/write_buffer.rs
+++ b/influxdb3_write/src/write_buffer.rs
@@ -60,15 +60,15 @@ pub struct WriteRequest<'a> {
 }
 
 #[derive(Debug)]
-pub struct WriteBufferImpl {
+pub struct WriteBufferImpl<W> {
     catalog: Arc<Catalog>,
     buffered_data: RwLock<HashMap<String, DatabaseBuffer>>,
     #[allow(dead_code)]
-    wal: Option<Arc<dyn Wal>>,
+    wal: Option<Arc<W>>,
 }
 
-impl WriteBufferImpl {
-    pub fn new(catalog: Arc<Catalog>, wal: Option<Arc<dyn Wal>>) -> Self {
+impl<W: Wal> WriteBufferImpl<W> {
+    pub fn new(catalog: Arc<Catalog>, wal: Option<Arc<W>>) -> Self {
         Self {
             catalog,
             buffered_data: RwLock::new(HashMap::new()),
@@ -179,7 +179,7 @@
 }
 
 #[async_trait]
-impl Bufferer for WriteBufferImpl {
+impl<W: Wal> Bufferer for WriteBufferImpl<W> {
     async fn write_lp(
         &self,
         database: NamespaceName<'static>,
@@ -201,12 +201,12 @@
         todo!()
     }
 
-    fn wal(&self) -> Option<Arc<dyn Wal>> {
+    fn wal(&self) -> Option<Arc<impl Wal>> {
         self.wal.clone()
     }
 }
 
-impl ChunkContainer for WriteBufferImpl {
+impl<W: Wal> ChunkContainer for WriteBufferImpl<W> {
     fn get_table_chunks(
         &self,
         database_name: &str,
@@ -219,7 +219,7 @@
     }
 }
 
-impl WriteBuffer for WriteBufferImpl {}
+impl<W: Wal> WriteBuffer for WriteBufferImpl<W> {}
 
 #[derive(Debug, Default)]
 struct DatabaseBuffer {