Merge pull request #7729 from influxdata/savage/wal-inspect-library-refactor

feat(wal): Add `wal_inspect` crate & a write op entry decoder
pull/24376/head
kodiakhq[bot] 2023-05-16 11:33:37 +00:00 committed by GitHub
commit 74a7b7102b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 618 additions and 21 deletions

24
Cargo.lock generated
View File

@ -6309,6 +6309,7 @@ dependencies = [
name = "wal"
version = "0.1.0"
dependencies = [
"assert_matches",
"async-trait",
"byteorder",
"bytes",
@ -6317,6 +6318,8 @@ dependencies = [
"dml",
"futures",
"generated_types",
"hashbrown 0.13.2",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"observability_deps",
@ -6333,6 +6336,27 @@ dependencies = [
"workspace-hack",
]
[[package]]
name = "wal_inspect"
version = "0.1.0"
dependencies = [
"assert_matches",
"data_types",
"dml",
"generated_types",
"hashbrown 0.13.2",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_pb",
"parquet_to_line_protocol",
"schema",
"test_helpers",
"thiserror",
"tokio",
"wal",
"workspace-hack",
]
[[package]]
name = "walkdir"
version = "2.3.3"

View File

@ -80,6 +80,7 @@ members = [
"tracker",
"trogging",
"wal",
"wal_inspect",
"workspace-hack",
]
default-members = ["influxdb_iox"]

View File

@ -10,7 +10,7 @@ use influxdb_line_protocol::{builder::FieldValue, FieldValue as LPFieldValue};
use schema::{InfluxColumnType, InfluxFieldType, Schema};
/// Converts a [`RecordBatch`] into line protocol lines.
pub(crate) fn convert_to_lines(
pub fn convert_to_lines(
measurement_name: &str,
iox_schema: &Schema,
batch: &RecordBatch,

View File

@ -31,7 +31,7 @@ use std::{
sync::Arc,
};
mod batch;
use batch::convert_to_lines;
pub use batch::convert_to_lines;
pub type Result<T = (), E = Error> = std::result::Result<T, E>;
#[derive(Debug, Snafu)]

View File

@ -13,6 +13,9 @@ crc32fast = "1.2.0"
data_types = { path = "../data_types" }
futures = "0.3"
generated_types = { path = "../generated_types" }
hashbrown.workspace = true
mutable_batch = { version = "0.1.0", path = "../mutable_batch" }
mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" }
observability_deps = { path = "../observability_deps" }
once_cell = { version = "1.17", features = ["parking_lot"] }
parking_lot = "0.12"
@ -26,7 +29,7 @@ tokio-util = "0.7"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies] # In alphabetical order
assert_matches = "1.5.0"
dml = { path = "../dml" }
mutable_batch_lp = { path = "../mutable_batch_lp" }
mutable_batch_pb = { path = "../mutable_batch_pb" }
test_helpers = { path = "../test_helpers" }

View File

@ -10,20 +10,6 @@
//! # WAL
//!
//! This crate provides a local-disk WAL for the IOx ingestion pipeline.
use crate::blocking::{
ClosedSegmentFileReader as RawClosedSegmentFileReader, OpenSegmentFileWriter,
};
use data_types::sequence_number_set::SequenceNumberSet;
use generated_types::{
google::{FieldViolation, OptionalField},
influxdata::iox::wal::v1::{
sequenced_wal_op::Op as WalOp, SequencedWalOp as ProtoSequencedWalOp,
},
};
use observability_deps::tracing::info;
use parking_lot::Mutex;
use snafu::prelude::*;
use std::{
collections::BTreeMap,
fs::File,
@ -32,9 +18,27 @@ use std::{
sync::{atomic::AtomicU64, Arc},
time::Duration,
};
use data_types::{sequence_number_set::SequenceNumberSet, NamespaceId};
use generated_types::{
google::{FieldViolation, OptionalField},
influxdata::iox::wal::v1::{
sequenced_wal_op::Op as WalOp, SequencedWalOp as ProtoSequencedWalOp,
},
};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use mutable_batch_pb::decode::decode_database_batch;
use observability_deps::tracing::info;
use parking_lot::Mutex;
use snafu::prelude::*;
use tokio::{sync::watch, task::JoinHandle};
use writer_thread::WriterIoThreadHandle;
use crate::blocking::{
ClosedSegmentFileReader as RawClosedSegmentFileReader, OpenSegmentFileWriter,
};
pub mod blocking;
mod writer_thread;
@ -132,6 +136,19 @@ pub enum Error {
},
}
/// Errors that occur when decoding internal types from a WAL file.
///
/// Each variant wraps the underlying failure as its `source`.
// NOTE(review): the variants are annotated with plain `//` comments rather
// than `///` doc comments because snafu may derive the `Display` text from
// variant doc comments when no `display` attribute is given - confirm before
// converting these to doc comments.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum DecodeError {
// A write op payload could not be decoded by
// `mutable_batch_pb::decode::decode_database_batch`.
UnableToCreateMutableBatch {
source: mutable_batch_pb::decode::Error,
},
// Reading the next batch of ops from the closed segment reader failed with
// a WAL [`Error`].
FailedToReadWal {
source: Error,
},
}
/// A specialized `Result` for WAL-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -554,6 +571,62 @@ impl std::fmt::Debug for ClosedSegmentFileReader {
}
}
/// An in-memory representation of a WAL write operation entry.
///
/// One value is produced per write op found in a WAL entry batch by
/// [`WriteOpEntryDecoder`].
#[derive(Debug)]
pub struct WriteOpEntry {
/// The namespace the write belongs to, built from the `database_id` of the
/// encoded write.
pub namespace: NamespaceId,
/// The decoded table batches of the write, as returned by
/// `decode_database_batch`. The `i64` keys are the table IDs carried by
/// the encoded table batches.
pub table_batches: HashMap<i64, MutableBatch>,
}
/// A decoder that reads from a closed segment file and parses write
/// operations from their on-disk format to an internal format.
///
/// The decoder is an [`Iterator`]; each item corresponds to one batch of ops
/// read from the underlying segment reader.
#[derive(Debug)]
pub struct WriteOpEntryDecoder {
/// The reader over the closed segment file that batches are pulled from.
reader: ClosedSegmentFileReader,
}
impl From<ClosedSegmentFileReader> for WriteOpEntryDecoder {
/// Creates a decoder which will use the closed segment file of `reader` to
/// decode write ops from their on-disk format.
///
/// The reader is taken by value and stored as-is; nothing is read from it
/// during construction.
fn from(reader: ClosedSegmentFileReader) -> Self {
Self { reader }
}
}
impl Iterator for WriteOpEntryDecoder {
    type Item = Result<Vec<WriteOpEntry>, DecodeError>;

    /// Reads the collection of write op entries in the next WAL entry batch
    /// from the underlying closed segment.
    ///
    /// # Returns
    ///
    /// * `None` - the segment reader reported that there are no more batches.
    /// * `Some(Ok(entries))` - the write ops of the next batch. Delete and
    ///   persist ops are skipped, so the vector may be empty; an empty vector
    ///   does not mean the decoder is consumed.
    /// * `Some(Err(DecodeError::FailedToReadWal))` - the segment reader failed
    ///   to produce the next batch.
    /// * `Some(Err(DecodeError::UnableToCreateMutableBatch))` - a write op in
    ///   the batch could not be decoded; the whole batch is reported as
    ///   failed.
    ///
    /// Read failures are yielded to the caller rather than being treated as
    /// the end of the segment, so a damaged file is distinguishable from a
    /// fully consumed one. Callers that only want the readable prefix should
    /// stop at the first `Err`.
    // NOTE(review): what the reader returns on calls made after a read error
    // is determined by `ClosedSegmentFileReader::next_batch`, which is not
    // visible here - confirm before continuing iteration past an error.
    fn next(&mut self) -> Option<Self::Item> {
        // `transpose()?` ends iteration only when the reader yields `Ok(None)`.
        let read = self
            .reader
            .next_batch()
            .context(FailedToReadWalSnafu)
            .transpose()?;

        Some(read.and_then(|batch| {
            batch
                .into_iter()
                // Only write ops are decoded; every other op kind is dropped.
                .filter_map(|sequenced_op| match sequenced_op.op {
                    WalOp::Write(w) => Some(w),
                    WalOp::Delete(..) => None,
                    WalOp::Persist(..) => None,
                })
                .map(|w| {
                    Ok(WriteOpEntry {
                        namespace: NamespaceId::new(w.database_id),
                        table_batches: decode_database_batch(&w)
                            .context(UnableToCreateMutableBatchSnafu)?,
                    })
                })
                .collect::<Self::Item>()
        }))
    }
}
/// Metadata for a WAL segment that is no longer accepting writes, but can be read for replay
/// purposes.
#[derive(Debug, Clone)]
@ -575,7 +648,9 @@ impl ClosedSegment {
#[cfg(test)]
mod tests {
use super::*;
use std::collections::BTreeSet;
use assert_matches::assert_matches;
use data_types::{NamespaceId, SequenceNumber, TableId};
use dml::DmlWrite;
use generated_types::influxdata::{
@ -584,6 +659,10 @@ mod tests {
};
use mutable_batch_lp::lines_to_batches;
use super::*;
const TEST_NAMESPACE_ID: NamespaceId = NamespaceId::new(42);
#[tokio::test]
async fn wal_write_and_read_ops() {
let dir = test_helpers::tmp_dir().unwrap();
@ -681,6 +760,89 @@ mod tests {
);
}
/// Writes a mix of write, delete and persist ops to a WAL, rotates the
/// segment and checks that [`WriteOpEntryDecoder`] yields only the write ops,
/// grouped by WAL entry and in the order they were written.
#[tokio::test]
async fn decode_write_op_entries() {
let dir = test_helpers::tmp_dir().unwrap();
let wal = Wal::new(&dir.path()).await.unwrap();
let w1 = test_data("m1,t=foo v=1i 1");
let w2 = test_data("m2,u=foo w=2i 2");
let w3 = test_data("m1,t=foo v=3i 3");
let op1 = SequencedWalOp {
sequence_number: 0,
op: WalOp::Write(w1.to_owned()),
};
let op2 = SequencedWalOp {
sequence_number: 1,
op: WalOp::Write(w2.to_owned()),
};
let op3 = SequencedWalOp {
sequence_number: 2,
op: WalOp::Delete(test_delete()),
};
// NOTE(review): op4 reuses sequence number 2 from op3 - confirm whether
// the duplicate is intentional.
let op4 = SequencedWalOp {
sequence_number: 2,
op: WalOp::Persist(test_persist()),
};
// A third write entry coming after a delete and persist entry must still be yielded
let op5 = SequencedWalOp {
sequence_number: 3,
op: WalOp::Write(w3.to_owned()),
};
// Only the last write of each group is awaited; the assertions below
// expect each awaited group (op1-op3, then op4-op5) to be read back as one
// WAL entry.
wal.write_op(op1.clone());
wal.write_op(op2.clone());
wal.write_op(op3.clone()).changed().await.unwrap();
wal.write_op(op4.clone());
wal.write_op(op5.clone()).changed().await.unwrap();
// Rotate so the segment that received the ops is closed and can be read.
let (closed, _) = wal.rotate().unwrap();
let decoder = WriteOpEntryDecoder::from(
wal.reader_for_segment(closed.id)
.expect("failed to open reader for closed WAL segment"),
);
let wal_entries = decoder
.into_iter()
.map(|r| r.expect("unexpected bad entry"))
.collect::<Vec<_>>();
// The decoder should find 2 entries, with a total of 3 write ops
assert_eq!(wal_entries.len(), 2);
let write_op_entries = wal_entries.into_iter().flatten().collect::<Vec<_>>();
assert_eq!(write_op_entries.len(), 3);
// The decoded write ops must match the written payloads, in write order.
assert_matches!(write_op_entries.get(0), Some(got_op1) => {
assert_op_shape(got_op1, &w1);
});
assert_matches!(write_op_entries.get(1), Some(got_op2) => {
assert_op_shape(got_op2, &w2);
});
assert_matches!(write_op_entries.get(2), Some(got_op3) => {
assert_op_shape(got_op3, &w3);
});
}
/// Asserts that the decoded entry `left` has the same shape as the encoded
/// batch `right`: the same namespace, the same number of table batches and,
/// for every encoded table batch, a decoded batch under the same table ID
/// holding the same set of column names.
///
/// Panics if any of these expectations does not hold.
fn assert_op_shape(left: &WriteOpEntry, right: &DatabaseBatch) {
    assert_eq!(left.namespace, NamespaceId::new(right.database_id));
    assert_eq!(left.table_batches.len(), right.table_batches.len());

    right.table_batches.iter().for_each(|encoded| {
        let table_id = encoded.table_id;
        let decoded = match left.table_batches.get(&table_id) {
            Some(batch) => batch,
            None => panic!("left value missing table batch for key {table_id}"),
        };
        let encoded_columns = encoded
            .columns
            .iter()
            .map(|column| column.column_name.as_str())
            .collect::<BTreeSet<_>>();
        assert_eq!(decoded.column_names(), encoded_columns)
    });
}
fn test_data(lp: &str) -> DatabaseBatch {
let batches = lines_to_batches(lp, 0).unwrap();
let batches = batches
@ -690,7 +852,7 @@ mod tests {
.collect();
let write = DmlWrite::new(
NamespaceId::new(42),
TEST_NAMESPACE_ID,
batches,
"bananas".into(),
Default::default(),
@ -701,7 +863,7 @@ mod tests {
fn test_delete() -> DeletePayload {
DeletePayload {
database_id: 42,
database_id: TEST_NAMESPACE_ID.get(),
predicate: None,
table_name: "bananas".into(),
}
@ -709,7 +871,7 @@ mod tests {
fn test_persist() -> PersistOp {
PersistOp {
namespace_id: 42,
namespace_id: TEST_NAMESPACE_ID.get(),
parquet_file_uuid: "b4N4N4Z".into(),
partition_id: 43,
table_id: 44,

25
wal_inspect/Cargo.toml Normal file
View File

@ -0,0 +1,25 @@
[package]
name = "wal_inspect"
version.workspace = true
authors.workspace = true
edition.workspace = true
license.workspace = true
[dependencies] # In alphabetical order
data_types = { version = "0.1.0", path = "../data_types" }
dml = { version = "0.1.0", path = "../dml" }
generated_types = { version = "0.1.0", path = "../generated_types" }
hashbrown.workspace = true
mutable_batch = { version = "0.1.0", path = "../mutable_batch" }
mutable_batch_pb = { version = "0.1.0", path = "../mutable_batch_pb" }
parquet_to_line_protocol = { version = "0.1.0", path = "../parquet_to_line_protocol" }
schema = { version = "0.1.0", path = "../schema" }
thiserror = "1.0.40"
wal = { version = "0.1.0", path = "../wal" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies] # In alphabetical order
assert_matches = "1.5.0"
mutable_batch_lp = { path = "../mutable_batch_lp" }
test_helpers = { path = "../test_helpers" }
tokio = { version = "1.27", features = ["macros", "parking_lot", "rt-multi-thread", "sync", "time"] }

382
wal_inspect/src/lib.rs Normal file
View File

@ -0,0 +1,382 @@
//! # WAL Inspect
//!
//! This crate builds on top of the WAL implementation to provide tools for
//! inspecting individual segment files and translating them to human readable
//! formats.
#![deny(rustdoc::broken_intra_doc_links, rust_2018_idioms)]
#![warn(
clippy::clone_on_ref_ptr,
clippy::dbg_macro,
clippy::explicit_iter_loop,
clippy::future_not_send,
clippy::todo,
clippy::use_self,
missing_copy_implementations,
missing_debug_implementations,
missing_docs
)]
use std::io::Write;
use data_types::{NamespaceId, TableId};
use hashbrown::HashMap;
use mutable_batch::MutableBatch;
use parquet_to_line_protocol::convert_to_lines;
use thiserror::Error;
/// Errors emitted by a [`LineProtoWriter`] during operation.
#[derive(Debug, Error)]
pub enum WriteError {
/// The mutable batch is in a state that prevents obtaining
/// the data needed to write line protocol (its schema or its Arrow
/// record batch).
#[error("failed to get required data from mutable batch: {0}")]
BadMutableBatch(#[from] mutable_batch::Error),
/// The record batch could not be mapped to line protocol. This is also
/// reported when no table name is known for a table ID, as the
/// measurement name cannot be recovered in that case.
#[error("failed to map record batch to line protocol: {0}")]
ConvertToLineProtocolFailed(String),
/// A write or flush failure caused by an IO error
#[error("failed to write translation: {0}")]
IoError(#[from] std::io::Error),
}
/// Provides namespaced write functionality from table-based mutable batches
/// to namespaced line protocol output.
///
/// `W` is the type of the per-namespace write destination and `F` is the
/// callback used to obtain a destination for a namespace ID. All opened
/// destinations are flushed on a best effort basis when the writer is dropped.
#[derive(Debug)]
pub struct LineProtoWriter<W, F>
where
W: Write,
{
/// The write destinations opened so far, keyed by namespace ID.
namespaced_output: HashMap<NamespaceId, W>,
/// Callback that provides the write destination for a namespace ID.
new_write_sink: F,
/// Mapping from table ID to table name, used as the measurement name of
/// the emitted line protocol.
table_name_index: HashMap<TableId, String>,
}
impl<W, F> LineProtoWriter<W, F>
where
    W: Write,
{
    /// Performs a best effort flush of all write destinations opened by the
    /// [`LineProtoWriter`].
    ///
    /// Every destination is flushed even if an earlier one fails.
    ///
    /// # Errors
    ///
    /// Returns one [`WriteError::IoError`] per destination that failed to
    /// flush.
    pub fn flush(&mut self) -> Result<(), Vec<WriteError>> {
        let failures: Vec<WriteError> = self
            .namespaced_output
            .values_mut()
            .filter_map(|sink| sink.flush().err())
            .map(WriteError::IoError)
            .collect();

        if failures.is_empty() {
            Ok(())
        } else {
            Err(failures)
        }
    }
}
impl<W, F> Drop for LineProtoWriter<W, F>
where
    W: Write,
{
    /// Flushes all opened write destinations when the writer goes away.
    fn drop(&mut self) {
        // A destructor has no way to report failures, so flush errors are
        // deliberately discarded here; call `flush` directly to observe them.
        let _ = self.flush();
    }
}
impl<W, F> LineProtoWriter<W, F>
where
W: Write,
F: Fn(NamespaceId) -> Result<W, WriteError>,
{
/// Constructs a new [`LineProtoWriter`] that uses `new_write_sink` to
/// get the destination for each line protocol write by its namespace ID.
/// The `table_name_index` must provide a mapping from all table IDs to
/// table name to recover the measurement name, as WAL write entries do
/// not contain this information.
pub fn new(new_write_sink: F, table_name_index: HashMap<TableId, String>) -> Self {
Self {
namespaced_output: HashMap::new(),
new_write_sink,
table_name_index,
}
}
/// Writes the provided set of table batches as line protocol write entries
/// to the destination for the provided namespace ID.
pub fn write_namespaced_table_batches(
&mut self,
ns: NamespaceId,
table_batches: HashMap<i64, MutableBatch>,
) -> Result<(), WriteError> {
let sink = self
.namespaced_output
.entry(ns)
.or_insert((self.new_write_sink)(ns)?);
write_batches_as_line_proto(sink, &self.table_name_index, table_batches)
}
}
/// Converts every table batch in `table_batches` to line protocol and writes
/// the result to `sink`.
///
/// The measurement name of each batch is looked up in `table_name_index` by
/// the batch's table ID. Batches are written in the iteration order of the
/// map.
///
/// # Errors
///
/// * [`WriteError::BadMutableBatch`] if the schema or record batch cannot be
///   obtained from a mutable batch.
/// * [`WriteError::ConvertToLineProtocolFailed`] if no table name is known for
///   a table ID, or the conversion to line protocol fails.
/// * [`WriteError::IoError`] if writing to `sink` fails.
///
/// Processing stops at the first error; batches written before it remain in
/// `sink`.
fn write_batches_as_line_proto<W>(
    sink: &mut W,
    table_name_index: &HashMap<TableId, String>,
    table_batches: HashMap<i64, MutableBatch>,
) -> Result<(), WriteError>
where
    W: Write,
{
    for (table_id, mb) in table_batches {
        let schema = mb.schema(schema::Projection::All)?;
        let record_batch = mb.to_arrow(schema::Projection::All)?;
        // Build the error lazily so no message is formatted and allocated
        // for tables whose name is present in the index.
        let measurement_name = table_name_index
            .get(&TableId::new(table_id))
            .ok_or_else(|| {
                WriteError::ConvertToLineProtocolFailed(format!(
                    "missing table name for id {table_id}"
                ))
            })?;
        let lines = convert_to_lines(measurement_name, &schema, &record_batch)
            .map_err(WriteError::ConvertToLineProtocolFailed)?;
        sink.write_all(lines.as_slice())?;
    }
    Ok(())
}
#[cfg(test)]
mod tests {
use std::fs::{read_dir, OpenOptions};
use assert_matches::assert_matches;
use data_types::TableId;
use dml::DmlWrite;
use generated_types::influxdata::{
iox::wal::v1::sequenced_wal_op::Op, pbdata::v1::DatabaseBatch,
};
use mutable_batch_lp::lines_to_batches;
use wal::{SequencedWalOp, WriteOpEntry, WriteOpEntryDecoder};
use super::*;
/// Writes three write ops for two namespaces into a WAL, decodes the rotated
/// segment and checks that [`LineProtoWriter`] reproduces the original line
/// protocol, split by namespace.
#[tokio::test]
async fn translate_good_wal_segment_file() {
let test_dir = test_helpers::tmp_dir().expect("failed to create test dir");
let wal = wal::Wal::new(test_dir.path()).await.unwrap();
// Assign table IDs to the measurements and place some writes in the WAL
let (table_id_index, table_name_index) =
build_indexes([("m1", TableId::new(1)), ("m2", TableId::new(2))]);
let line1 = "m1,t=foo v=1i 1";
let line2 = r#"m2,t=bar v="arán" 1"#;
let line3 = "m1,t=foo v=2i 2";
// Generate a single entry: only the last write is awaited, and the
// assertions below expect all three ops to be read back as one WAL entry.
wal.write_op(SequencedWalOp {
sequence_number: 0,
op: Op::Write(encode_line(NamespaceId::new(1), &table_id_index, line1)),
});
wal.write_op(SequencedWalOp {
sequence_number: 1,
op: Op::Write(encode_line(NamespaceId::new(2), &table_id_index, line2)),
});
wal.write_op(SequencedWalOp {
sequence_number: 2,
op: Op::Write(encode_line(NamespaceId::new(1), &table_id_index, line3)),
})
.changed()
.await
.expect("WAL should have changed");
// Rotate the WAL and create the translator.
let (closed, _) = wal.rotate().expect("failed to rotate WAL");
let decoder = WriteOpEntryDecoder::from(
wal.reader_for_segment(closed.id())
.expect("failed to open reader for closed segment"),
);
// Each namespace is written to its own in-memory buffer.
let mut writer = LineProtoWriter::new(|_| Ok(Vec::<u8>::new()), table_name_index);
// The WAL has been given a single entry containing three write ops
let decoded_entries = decoder
.into_iter()
.map(|r| r.expect("unexpected bad entry"))
.collect::<Vec<_>>();
assert_eq!(decoded_entries.len(), 1);
let decoded_ops = decoded_entries
.into_iter()
.flatten()
.collect::<Vec<WriteOpEntry>>();
assert_eq!(decoded_ops.len(), 3);
for entry in decoded_ops {
writer
.write_namespaced_table_batches(entry.namespace, entry.table_batches)
.expect("batch write should not fail");
}
// Inspect the per-namespace buffers the writer produced.
let results = &writer.namespaced_output;
// Assert that the namespaced writes contain ONLY the following:
//
// NamespaceId 1:
//
// m1,t=foo v=1i 1
// m1,t=foo v=2i 2
//
// NamespaceId 2:
//
// m2,t=bar v="arán" 1
//
assert_eq!(results.len(), 2);
assert_matches!(results.get(&NamespaceId::new(1)), Some(e) => {
assert_eq!(
String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n{}\n", line1, line3));
});
assert_matches!(results.get(&NamespaceId::new(2)), Some(e) => {
assert_eq!(
String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n", line2));
});
}
/// Appends garbage to the end of a closed WAL segment file and checks that
/// the entries written before the garbage can still be decoded and translated
/// to line protocol.
#[tokio::test]
async fn partial_translate_bad_wal_segment_file() {
let test_dir = test_helpers::tmp_dir().expect("failed to create test dir");
let wal = wal::Wal::new(test_dir.path()).await.unwrap();
let (table_id_index, table_name_index) =
build_indexes([("m3", TableId::new(3)), ("m4", TableId::new(4))]);
let line1 = "m3,s=baz v=3i 1";
let line2 = "m3,s=baz v=2i 2";
let line3 = "m4,s=qux v=2i 3";
let line4 = "m4,s=qux v=5i 4";
// Generate some WAL entries: two groups of two writes, awaiting the last
// write of each group. The assertions below expect two WAL entries.
wal.write_op(SequencedWalOp {
sequence_number: 0,
op: Op::Write(encode_line(NamespaceId::new(3), &table_id_index, line1)),
});
wal.write_op(SequencedWalOp {
sequence_number: 1,
op: Op::Write(encode_line(NamespaceId::new(3), &table_id_index, line2)),
})
.changed()
.await
.expect("WAL should have changed");
wal.write_op(SequencedWalOp {
sequence_number: 2,
op: Op::Write(encode_line(NamespaceId::new(4), &table_id_index, line3)),
});
wal.write_op(SequencedWalOp {
sequence_number: 3,
op: Op::Write(encode_line(NamespaceId::new(4), &table_id_index, line4)),
})
.changed()
.await
.expect("WAL should have changed");
// Get the path of the only segment file, then rotate it and add some
// garbage to the end.
let mut reader = read_dir(test_dir.path()).unwrap();
let closed_path = reader
.next()
.expect("no segment file found in WAL dir")
.unwrap()
.path();
assert_matches!(reader.next(), None); // Only 1 file should be in the WAL dir prior to rotation
let (closed, _) = wal.rotate().expect("failed to rotate WAL");
// The file handle is scoped so it is closed before the segment is read.
{
let mut file = OpenOptions::new()
.append(true)
.open(closed_path)
.expect("unable to open closed WAL segment for writing");
file.write_all(b"bananananananananas").unwrap();
}
// Create the translator and read as much as possible out of the bad segment file
let decoder = WriteOpEntryDecoder::from(
wal.reader_for_segment(closed.id())
.expect("failed to open reader for closed segment"),
);
let mut writer = LineProtoWriter::new(|_| Ok(Vec::<u8>::new()), table_name_index);
// The translator should be able to read all 2 good entries containing 4 write ops.
// `map_while` stops at the first item that is not `Ok`.
let decoded_entries = decoder
.into_iter()
.map_while(|r| r.ok())
.collect::<Vec<_>>();
assert_eq!(decoded_entries.len(), 2);
let decoded_ops = decoded_entries
.into_iter()
.flatten()
.collect::<Vec<WriteOpEntry>>();
assert_eq!(decoded_ops.len(), 4);
for entry in decoded_ops {
writer
.write_namespaced_table_batches(entry.namespace, entry.table_batches)
.expect("batch write should not fail");
}
// Inspect the per-namespace buffers the writer produced.
let results = &writer.namespaced_output;
// Assert that the namespaced writes contain ONLY the following:
//
// NamespaceId 3:
//
// m3,s=baz v=3i 1
// m3,s=baz v=2i 2
//
// NamespaceId 4:
//
// m4,s=qux v=2i 3
// m4,s=qux v=5i 4
//
assert_eq!(results.len(), 2);
assert_matches!(results.get(&NamespaceId::new(3)), Some(e) => {
assert_eq!(
String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n{}\n", line1, line2));
});
assert_matches!(results.get(&NamespaceId::new(4)), Some(e) => {
assert_eq!(
String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n{}\n", line3, line4));
});
}
fn build_indexes<'a>(
iter: impl IntoIterator<Item = (&'a str, TableId)>,
) -> (HashMap<String, TableId>, HashMap<TableId, String>) {
let table_id_index: HashMap<String, TableId> =
HashMap::from_iter(iter.into_iter().map(|(s, id)| (s.to_string(), id)));
let table_name_index: HashMap<TableId, String> = table_id_index
.clone()
.into_iter()
.map(|(name, id)| (id, name))
.collect();
(table_id_index, table_name_index)
}
/// Parses the line protocol in `lp` and encodes it as a [`DatabaseBatch`]
/// write for namespace `ns`, replacing each table name with the table ID it
/// is assigned in `table_id_index`.
///
/// Panics if `lp` cannot be parsed or mentions a table that is not present in
/// `table_id_index`.
fn encode_line(
    ns: NamespaceId,
    table_id_index: &HashMap<String, TableId>,
    lp: &str,
) -> DatabaseBatch {
    let batches_by_id = lines_to_batches(lp, 0)
        .unwrap()
        .into_iter()
        .map(|(table_name, batch)| {
            let table_id = table_id_index
                .get(&table_name)
                .expect("table name not present in table id index")
                .to_owned();
            (table_id, batch)
        })
        .collect();

    let write = DmlWrite::new(ns, batches_by_id, "bananas".into(), Default::default());
    mutable_batch_pb::encode::encode_write(ns.get(), &write)
}
}