refactor(`wal_inspect`): Make `LineProtoWriter` namespace unaware

Instead, the type responsible for initialising it handles namespaced
`Write` initialisation and management, as well as the failure paths that
may need handling. This commit introduces a `NamespaceDemultiplexer`
type with a generic implementation allowing fallible `async` lazy init
of any type from a given `NamespaceId`. This paves the way for catalog-aware
initialisation of `LineProtoWriter`s.
pull/24376/head
Fraser Savage 2023-05-25 15:12:01 +01:00
parent 0b9b775fbd
commit 51d59f8216
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
7 changed files with 194 additions and 160 deletions

1
Cargo.lock generated
View File

@ -6402,7 +6402,6 @@ dependencies = [
name = "wal_inspect"
version = "0.1.0"
dependencies = [
"assert_matches",
"data_types",
"dml",
"generated_types",

View File

@ -70,7 +70,7 @@ where
let connection = connection().await;
skipped_compactions::command(connection, config).await?
}
Command::Wal(config) => wal::command(config)?,
Command::Wal(config) => wal::command(config).await?,
}
Ok(())

View File

@ -39,8 +39,8 @@ enum Command {
}
/// Executes a WAL debugging subcommand as directed by the config
pub fn command(config: Config) -> Result<(), Error> {
pub async fn command(config: Config) -> Result<(), Error> {
match config.command {
Command::RegenerateLp(config) => regenerate_lp::command(config),
Command::RegenerateLp(config) => regenerate_lp::command(config).await,
}
}

View File

@ -1,10 +1,13 @@
//! A module providing a CLI command for regenerating line protocol from a WAL file.
use std::fs::{create_dir_all, OpenOptions};
use std::future::Future;
use std::path::PathBuf;
use std::sync::Arc;
use data_types::NamespaceId;
use observability_deps::tracing::{debug, error, info};
use wal::{ClosedSegmentFileReader, WriteOpEntryDecoder};
use wal_inspect::{LineProtoWriter, NamespacedBatchWriter, WriteError};
use wal_inspect::{LineProtoWriter, NamespaceDemultiplexer, TableBatchWriter, WriteError};
use super::Error;
@ -29,7 +32,7 @@ pub struct Config {
/// Executes the `regenerate-lp` command with the provided configuration, reading
/// write operation entries from a WAL file and mapping them to line protocol.
pub fn command(config: Config) -> Result<(), Error> {
pub async fn command(config: Config) -> Result<(), Error> {
let decoder = WriteOpEntryDecoder::from(
ClosedSegmentFileReader::from_path(&config.input)
.map_err(|wal_err| Error::UnableToOpenWalFile { source: wal_err })?,
@ -37,12 +40,12 @@ pub fn command(config: Config) -> Result<(), Error> {
match config.output_directory {
Some(d) => {
let d = Arc::new(d);
create_dir_all(d.as_path())?;
let writer = LineProtoWriter::new(
|namespace_id| {
let file_path = d
.as_path()
.join(format!("namespace_id_{}.lp", namespace_id));
let namespace_demux = NamespaceDemultiplexer::new(move |namespace_id| {
let d = Arc::clone(&d);
async move {
let file_path = d.as_path().join(format!("namespace_{}.lp", namespace_id));
let mut open_options = OpenOptions::new().write(true).to_owned();
if config.force {
@ -57,31 +60,45 @@ pub fn command(config: Config) -> Result<(), Error> {
"creating namespaced file as destination for regenerated line protocol",
);
open_options.open(&file_path).map_err(WriteError::IoError)
},
Ok(LineProtoWriter::new(
open_options.open(&file_path).map_err(WriteError::IoError)?,
None,
);
decode_and_write_entries(decoder, writer)
))
}
});
decode_and_write_entries(decoder, namespace_demux).await
}
None => {
let writer = LineProtoWriter::new(|_namespace_id| Ok(std::io::stdout()), None);
decode_and_write_entries(decoder, writer)
let namespace_demux = NamespaceDemultiplexer::new(|_namespace_id| async {
let result: Result<LineProtoWriter<std::io::Stdout>, WriteError> =
Ok(LineProtoWriter::new(std::io::stdout(), None));
result
});
decode_and_write_entries(decoder, namespace_demux).await
}
}
}
fn decode_and_write_entries<W: NamespacedBatchWriter>(
async fn decode_and_write_entries<T, F, I>(
decoder: WriteOpEntryDecoder,
mut writer: W,
) -> Result<(), Error> {
mut namespace_demux: NamespaceDemultiplexer<T, F>,
) -> Result<(), Error>
where
T: TableBatchWriter<WriteError = wal_inspect::WriteError> + Send,
F: (Fn(NamespaceId) -> I) + Send + Sync,
I: Future<Output = Result<T, WriteError>> + Send,
{
let mut write_errors = vec![];
for (wal_entry_number, entry_batch) in decoder.enumerate() {
for (write_op_number, entry) in entry_batch?.into_iter().enumerate() {
let namespace_id = entry.namespace;
debug!(%namespace_id, %wal_entry_number, %write_op_number, "regenerating line protocol for namespace from WAL write op entry");
writer
.write_namespaced_table_batches(entry.namespace, entry.table_batches)
namespace_demux
.get(namespace_id)
.await
.map(|writer| writer.write_table_batches(entry.table_batches.into_iter()))
.and_then(|write_result| write_result) // flatten out the write result if Ok
.unwrap_or_else(|err| {
error!(
%namespace_id,

View File

@ -24,7 +24,7 @@ use std::{
time::Duration,
};
use data_types::{sequence_number_set::SequenceNumberSet, NamespaceId};
use data_types::{sequence_number_set::SequenceNumberSet, NamespaceId, TableId};
use generated_types::{
google::{FieldViolation, OptionalField},
influxdata::iox::wal::v1::{
@ -580,7 +580,7 @@ impl std::fmt::Debug for ClosedSegmentFileReader {
#[derive(Debug)]
pub struct WriteOpEntry {
pub namespace: NamespaceId,
pub table_batches: HashMap<i64, MutableBatch>,
pub table_batches: HashMap<TableId, MutableBatch>,
}
/// A decoder that reads from a closed segment file and parses write
@ -624,7 +624,10 @@ impl Iterator for WriteOpEntryDecoder {
Ok(WriteOpEntry {
namespace: NamespaceId::new(w.database_id),
table_batches: decode_database_batch(&w)
.context(UnableToCreateMutableBatchSnafu)?,
.context(UnableToCreateMutableBatchSnafu)?
.into_iter()
.map(|(id, mb)| (TableId::new(id), mb))
.collect(),
})
})
.collect::<Self::Item>()
@ -886,7 +889,7 @@ mod tests {
assert_eq!(left.namespace, NamespaceId::new(right.database_id));
assert_eq!(left.table_batches.len(), right.table_batches.len());
for right_tb in &right.table_batches {
let right_key = right_tb.table_id;
let right_key = TableId::new(right_tb.table_id);
let left_mb = left
.table_batches
.get(&right_key)

View File

@ -15,7 +15,6 @@ thiserror = "1.0.40"
workspace-hack = { version = "0.1", path = "../workspace-hack" }
[dev-dependencies] # In alphabetical order
assert_matches = "1.5.0"
dml = { version = "0.1.0", path = "../dml" }
generated_types = { version = "0.1.0", path = "../generated_types" }
mutable_batch_lp = { path = "../mutable_batch_lp" }

View File

@ -21,168 +21,165 @@
// Workaround for "unused crate" lint false positives.
use workspace_hack as _;
use std::borrow::Cow;
use std::error::Error;
use std::io::Write;
use std::{borrow::Cow, future::Future};
use data_types::{NamespaceId, TableId};
use hashbrown::HashMap;
use hashbrown::{hash_map::Entry, HashMap};
use mutable_batch::MutableBatch;
use parquet_to_line_protocol::convert_to_lines;
use thiserror::Error;
/// Errors emitted by a [`LineProtoWriter`] during operation.
/// Errors emitted by a [`TableBatchWriter`] during operation.
#[derive(Debug, Error)]
pub enum WriteError {
/// The mutable batch is in a state that prevents obtaining
/// the data needed to write line protocol
/// the data needed to write to the [`TableBatchWriter`]'s
/// underlying implementation.
#[error("failed to get required data from mutable batch: {0}")]
BadMutableBatch(#[from] mutable_batch::Error),
/// The record batch could not be mapped to line protocol
#[error("failed to map record batch to line protocol: {0}")]
ConvertToLineProtocolFailed(String),
#[error("failed to translate record batch: {0}")]
RecordBatchTranslationFailure(String),
/// A write failure caused by an IO error
#[error("failed to write translation: {0}")]
#[error("failed to write table batch: {0}")]
IoError(#[from] std::io::Error),
}
/// A [`NamespacedBatchWriter`] takes a namespace and a set of associated table
/// batch writes and writes them elsewhere.
pub trait NamespacedBatchWriter {
/// Writes out each table batch to a destination associated with the given
/// namespace ID.
fn write_namespaced_table_batches(
&mut self,
ns: NamespaceId,
table_batches: HashMap<i64, MutableBatch>,
) -> Result<(), WriteError>;
/// The [`TableBatchWriter`] trait provides functionality to write mutable
/// batches, each keyed by its [`TableId`], to an implementation-defined
/// destination and format.
pub trait TableBatchWriter {
/// The error type an implementor returns for a failed write. It must
/// implement `std::error::Error` and be convertible into the
/// [`WriteError`] enum.
type WriteError: Error + Into<WriteError>;
/// Writes out every `(table ID, batch)` pair yielded by `table_batches` to
/// the implementation-defined destination and format.
///
/// # Errors
///
/// Implementations report a failed write as `Self::WriteError`.
fn write_table_batches<B>(&mut self, table_batches: B) -> Result<(), Self::WriteError>
where
B: Iterator<Item = (TableId, MutableBatch)>;
}
/// Provides namespaced write functionality from table-based mutable batches
/// to namespaced line protocol output.
/// NamespaceDemultiplexer is a delegator from [`NamespaceId`] to some namespaced
/// type, lazily initialising instances as required.
#[derive(Debug)]
pub struct LineProtoWriter<W, F>
where
W: Write,
{
namespaced_output: HashMap<NamespaceId, W>,
new_write_sink: F,
table_name_index: Option<HashMap<TableId, String>>,
pub struct NamespaceDemultiplexer<T, F> {
// Instances of `T` initialised so far, keyed by the `NamespaceId` they
// were created for. Lookups are served from this map.
demux_map: HashMap<NamespaceId, T>,
// Initialiser invoked with the `NamespaceId` to build a new `T` when
// `demux_map` holds no entry for that namespace.
init_new: F,
}
impl<W, F> LineProtoWriter<W, F>
impl<T, F, I, E> NamespaceDemultiplexer<T, F>
where
W: Write,
T: Send,
F: (Fn(NamespaceId) -> I) + Send + Sync,
I: Future<Output = Result<T, E>> + Send,
{
/// Performs a best effort flush of all write destinations opened by the [`LineProtoWriter`].
pub fn flush(&mut self) -> Result<(), Vec<WriteError>> {
let mut errs = Vec::<WriteError>::new();
for w in self.namespaced_output.values_mut() {
if let Err(e) = w.flush() {
errs.push(WriteError::IoError(e));
/// Creates a [`NamespaceDemultiplexer`] that uses `init_new` to lazily initialise
/// instances of `T` when there is no entry in the map for a given [`NamespaceId`].
pub fn new(init_new: F) -> Self {
Self {
demux_map: Default::default(),
init_new,
}
}
if !errs.is_empty() {
return Err(errs);
/// Looks up the `T` corresponding to `namespace_id`, initialising a new
/// instance through the provided mechanism if no entry exists yet. If
/// initialisation fails its error is returned and no entry is inserted.
pub async fn get(&mut self, namespace_id: NamespaceId) -> Result<&mut T, E> {
match self.demux_map.entry(namespace_id) {
Entry::Occupied(entry) => Ok(entry.into_mut()),
Entry::Vacant(empty_entry) => {
let value = (self.init_new)(namespace_id).await?;
Ok(empty_entry.insert(value))
}
}
Ok(())
}
}
impl<W, F> Drop for LineProtoWriter<W, F>
/// The [`LineProtoWriter`] enables rewriting table-keyed mutable batches as
/// line protocol to a [`Write`] implementation.
#[derive(Debug)]
pub struct LineProtoWriter<W>
where
W: Write,
{
// Destination that the generated line protocol bytes are written to.
sink: W,
// Optional mapping from table ID to table name, used to recover the
// measurement name for each batch.
table_name_lookup: Option<HashMap<TableId, String>>,
}
impl<W> LineProtoWriter<W>
where
W: Write,
{
/// Constructs a new [`LineProtoWriter`] that writes batches of table writes
/// to `sink` as line protocol.
///
/// If provided, `table_name_lookup` MUST contain an exhaustive mapping from
/// table ID to corresponding table name so that the correct measurement name
/// can be placed in the line protocol. WAL entries store only the table ID,
/// not the table name, so the name cannot be inferred from the entries alone.
///
/// If no lookup is given then measurement names are written as table IDs.
pub fn new(sink: W, table_name_lookup: Option<HashMap<TableId, String>>) -> Self {
Self {
sink,
table_name_lookup,
}
}
}
impl<W> Drop for LineProtoWriter<W>
where
W: Write,
{
fn drop(&mut self) {
_ = self.flush()
_ = self.sink.flush()
}
}
impl<W, F> LineProtoWriter<W, F>
impl<W> TableBatchWriter for LineProtoWriter<W>
where
W: Write,
F: Fn(NamespaceId) -> Result<W, WriteError>,
{
/// Constructs a new [`LineProtoWriter`] that uses `new_write_sink` to
/// get the destination for each line protocol write by its namespace ID.
///
/// The optional `table_name_index` is used to provide a mapping for all table IDs
/// to the corresponding table name to recover the measurement name, as WAL write
/// entries do not contain this information. If supplied, the index MUST be exhaustive.
///
/// If no index is given then measurement names are written as table IDs.
pub fn new(new_write_sink: F, table_name_index: Option<HashMap<TableId, String>>) -> Self {
Self {
namespaced_output: HashMap::new(),
new_write_sink,
table_name_index,
}
}
}
type WriteError = WriteError;
impl<W, F> NamespacedBatchWriter for LineProtoWriter<W, F>
where
W: Write,
F: Fn(NamespaceId) -> Result<W, WriteError>,
{
/// Writes the provided set of table batches as line protocol write entries
/// to the destination for the provided namespace ID.
fn write_namespaced_table_batches(
&mut self,
ns: NamespaceId,
table_batches: HashMap<i64, MutableBatch>,
) -> Result<(), WriteError> {
let sink = self
.namespaced_output
.entry(ns)
.or_insert((self.new_write_sink)(ns)?);
write_batches_as_line_proto(
sink,
self.table_name_index.as_ref(),
table_batches.into_iter(),
)
}
}
fn write_batches_as_line_proto<W, B>(
sink: &mut W,
table_name_index: Option<&HashMap<TableId, String>>,
table_batches: B,
) -> Result<(), WriteError>
where
W: Write,
B: Iterator<Item = (i64, MutableBatch)>,
{
fn write_table_batches<B>(&mut self, table_batches: B) -> Result<(), Self::WriteError>
where
B: Iterator<Item = (TableId, MutableBatch)>,
{
for (table_id, mb) in table_batches {
let schema = mb.schema(schema::Projection::All)?;
let record_batch = mb.to_arrow(schema::Projection::All)?;
let table_id = TableId::new(table_id);
let measurement_name = match table_name_index {
let measurement_name = match &self.table_name_lookup {
Some(idx) => Cow::Borrowed(idx.get(&table_id).ok_or(
WriteError::ConvertToLineProtocolFailed(format!(
WriteError::RecordBatchTranslationFailure(format!(
"missing table name for id {}",
&table_id
)),
)?),
None => Cow::Owned(table_id.to_string()),
};
sink.write_all(
self.sink.write_all(
convert_to_lines(&measurement_name, &schema, &record_batch)
.map_err(WriteError::ConvertToLineProtocolFailed)?
.map_err(WriteError::RecordBatchTranslationFailure)?
.as_slice(),
)?;
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use assert_matches::assert_matches;
use data_types::TableId;
use dml::DmlWrite;
use generated_types::influxdata::{
@ -229,7 +226,13 @@ mod tests {
wal.reader_for_segment(closed.id())
.expect("failed to open reader for closed segment"),
);
let mut writer = LineProtoWriter::new(|_| Ok(Vec::<u8>::new()), Some(table_name_index));
let mut namespace_demux = NamespaceDemultiplexer::new(|_namespace_id| async {
let result: Result<LineProtoWriter<Vec<u8>>, WriteError> = Ok(LineProtoWriter::new(
Vec::<u8>::new(),
Some(table_name_index.clone()),
));
result
});
let decoded_entries = decoder
.into_iter()
@ -243,14 +246,13 @@ mod tests {
assert_eq!(decoded_ops.len(), 3);
for entry in decoded_ops {
writer
.write_namespaced_table_batches(entry.namespace, entry.table_batches)
.expect("batch write should not fail");
namespace_demux
.get(entry.namespace)
.await
.expect("failed to get namespaced writer")
.write_table_batches(entry.table_batches.into_iter())
.expect("should not fail to write table batches");
}
// The WAL has been given a single entry containing three write ops
let results = &writer.namespaced_output;
// Assert that the namespaced writes contain ONLY the following:
//
// NamespaceId 1:
@ -262,20 +264,31 @@ mod tests {
//
// m2,t=bar v="arán" 1
//
assert_eq!(results.len(), 2);
assert_matches!(results.get(&NamespaceId::new(1)), Some(e) => {
assert_eq!(namespace_demux.demux_map.len(), 2);
let ns_1_results = namespace_demux
.get(NamespaceId::new(1))
.await
.expect("failed to get namespaced writer")
.sink
.clone();
assert_eq!(
String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n{}\n", line1, line3));
});
assert_matches!(results.get(&NamespaceId::new(2)), Some(e) => {
String::from_utf8(ns_1_results).unwrap().as_str(),
format!("{}\n{}\n", line1, line3)
);
let ns_2_results = namespace_demux
.get(NamespaceId::new(2))
.await
.expect("failed to get namespaced writer")
.sink
.clone();
assert_eq!(
String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n", line2));
});
String::from_utf8(ns_2_results).unwrap().as_str(),
format!("{}\n", line2)
);
}
#[test]
fn write_line_proto_without_index() {
let mut sink = Vec::<u8>::new();
let batches = BTreeMap::from_iter(
lines_to_batches(
r#"m1,t=foo v=1i 1
@ -287,14 +300,17 @@ m2,t=bar v="arán" 1"#,
.collect::<BTreeMap<_, _>>()
.into_iter()
.enumerate()
.map(|(i, (_table_name, batch))| (i as i64, batch)),
.map(|(i, (_table_name, batch))| (TableId::new(i as i64), batch)),
);
write_batches_as_line_proto(&mut sink, None, batches.into_iter())
let mut lp_writer = LineProtoWriter::new(Vec::new(), None);
lp_writer
.write_table_batches(batches.into_iter())
.expect("write back to line proto should succeed");
assert_eq!(
String::from_utf8(sink).expect("invalid output"),
String::from_utf8(lp_writer.sink.clone()).expect("invalid output"),
r#"0,t=foo v=1i 1
1,t=bar v="arán" 1
"#