feat: add basic wal implementation for Edge (#24570)

* feat: add basic wal implementation for Edge

This WAL implementation uses some of the code from the wal crate, but departs from it significantly in many ways. For now it uses simple JSON encoding for the serialized ops, but we may want to switch that to Protobuf at some point in the future. This version of the WAL doesn't do its own buffering; that will be implemented higher up in the WriteBufferImpl, which will use the WAL and WalSegmentWriter to make data in the buffer durable.
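
For illustration, serde's default externally tagged enum representation means a batch with a single line protocol write serializes to plain JSON. A sketch, assuming the WalOpBatch, WalOp, and LpWriteOp types added in influxdb3_write below are in scope:

    use influxdb3_write::{LpWriteOp, SequenceNumber, WalOp, WalOpBatch};

    fn main() {
        // One batch containing a single line protocol write.
        let batch = WalOpBatch {
            sequence_number: SequenceNumber::new(1),
            ops: vec![WalOp::LpWrite(LpWriteOp {
                db_name: "foo".to_string(),
                lp: "cpu host=a val=10i 10".to_string(),
                default_time: 1,
            })],
        };
        // Newtype wrappers like SequenceNumber serialize as their inner value:
        // {"sequence_number":1,"ops":[{"LpWrite":{"db_name":"foo",
        //  "lp":"cpu host=a val=10i 10","default_time":1}}]}
        println!("{}", serde_json::to_string(&batch).unwrap());
    }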

The write flow will be: writes come into the buffer and are validated against (and update) an in-memory Catalog. Once validated, writes are buffered in memory and then flushed to the WAL periodically (likely every 10-20ms). After a flush, the entire batch of writes is put into the in-memory queryable buffer, and only then are responses sent back to the clients. This should considerably reduce write-lock pressure on the in-memory buffer.
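
None of that batching machinery is in this PR yet, but the intended shape is roughly the following (a hypothetical sketch against the traits added here; PendingWrite, the channels, and the 10ms tick are illustrative, not part of this change):

    use influxdb3_write::{WalOp, WalSegmentWriter};
    use tokio::sync::{mpsc, oneshot};
    use tokio::time::{interval, Duration};

    // A queued write plus a channel for acknowledging the client once durable.
    struct PendingWrite {
        op: WalOp,
        done: oneshot::Sender<()>,
    }

    async fn flush_loop(
        mut segment: impl WalSegmentWriter,
        mut queue: mpsc::Receiver<PendingWrite>,
    ) {
        let mut ticker = interval(Duration::from_millis(10));
        let mut pending: Vec<PendingWrite> = Vec::new();
        loop {
            tokio::select! {
                Some(write) = queue.recv() => pending.push(write),
                _ = ticker.tick() => {
                    if pending.is_empty() {
                        continue;
                    }
                    // One fsync'd segment write makes the whole batch durable...
                    let ops = pending.iter().map(|w| w.op.clone()).collect();
                    segment.write_batch(ops).expect("wal write failed");
                    // ...then the batch would move into the queryable buffer
                    // (not shown), and only afterwards are the clients told
                    // their writes succeeded.
                    for write in pending.drain(..) {
                        let _ = write.done.send(());
                    }
                }
            }
        }
    }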

In this PR:
- Update the Wal, WalSegmentWriter, and WalSegmentReader traits to line up with the new design/understanding
- Implement WalImpl (mainly just a way to identify segment files in a directory)
- Implement WalSegmentWriter (write the file header, write op batches with a CRC, track the sequence number within the segment, and re-open existing files); the on-disk layout is sketched after this list
- Implement WalSegmentReader
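
For reference, the on-disk segment layout implemented below in wal.rs is: an 8-byte file type identifier (idb3.001), a 4-byte big-endian segment id, then zero or more entries, each framed as a big-endian u32 crc32 over the compressed payload, a big-endian u32 payload length, and the payload itself: a Snappy-frame-compressed, JSON-encoded WalOpBatch. A minimal standalone decoder sketch (read_entry is illustrative, not the crate's API):

    use byteorder::{BigEndian, ReadBytesExt};
    use std::io::{self, Read};

    // Read one framed entry: [crc: u32 BE][len: u32 BE][len compressed bytes].
    // Returns Ok(None) on a clean end-of-file.
    fn read_entry<R: Read>(mut r: R) -> io::Result<Option<Vec<u8>>> {
        let expected_crc = match r.read_u32::<BigEndian>() {
            Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
            other => other?,
        };
        let len = r.read_u32::<BigEndian>()?;
        let mut compressed = vec![0u8; len as usize];
        r.read_exact(&mut compressed)?;

        // The checksum covers the compressed bytes, so verify before decoding.
        let mut hasher = crc32fast::Hasher::new();
        hasher.update(&compressed);
        if hasher.finalize() != expected_crc {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "crc mismatch"));
        }

        // The payload is a Snappy frame; decompressing yields JSON that decodes
        // with serde_json::from_slice::<WalOpBatch>(..).
        let mut json = Vec::new();
        snap::read::FrameDecoder::new(&compressed[..]).read_to_end(&mut json)?;
        Ok(Some(json))
    }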

* refactor: make Wal return impl reader/writer

* refactor: clean up wal segment open

* fix: WriteBuffer and Wal usage

Turn the wal and write buffer references into concrete generic types rather than dyn trait objects.
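
The reason dyn no longer works: the reworked Wal trait methods return impl WalSegmentWriter / impl WalSegmentReader, and a trait with impl-Trait-returning methods (stable since Rust 1.75) is not object safe, so Arc<dyn Wal> stops compiling. A minimal illustration with simplified names:

    trait Writer {
        fn write(&mut self, bytes: &[u8]);
    }

    trait Wal {
        // Returning impl Trait requires the caller to know the concrete type,
        // which is exactly what a dyn object erases, so this trait cannot be
        // made into a trait object.
        fn open_segment_writer(&self) -> impl Writer;
    }

    // error[E0038]: the trait `Wal` cannot be made into an object
    // fn take(wal: std::sync::Arc<dyn Wal>) {}

    // Consumers instead stay generic over a concrete implementation:
    struct WriteBuffer<W: Wal> {
        wal: W,
    }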

* fix: have wal loading ignore invalid files
Paul Dix 2024-01-12 11:52:28 -05:00 committed by GitHub
parent 028a05fbde
commit 02b4d28637
8 changed files with 736 additions and 70 deletions

Cargo.lock (generated)

@@ -2388,7 +2388,9 @@ version = "0.1.0"
dependencies = [
"arrow",
"async-trait",
"byteorder",
"chrono",
"crc32fast",
"data_types",
"datafusion",
"influxdb-line-protocol",
@@ -2400,7 +2402,10 @@ dependencies = [
"schema",
"serde",
"serde_json",
"snap",
"test_helpers",
"thiserror",
"tokio",
"workspace-hack",
]


@@ -11,7 +11,6 @@ use influxdb3_server::{query_executor::QueryExecutorImpl, serve, CommonServerSta
use influxdb3_write::persister::PersisterImpl;
use influxdb3_write::wal::WalImpl;
use influxdb3_write::write_buffer::WriteBufferImpl;
use influxdb3_write::Wal;
use iox_query::exec::{Executor, ExecutorConfig};
use ioxd_common::reexport::trace_http::ctx::TraceHeaderParser;
use object_store::DynObjectStore;
@@ -46,6 +45,9 @@ pub enum Error {
#[error("Server error: {0}")]
Server(#[from] influxdb3_server::Error),
#[error("Wal error: {0}")]
Wal(#[from] influxdb3_write::wal::Error),
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -231,9 +233,10 @@ pub async fn command(config: Config) -> Result<()> {
*config.http_bind_address,
);
let catalog = Arc::new(influxdb3_write::catalog::Catalog::new());
let wal: Option<Arc<dyn Wal>> = config
let wal: Option<Arc<WalImpl>> = config
.wal_directory
.map(|dir| Arc::new(WalImpl::new(dir)) as _);
.map(|dir| WalImpl::new(dir).map(Arc::new))
.transpose()?;
let write_buffer = Arc::new(WriteBufferImpl::new(Arc::clone(&catalog), wal));
let query_executor = QueryExecutorImpl::new(
catalog,


@@ -199,7 +199,7 @@ mod tests {
let write_buffer = Arc::new(influxdb3_write::write_buffer::WriteBufferImpl::new(
Arc::clone(&catalog),
None,
None::<Arc<influxdb3_write::wal::WalImpl>>,
));
let query_executor = crate::query_executor::QueryExecutorImpl::new(
catalog,


@@ -107,7 +107,7 @@ impl<W: WriteBuffer> QueryExecutor for QueryExecutorImpl<W> {
// This implementation is for the Flight service
#[async_trait]
impl<W: WriteBuffer> QueryNamespaceProvider for QueryExecutorImpl<W> {
type Db = QueryDatabase;
type Db = QueryDatabase<W>;
async fn db(
&self,
@@ -135,18 +135,18 @@ impl<W: WriteBuffer> QueryNamespaceProvider for QueryExecutorImpl<W> {
}
}
#[derive(Debug, Clone)]
pub struct QueryDatabase {
#[derive(Debug)]
pub struct QueryDatabase<B> {
db_schema: Arc<DatabaseSchema>,
write_buffer: Arc<dyn WriteBuffer>,
write_buffer: Arc<B>,
exec: Arc<Executor>,
datafusion_config: Arc<HashMap<String, String>>,
}
impl QueryDatabase {
impl<B: WriteBuffer> QueryDatabase<B> {
pub fn new(
db_schema: Arc<DatabaseSchema>,
write_buffer: Arc<dyn WriteBuffer>,
write_buffer: Arc<B>,
exec: Arc<Executor>,
datafusion_config: Arc<HashMap<String, String>>,
) -> Self {
@@ -160,7 +160,7 @@ impl QueryDatabase {
}
#[async_trait]
impl QueryNamespace for QueryDatabase {
impl<B: WriteBuffer> QueryNamespace for QueryDatabase<B> {
async fn chunks(
&self,
_table_name: &str,
@@ -189,10 +189,17 @@ impl QueryNamespace for QueryDatabase {
}
fn new_query_context(&self, span_ctx: Option<SpanContext>) -> IOxSessionContext {
let qdb = Self::new(
Arc::clone(&self.db_schema),
Arc::clone(&self.write_buffer),
Arc::clone(&self.exec),
Arc::clone(&self.datafusion_config),
);
let mut cfg = self
.exec
.new_execution_config(ExecutorType::Query)
.with_default_catalog(Arc::new(self.clone()))
.with_default_catalog(Arc::new(qdb))
.with_span_context(span_ctx);
for (k, v) in self.datafusion_config.as_ref() {
@@ -203,7 +210,7 @@ impl QueryNamespace for QueryDatabase {
}
}
impl CatalogProvider for QueryDatabase {
impl<B: WriteBuffer> CatalogProvider for QueryDatabase<B> {
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
@@ -215,15 +222,22 @@ impl CatalogProvider for QueryDatabase {
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
info!("CatalogProvider schema {}", name);
let qdb = Self::new(
Arc::clone(&self.db_schema),
Arc::clone(&self.write_buffer),
Arc::clone(&self.exec),
Arc::clone(&self.datafusion_config),
);
match name {
DEFAULT_SCHEMA => Some(Arc::new(self.clone())),
DEFAULT_SCHEMA => Some(Arc::new(qdb)),
_ => None,
}
}
}
#[async_trait]
impl SchemaProvider for QueryDatabase {
impl<B: WriteBuffer> SchemaProvider for QueryDatabase<B> {
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}
@@ -253,14 +267,14 @@ impl SchemaProvider for QueryDatabase {
}
#[derive(Debug)]
pub struct QueryTable {
pub struct QueryTable<B> {
db_schema: Arc<DatabaseSchema>,
name: Arc<str>,
schema: Schema,
write_buffer: Arc<dyn WriteBuffer>,
write_buffer: Arc<B>,
}
impl QueryTable {
impl<B: WriteBuffer> QueryTable<B> {
fn chunks(
&self,
ctx: &SessionState,
@@ -279,7 +293,7 @@ impl QueryTable {
}
#[async_trait]
impl TableProvider for QueryTable {
impl<B: WriteBuffer> TableProvider for QueryTable<B> {
fn as_any(&self) -> &dyn Any {
self as &dyn Any
}


@@ -18,9 +18,17 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
arrow = { workspace = true }
async-trait = "0.1"
byteorder = "1.3.4"
chrono = "0.4"
crc32fast = "1.2.0"
datafusion = { workspace = true }
parking_lot = "0.11.1"
thiserror = "1.0"
tokio = { version = "1.35", features = ["macros", "fs", "io-util", "parking_lot", "rt-multi-thread", "sync", "time"] }
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.107"
snap = "1.0.0"
[dev-dependencies]
test_helpers = { path = "../test_helpers" }


@@ -75,7 +75,7 @@ pub trait Bufferer: Debug + Send + Sync + 'static {
) -> Result<Vec<Arc<dyn BufferSegment>>>;
/// Returns the configured WAL, if there is one.
fn wal(&self) -> Option<Arc<dyn Wal>>;
fn wal(&self) -> Option<Arc<impl Wal>>;
}
/// A segment in the buffer that corresponds to a single WAL segment file. It contains a catalog with any updates
@@ -109,11 +109,40 @@ pub trait ChunkContainer: Debug + Send + Sync + 'static {
/// The segment identifier, which will be monotonically increasing.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct SegmentId(u64);
pub struct SegmentId(u32);
pub type SegmentIdBytes = [u8; 4];
impl SegmentId {
pub fn new(id: u32) -> Self {
Self(id)
}
pub fn as_bytes(&self) -> SegmentIdBytes {
self.0.to_be_bytes()
}
pub fn from_bytes(bytes: SegmentIdBytes) -> Self {
Self(u32::from_be_bytes(bytes))
}
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
}
/// The sequence number of a batch of WAL operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct SequenceNumber(u64);
pub struct SequenceNumber(u32);
impl SequenceNumber {
pub fn new(id: u32) -> Self {
Self(id)
}
pub fn next(&self) -> Self {
Self(self.0 + 1)
}
}
#[async_trait]
pub trait Persister: Debug + Send + Sync + 'static {
@@ -136,21 +165,18 @@ pub trait Persister: Debug + Send + Sync + 'static {
fn object_store(&self) -> Arc<dyn object_store::ObjectStore>;
}
#[async_trait]
pub trait Wal: Debug + Send + Sync + 'static {
/// Opens a writer to a segment, either creating a new file or appending to an existing file.
async fn open_segment_writer(&self, segment_id: SegmentId)
-> Result<Arc<dyn WalSegmentWriter>>;
fn open_segment_writer(&self, segment_id: SegmentId) -> wal::Result<impl WalSegmentWriter>;
/// Opens a reader to a segment file.
async fn open_segment_reader(&self, segment_id: SegmentId)
-> Result<Arc<dyn WalSegmentReader>>;
fn open_segment_reader(&self, segment_id: SegmentId) -> wal::Result<impl WalSegmentReader>;
/// Checks the WAL directory for any segment files and returns them.
async fn segment_files(&self) -> Result<Vec<SegmentFile>>;
fn segment_files(&self) -> wal::Result<Vec<SegmentFile>>;
/// Drops the WAL segment file from disk.
async fn drop_wal_segment(&self, segment_id: SegmentId) -> Result<()>;
/// Deletes the WAL segment file from disk.
fn delete_wal_segment(&self, segment_id: SegmentId) -> wal::Result<()>;
}
#[derive(Debug)]
@@ -161,22 +187,21 @@ pub struct SegmentFile {
pub segment_id: SegmentId,
}
#[async_trait]
pub trait WalSegmentWriter: Debug + Send + Sync + 'static {
fn id(&self) -> SegmentId;
async fn write(&self, op: WalOp) -> Result<SequenceNumber>;
fn write_batch(&mut self, ops: Vec<WalOp>) -> wal::Result<SequenceNumber>;
}
pub trait WalSegmentReader: Debug + Send + Sync + 'static {
fn id(&self) -> SegmentId;
fn next_batch(&mut self) -> Result<Option<WalBatch>>;
fn next_batch(&mut self) -> wal::Result<Option<WalOpBatch>>;
}
/// Individual WalOps get batched into the WAL asynchronously. The batch is then written to the segment file.
#[derive(Debug, Serialize, Deserialize)]
pub struct WalBatch {
pub struct WalOpBatch {
pub sequence_number: SequenceNumber,
pub ops: Vec<WalOp>,
}
@@ -185,14 +210,14 @@ pub struct WalBatch {
/// lands in object storage. Things in the WAL are buffered until they are persisted to object storage. The write
/// is called an `LpWrite` because it is a write of line protocol and we intend to have a new write protocol for
/// 3.0 that supports a different kind of schema.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub enum WalOp {
LpWrite(LpWriteOp),
}
/// A write of 1 or more lines of line protocol to a single database. The default time is set by the server at the
/// time the write comes in.
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize, Eq, PartialEq)]
pub struct LpWriteOp {
pub db_name: String,
pub lp: String,


@@ -1,44 +1,655 @@
//! This is the implementation of the `Wal` that the buffer uses to make buffered data durable
//! on disk.
use crate::{SegmentFile, SegmentId, Wal, WalSegmentReader, WalSegmentWriter};
use crate::{
SegmentFile, SegmentId, SegmentIdBytes, SequenceNumber, Wal, WalOp, WalOpBatch,
WalSegmentReader, WalSegmentWriter,
};
use async_trait::async_trait;
use std::path::PathBuf;
use std::sync::Arc;
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use crc32fast::Hasher;
use datafusion::parquet::file::reader::Length;
use observability_deps::tracing::{info, warn};
use snap::read::FrameDecoder;
use std::fmt::Debug;
use std::{
fs::{File, OpenOptions},
io::{self, BufReader, Cursor, Read, Write},
mem,
path::PathBuf,
};
use thiserror::Error;
/// The first bytes written into a segment file to identify it and its version.
type FileTypeIdentifier = [u8; 8];
const FILE_TYPE_IDENTIFIER: &[u8] = b"idb3.001";
/// File extension for segment files
const SEGMENT_FILE_EXTENSION: &str = "wal";
#[derive(Debug, Error)]
pub enum Error {
#[error("io error: {source}")]
Io {
#[from]
source: io::Error,
},
#[error("error converting wal batch to json: {source}")]
Json {
#[from]
source: serde_json::Error,
},
#[error("converting u64 to u32: {source}")]
TryFromU64 {
#[from]
source: std::num::TryFromIntError,
},
#[error("invalid segment file {segment_id:?} at {path:?}: {reason}")]
InvalidSegmentFile {
segment_id: SegmentId,
path: PathBuf,
reason: String,
},
#[error("unable to read crc for segment {segment_id:?}")]
UnableToReadCrc { segment_id: SegmentId },
#[error("unable to read length for segment data {segment_id:?}")]
UnableToReadLength { segment_id: SegmentId },
#[error("length mismatch for segment {segment_id:?}: expected {expected}, got {actual}")]
LengthMismatch {
segment_id: SegmentId,
expected: u32,
actual: u32,
},
#[error("checksum mismatch for segment {segment_id:?}: expected {expected}, got {actual}")]
ChecksumMismatch {
segment_id: SegmentId,
expected: u32,
actual: u32,
},
#[error("invalid segment file name {0:?}")]
InvalidSegmentFileName(String),
#[error("unable to parse segment id from file: {source}")]
UnableToParseSegmentId {
file_name: String,
source: std::num::ParseIntError,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Debug)]
pub struct WalImpl {
#[allow(dead_code)]
path: PathBuf,
root: PathBuf,
}
impl WalImpl {
pub fn new(path: impl Into<PathBuf>) -> Self {
Self { path: path.into() }
pub fn new(path: impl Into<PathBuf>) -> Result<Self> {
let root = path.into();
info!(wal_dir=?root, "Ensuring WAL directory exists");
std::fs::create_dir_all(&root)?;
// ensure the directory creation is actually fsync'd so that when we create files there
// we don't lose them (see: https://www.usenix.org/system/files/conference/osdi14/osdi14-paper-pillai.pdf)
File::open(&root)
.expect("should be able to open just-created directory")
.sync_all()
.expect("fsync failure");
Ok(Self { root })
}
fn open_segment_writer(&self, segment_id: SegmentId) -> Result<impl WalSegmentWriter> {
let writer = WalSegmentWriterImpl::new_or_open(self.root.clone(), segment_id)?;
Ok(writer)
}
fn open_segment_reader(&self, segment_id: SegmentId) -> Result<impl WalSegmentReader> {
let reader = WalSegmentReaderImpl::new(self.root.clone(), segment_id)?;
Ok(reader)
}
fn segment_files(&self) -> Result<Vec<SegmentFile>> {
let dir = std::fs::read_dir(&self.root)?;
let mut segment_files = Vec::new();
for child in dir.into_iter() {
let child = child?;
let meta = child.metadata()?;
if meta.is_file() {
let path = child.path();
if let Some(file_name) = path.file_stem() {
match file_name.to_str() {
Some(file_name) => {
if let Ok(segment_id) = segment_id_from_file_name(file_name) {
segment_files.push(SegmentFile { segment_id, path });
} else {
warn!(
file_name=?file_name,
"File in wal dir has an invalid file stem that doesn't parse to an integer segment id, ignoring"
);
}
}
None => {
warn!(
file_name=?file_name,
"File in wal dir has an invalid file stem that isn't valid UTF-8, ignoring"
);
}
}
} else {
warn!(
path=?path,
"File in wal dir doesn't have a file stem, ignoring"
);
}
}
}
segment_files.sort_by_key(|f| f.segment_id);
Ok(segment_files)
}
fn delete_wal_segment(&self, segment_id: SegmentId) -> Result<()> {
let path = build_segment_path(self.root.clone(), segment_id);
std::fs::remove_file(path)?;
Ok(())
}
}
impl Wal for WalImpl {
fn open_segment_writer(&self, segment_id: SegmentId) -> Result<impl WalSegmentWriter> {
self.open_segment_writer(segment_id)
}
fn open_segment_reader(&self, segment_id: SegmentId) -> Result<impl WalSegmentReader> {
self.open_segment_reader(segment_id)
}
fn segment_files(&self) -> Result<Vec<SegmentFile>> {
self.segment_files()
}
fn delete_wal_segment(&self, _segment_id: SegmentId) -> Result<()> {
self.delete_wal_segment(_segment_id)
}
}
#[derive(Debug)]
pub struct WalSegmentWriterImpl {
segment_id: SegmentId,
f: File,
bytes_written: usize,
sequence_number: SequenceNumber,
buffer: Vec<u8>,
}
impl WalSegmentWriterImpl {
pub fn new_or_open(root: PathBuf, segment_id: SegmentId) -> Result<Self> {
let path = build_segment_path(root, segment_id);
// if there's already a file there, validate its header and pull the sequence number from the last entry
if path.exists() {
if let Some(file_info) =
WalSegmentReaderImpl::read_segment_file_info_if_exists(path.clone(), segment_id)?
{
let f = OpenOptions::new().write(true).append(true).open(&path)?;
return Ok(Self {
segment_id,
f,
bytes_written: file_info
.bytes_written
.try_into()
.expect("file length must fit in usize"),
sequence_number: file_info.last_sequence_number,
buffer: Vec::with_capacity(8 * 1024), // 8kiB initial size
});
} else {
return Err(Error::InvalidSegmentFile {
segment_id,
path,
reason: "file exists but is invalid".to_string(),
});
}
}
// it's a new file, initialize it with the header and get ready to start writing
let mut f = OpenOptions::new().write(true).create(true).open(&path)?;
f.write_all(FILE_TYPE_IDENTIFIER)?;
let file_type_bytes_written = FILE_TYPE_IDENTIFIER.len();
let id_bytes = segment_id.as_bytes();
f.write_all(&id_bytes)?;
let id_bytes_written = id_bytes.len();
f.sync_all().expect("fsync failure");
let bytes_written = file_type_bytes_written + id_bytes_written;
Ok(Self {
segment_id,
f,
bytes_written,
sequence_number: SequenceNumber::new(0),
buffer: Vec::with_capacity(8 * 1024), // 8kiB initial size
})
}
fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
// Ensure the write buffer is always empty before using it.
self.buffer.clear();
self.sequence_number = self.sequence_number.next();
let batch = WalOpBatch {
sequence_number: self.sequence_number,
ops,
};
let data = serde_json::to_vec(&batch)?;
// Only designed to support chunks up to `u32::max` bytes long.
let uncompressed_len = data.len();
u32::try_from(uncompressed_len)?;
// The chunk header is two u32 values, so write a dummy u64 value and
// come back to fill them in later.
self.buffer
.write_u64::<BigEndian>(0)
.expect("cannot fail to write to buffer");
// Compress the payload into the reused buffer, recording the crc hash
// as it is written.
let mut encoder = snap::write::FrameEncoder::new(HasherWrapper::new(&mut self.buffer));
encoder.write_all(&data)?;
let (checksum, buf) = encoder
.into_inner()
.expect("cannot fail to flush to a Vec")
.finalize();
// Adjust the compressed length to take into account the u64 padding
// above.
let compressed_len = buf.len() - mem::size_of::<u64>();
let compressed_len = u32::try_from(compressed_len)?;
// Go back and write the chunk header values
let mut buf = Cursor::new(buf);
buf.set_position(0);
buf.write_u32::<BigEndian>(checksum)?;
buf.write_u32::<BigEndian>(compressed_len)?;
// Write the entire buffer to the file
let buf = buf.into_inner();
let bytes_written = buf.len();
self.f.write_all(buf)?;
// fsync the fd
self.f.sync_all().expect("fsync failure");
self.bytes_written += bytes_written;
Ok(self.sequence_number)
}
}
#[async_trait]
impl Wal for WalImpl {
async fn open_segment_writer(
&self,
_segment_id: SegmentId,
) -> crate::Result<Arc<dyn WalSegmentWriter>> {
todo!()
impl WalSegmentWriter for WalSegmentWriterImpl {
fn id(&self) -> SegmentId {
self.segment_id
}
async fn open_segment_reader(
&self,
_segment_id: SegmentId,
) -> crate::Result<Arc<dyn WalSegmentReader>> {
todo!()
}
async fn segment_files(&self) -> crate::Result<Vec<SegmentFile>> {
todo!()
}
async fn drop_wal_segment(&self, _segment_id: SegmentId) -> crate::Result<()> {
todo!()
fn write_batch(&mut self, ops: Vec<WalOp>) -> Result<SequenceNumber> {
self.write_batch(ops)
}
}
#[derive(Debug)]
pub struct WalSegmentReaderImpl {
f: BufReader<File>,
segment_id: SegmentId,
}
impl WalSegmentReaderImpl {
pub fn new(root: impl Into<PathBuf>, segment_id: SegmentId) -> Result<Self> {
let path = build_segment_path(root, segment_id);
let f = BufReader::new(File::open(path.clone())?);
let mut reader = Self { f, segment_id };
let (file_type, id) = reader.read_header()?;
if file_type != FILE_TYPE_IDENTIFIER {
return Err(Error::InvalidSegmentFile {
segment_id,
path,
reason: format!(
"expected file type identifier {:?}, got {:?}",
FILE_TYPE_IDENTIFIER, file_type
),
});
}
if id != segment_id.as_bytes() {
return Err(Error::InvalidSegmentFile {
segment_id,
path,
reason: format!(
"expected segment id {:?} in file, got {:?}",
segment_id.as_bytes(),
id
),
});
}
Ok(reader)
}
fn read_segment_file_info_if_exists(
path: PathBuf,
segment_id: SegmentId,
) -> Result<Option<ExistingSegmentFileInfo>> {
let f = match File::open(path.clone()) {
Ok(f) => f,
Err(ref e) if e.kind() == io::ErrorKind::NotFound => return Ok(None),
Err(e) => return Err(e.into()),
};
let bytes_written = f.len().try_into()?;
let mut reader = Self {
f: BufReader::new(f),
segment_id,
};
let (file_type, _id) = reader.read_header()?;
if file_type != FILE_TYPE_IDENTIFIER {
return Err(Error::InvalidSegmentFile {
segment_id,
path,
reason: format!(
"expected file type identifier {:?}, got {:?}",
FILE_TYPE_IDENTIFIER, file_type
),
});
}
let mut last_block = None;
while let Some(block) = reader.next_segment_block()? {
last_block = Some(block);
}
if let Some(block) = last_block {
let batch: WalOpBatch = serde_json::from_slice(&block)?;
Ok(Some(ExistingSegmentFileInfo {
last_sequence_number: batch.sequence_number,
bytes_written,
}))
} else {
Ok(None)
}
}
pub fn next_batch(&mut self) -> Result<Option<WalOpBatch>> {
if let Some(data) = self.next_segment_block()? {
let batch: WalOpBatch = serde_json::from_slice(&data)?;
Ok(Some(batch))
} else {
Ok(None)
}
}
fn next_segment_block(&mut self) -> Result<Option<Vec<u8>>> {
let expected_checksum = match self.f.read_u32::<BigEndian>() {
Err(ref e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(None),
other => other?,
};
let expected_len: u32 = self.f.read_u32::<BigEndian>()?;
let compressed_read = self.f.by_ref().take(expected_len.into());
let hashing_read = CrcReader::new(compressed_read);
let mut decompressing_read = FrameDecoder::new(hashing_read);
let mut data = Vec::with_capacity(100);
decompressing_read.read_to_end(&mut data)?;
let (actual_compressed_len, actual_checksum) = decompressing_read.into_inner().checksum();
let actual_compressed_len = u32::try_from(actual_compressed_len)
.expect("segment blocks are only designed to support chunks up to u32::max bytes long");
if expected_len != actual_compressed_len {
return Err(Error::LengthMismatch {
segment_id: self.segment_id,
expected: expected_len,
actual: actual_compressed_len,
});
}
if expected_checksum != actual_checksum {
return Err(Error::ChecksumMismatch {
segment_id: self.segment_id,
expected: expected_checksum,
actual: actual_checksum,
});
}
Ok(Some(data))
}
fn read_array<const N: usize>(&mut self) -> Result<[u8; N]> {
let mut data = [0u8; N];
self.f.read_exact(&mut data)?;
Ok(data)
}
fn read_header(&mut self) -> Result<(FileTypeIdentifier, SegmentIdBytes)> {
Ok((self.read_array()?, self.read_array()?))
}
}
struct ExistingSegmentFileInfo {
last_sequence_number: SequenceNumber,
bytes_written: u32,
}
impl WalSegmentReader for WalSegmentReaderImpl {
fn id(&self) -> SegmentId {
self.segment_id
}
fn next_batch(&mut self) -> Result<Option<WalOpBatch>> {
self.next_batch()
}
}
struct CrcReader<R> {
inner: R,
hasher: Hasher,
bytes_seen: u64,
}
impl<R> CrcReader<R> {
fn new(inner: R) -> Self {
let hasher = Hasher::default();
Self {
inner,
hasher,
bytes_seen: 0,
}
}
fn checksum(self) -> (u64, u32) {
(self.bytes_seen, self.hasher.finalize())
}
}
impl<R> Read for CrcReader<R>
where
R: Read,
{
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
let len = self.inner.read(buf)?;
let len_u64 = u64::try_from(len).expect("Only designed to run on 32-bit systems or higher");
self.bytes_seen += len_u64;
self.hasher.update(&buf[..len]);
Ok(len)
}
}
/// A [`HasherWrapper`] acts as a [`Write`] decorator, recording the crc
checksum of the data written to the inner [`Write`] implementation.
struct HasherWrapper<W> {
inner: W,
hasher: Hasher,
}
impl<W> HasherWrapper<W> {
fn new(inner: W) -> Self {
Self {
inner,
hasher: Hasher::default(),
}
}
fn finalize(self) -> (u32, W) {
(self.hasher.finalize(), self.inner)
}
}
impl<W> Write for HasherWrapper<W>
where
W: Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.hasher.update(buf);
self.inner.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
fn build_segment_path(dir: impl Into<PathBuf>, id: SegmentId) -> PathBuf {
let mut path = dir.into();
path.push(format!("{:010}", id.0));
path.set_extension(SEGMENT_FILE_EXTENSION);
path
}
fn segment_id_from_file_name(name: &str) -> Result<SegmentId> {
let id = name
.parse::<u32>()
.map_err(|source| Error::UnableToParseSegmentId {
file_name: name.to_string(),
source,
})?;
Ok(SegmentId::new(id))
}
#[cfg(test)]
mod tests {
use super::*;
use crate::LpWriteOp;
#[test]
fn segment_writer_reader() {
let dir = test_helpers::tmp_dir().unwrap().into_path();
let mut writer = WalSegmentWriterImpl::new_or_open(dir.clone(), SegmentId::new(0)).unwrap();
let wal_op = WalOp::LpWrite(LpWriteOp {
db_name: "foo".to_string(),
lp: "cpu host=a val=10i 10".to_string(),
default_time: 1,
});
writer.write_batch(vec![wal_op.clone()]).unwrap();
let mut reader = WalSegmentReaderImpl::new(dir, SegmentId::new(0)).unwrap();
let batch = reader.next_batch().unwrap().unwrap();
assert_eq!(batch.ops, vec![wal_op]);
assert_eq!(batch.sequence_number, SequenceNumber::new(1));
}
#[test]
fn segment_writer_can_open_previously_existing_segment() {
let dir = test_helpers::tmp_dir().unwrap().into_path();
let wal_op = WalOp::LpWrite(LpWriteOp {
db_name: "foo".to_string(),
lp: "cpu host=a val=10i 10".to_string(),
default_time: 1,
});
// open the file, write and close it
{
let mut writer =
WalSegmentWriterImpl::new_or_open(dir.clone(), SegmentId::new(0)).unwrap();
writer.write_batch(vec![wal_op.clone()]).unwrap();
}
// open it again, send a new write in and close it
{
let mut writer =
WalSegmentWriterImpl::new_or_open(dir.clone(), SegmentId::new(0)).unwrap();
writer.write_batch(vec![wal_op.clone()]).unwrap();
}
// ensure that we have two WalOpBatch in the file with the correct sequence numbers
let mut reader = WalSegmentReaderImpl::new(dir, SegmentId::new(0)).unwrap();
let batch = reader.next_batch().unwrap().unwrap();
assert_eq!(batch.ops, vec![wal_op.clone()]);
assert_eq!(batch.sequence_number, SequenceNumber::new(1));
let batch = reader.next_batch().unwrap().unwrap();
assert_eq!(batch.ops, vec![wal_op]);
assert_eq!(batch.sequence_number, SequenceNumber::new(2));
}
#[test]
fn wal_can_open_write_and_read_segments() {
let dir = test_helpers::tmp_dir().unwrap().into_path();
let wal_op = WalOp::LpWrite(LpWriteOp {
db_name: "foo".to_string(),
lp: "cpu host=a val=10i 10".to_string(),
default_time: 1,
});
let wal = WalImpl::new(dir.clone()).unwrap();
let mut writer = wal.open_segment_writer(SegmentId::new(0)).unwrap();
writer.write_batch(vec![wal_op.clone()]).unwrap();
let mut writer2 = wal.open_segment_writer(SegmentId::new(1)).unwrap();
writer2.write_batch(vec![wal_op.clone()]).unwrap();
let segments = wal.segment_files().unwrap();
assert_eq!(segments.len(), 2);
assert_eq!(segments[0].segment_id, SegmentId::new(0));
assert_eq!(segments[1].segment_id, SegmentId::new(1));
let mut reader = wal.open_segment_reader(SegmentId::new(0)).unwrap();
let batch = reader.next_batch().unwrap().unwrap();
assert_eq!(batch.ops, vec![wal_op.clone()]);
assert_eq!(batch.sequence_number, SequenceNumber::new(1));
}
}


@@ -60,15 +60,15 @@ pub struct WriteRequest<'a> {
}
#[derive(Debug)]
pub struct WriteBufferImpl {
pub struct WriteBufferImpl<W> {
catalog: Arc<Catalog>,
buffered_data: RwLock<HashMap<String, DatabaseBuffer>>,
#[allow(dead_code)]
wal: Option<Arc<dyn Wal>>,
wal: Option<Arc<W>>,
}
impl WriteBufferImpl {
pub fn new(catalog: Arc<Catalog>, wal: Option<Arc<dyn Wal>>) -> Self {
impl<W: Wal> WriteBufferImpl<W> {
pub fn new(catalog: Arc<Catalog>, wal: Option<Arc<W>>) -> Self {
Self {
catalog,
buffered_data: RwLock::new(HashMap::new()),
@@ -179,7 +179,7 @@ impl WriteBufferImpl {
}
#[async_trait]
impl Bufferer for WriteBufferImpl {
impl<W: Wal> Bufferer for WriteBufferImpl<W> {
async fn write_lp(
&self,
database: NamespaceName<'static>,
@@ -201,12 +201,12 @@ impl Bufferer for WriteBufferImpl {
todo!()
}
fn wal(&self) -> Option<Arc<dyn Wal>> {
fn wal(&self) -> Option<Arc<impl Wal>> {
self.wal.clone()
}
}
impl ChunkContainer for WriteBufferImpl {
impl<W: Wal> ChunkContainer for WriteBufferImpl<W> {
fn get_table_chunks(
&self,
database_name: &str,
@@ -219,7 +219,7 @@ impl ChunkContainer for WriteBufferImpl {
}
}
impl WriteBuffer for WriteBufferImpl {}
impl<W: Wal> WriteBuffer for WriteBufferImpl<W> {}
#[derive(Debug, Default)]
struct DatabaseBuffer {