diff --git a/Cargo.lock b/Cargo.lock index beba4caf8d..2b82024e0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6402,7 +6402,6 @@ dependencies = [ name = "wal_inspect" version = "0.1.0" dependencies = [ - "assert_matches", "data_types", "dml", "generated_types", diff --git a/influxdb_iox/src/commands/debug/mod.rs b/influxdb_iox/src/commands/debug/mod.rs index 6c9f9aa3cf..0159704844 100644 --- a/influxdb_iox/src/commands/debug/mod.rs +++ b/influxdb_iox/src/commands/debug/mod.rs @@ -70,7 +70,7 @@ where let connection = connection().await; skipped_compactions::command(connection, config).await? } - Command::Wal(config) => wal::command(config)?, + Command::Wal(config) => wal::command(config).await?, } Ok(()) diff --git a/influxdb_iox/src/commands/debug/wal/mod.rs b/influxdb_iox/src/commands/debug/wal/mod.rs index b5cb93da10..24d500a6e4 100644 --- a/influxdb_iox/src/commands/debug/wal/mod.rs +++ b/influxdb_iox/src/commands/debug/wal/mod.rs @@ -39,8 +39,8 @@ enum Command { } /// Executes a WAL debugging subcommand as directed by the config -pub fn command(config: Config) -> Result<(), Error> { +pub async fn command(config: Config) -> Result<(), Error> { match config.command { - Command::RegenerateLp(config) => regenerate_lp::command(config), + Command::RegenerateLp(config) => regenerate_lp::command(config).await, } } diff --git a/influxdb_iox/src/commands/debug/wal/regenerate_lp.rs b/influxdb_iox/src/commands/debug/wal/regenerate_lp.rs index df3d49e8be..e5be1d7ca0 100644 --- a/influxdb_iox/src/commands/debug/wal/regenerate_lp.rs +++ b/influxdb_iox/src/commands/debug/wal/regenerate_lp.rs @@ -1,10 +1,13 @@ //! A module providing a CLI command for regenerating line protocol from a WAL file. use std::fs::{create_dir_all, OpenOptions}; +use std::future::Future; use std::path::PathBuf; +use std::sync::Arc; +use data_types::NamespaceId; use observability_deps::tracing::{debug, error, info}; use wal::{ClosedSegmentFileReader, WriteOpEntryDecoder}; -use wal_inspect::{LineProtoWriter, NamespacedBatchWriter, WriteError}; +use wal_inspect::{LineProtoWriter, NamespaceDemultiplexer, TableBatchWriter, WriteError}; use super::Error; @@ -29,7 +32,7 @@ pub struct Config { /// Executes the `regenerate-lp` command with the provided configuration, reading /// write operation entries from a WAL file and mapping them to line protocol. -pub fn command(config: Config) -> Result<(), Error> { +pub async fn command(config: Config) -> Result<(), Error> { let decoder = WriteOpEntryDecoder::from( ClosedSegmentFileReader::from_path(&config.input) .map_err(|wal_err| Error::UnableToOpenWalFile { source: wal_err })?, @@ -37,12 +40,12 @@ pub fn command(config: Config) -> Result<(), Error> { match config.output_directory { Some(d) => { + let d = Arc::new(d); create_dir_all(d.as_path())?; - let writer = LineProtoWriter::new( - |namespace_id| { - let file_path = d - .as_path() - .join(format!("namespace_id_{}.lp", namespace_id)); + let namespace_demux = NamespaceDemultiplexer::new(move |namespace_id| { + let d = Arc::clone(&d); + async move { + let file_path = d.as_path().join(format!("namespace_{}.lp", namespace_id)); let mut open_options = OpenOptions::new().write(true).to_owned(); if config.force { @@ -57,31 +60,45 @@ pub fn command(config: Config) -> Result<(), Error> { "creating namespaced file as destination for regenerated line protocol", ); - open_options.open(&file_path).map_err(WriteError::IoError) - }, - None, - ); - decode_and_write_entries(decoder, writer) + Ok(LineProtoWriter::new( + open_options.open(&file_path).map_err(WriteError::IoError)?, + None, + )) + } + }); + decode_and_write_entries(decoder, namespace_demux).await } None => { - let writer = LineProtoWriter::new(|_namespace_id| Ok(std::io::stdout()), None); - decode_and_write_entries(decoder, writer) + let namespace_demux = NamespaceDemultiplexer::new(|_namespace_id| async { + let result: Result, WriteError> = + Ok(LineProtoWriter::new(std::io::stdout(), None)); + result + }); + decode_and_write_entries(decoder, namespace_demux).await } } } -fn decode_and_write_entries( +async fn decode_and_write_entries( decoder: WriteOpEntryDecoder, - mut writer: W, -) -> Result<(), Error> { + mut namespace_demux: NamespaceDemultiplexer, +) -> Result<(), Error> +where + T: TableBatchWriter + Send, + F: (Fn(NamespaceId) -> I) + Send + Sync, + I: Future> + Send, +{ let mut write_errors = vec![]; for (wal_entry_number, entry_batch) in decoder.enumerate() { for (write_op_number, entry) in entry_batch?.into_iter().enumerate() { let namespace_id = entry.namespace; debug!(%namespace_id, %wal_entry_number, %write_op_number, "regenerating line protocol for namespace from WAL write op entry"); - writer - .write_namespaced_table_batches(entry.namespace, entry.table_batches) + namespace_demux + .get(namespace_id) + .await + .map(|writer| writer.write_table_batches(entry.table_batches.into_iter())) + .and_then(|write_result| write_result) // flatten out the write result if Ok .unwrap_or_else(|err| { error!( %namespace_id, diff --git a/wal/src/lib.rs b/wal/src/lib.rs index b9695321d4..7d000a9263 100644 --- a/wal/src/lib.rs +++ b/wal/src/lib.rs @@ -24,7 +24,7 @@ use std::{ time::Duration, }; -use data_types::{sequence_number_set::SequenceNumberSet, NamespaceId}; +use data_types::{sequence_number_set::SequenceNumberSet, NamespaceId, TableId}; use generated_types::{ google::{FieldViolation, OptionalField}, influxdata::iox::wal::v1::{ @@ -580,7 +580,7 @@ impl std::fmt::Debug for ClosedSegmentFileReader { #[derive(Debug)] pub struct WriteOpEntry { pub namespace: NamespaceId, - pub table_batches: HashMap, + pub table_batches: HashMap, } /// A decoder that reads from a closed segment file and parses write @@ -624,7 +624,10 @@ impl Iterator for WriteOpEntryDecoder { Ok(WriteOpEntry { namespace: NamespaceId::new(w.database_id), table_batches: decode_database_batch(&w) - .context(UnableToCreateMutableBatchSnafu)?, + .context(UnableToCreateMutableBatchSnafu)? + .into_iter() + .map(|(id, mb)| (TableId::new(id), mb)) + .collect(), }) }) .collect::() @@ -886,7 +889,7 @@ mod tests { assert_eq!(left.namespace, NamespaceId::new(right.database_id)); assert_eq!(left.table_batches.len(), right.table_batches.len()); for right_tb in &right.table_batches { - let right_key = right_tb.table_id; + let right_key = TableId::new(right_tb.table_id); let left_mb = left .table_batches .get(&right_key) diff --git a/wal_inspect/Cargo.toml b/wal_inspect/Cargo.toml index e35aa3965c..bd74cd7eb7 100644 --- a/wal_inspect/Cargo.toml +++ b/wal_inspect/Cargo.toml @@ -15,7 +15,6 @@ thiserror = "1.0.40" workspace-hack = { version = "0.1", path = "../workspace-hack" } [dev-dependencies] # In alphabetical order -assert_matches = "1.5.0" dml = { version = "0.1.0", path = "../dml" } generated_types = { version = "0.1.0", path = "../generated_types" } mutable_batch_lp = { path = "../mutable_batch_lp" } diff --git a/wal_inspect/src/lib.rs b/wal_inspect/src/lib.rs index dab35c356c..53992dcd1e 100644 --- a/wal_inspect/src/lib.rs +++ b/wal_inspect/src/lib.rs @@ -21,168 +21,165 @@ // Workaround for "unused crate" lint false positives. use workspace_hack as _; -use std::borrow::Cow; +use std::error::Error; use std::io::Write; +use std::{borrow::Cow, future::Future}; use data_types::{NamespaceId, TableId}; -use hashbrown::HashMap; +use hashbrown::{hash_map::Entry, HashMap}; use mutable_batch::MutableBatch; use parquet_to_line_protocol::convert_to_lines; use thiserror::Error; -/// Errors emitted by a [`LineProtoWriter`] during operation. +/// Errors emitted by a [`TableBatchWriter`] during operation. #[derive(Debug, Error)] pub enum WriteError { /// The mutable batch is in a state that prevents obtaining - /// the data needed to write line protocol + /// the data needed to write to the [`TableBatchWriter`]'s + /// underlying implementation. #[error("failed to get required data from mutable batch: {0}")] BadMutableBatch(#[from] mutable_batch::Error), /// The record batch could not be mapped to line protocol - #[error("failed to map record batch to line protocol: {0}")] - ConvertToLineProtocolFailed(String), + #[error("failed to translate record batch: {0}")] + RecordBatchTranslationFailure(String), /// A write failure caused by an IO error - #[error("failed to write translation: {0}")] + #[error("failed to write table batch: {0}")] IoError(#[from] std::io::Error), } -/// A [`NamespacedBatchWriter`] takes a namespace and a set of associated table -/// batch writes and writes them elsewhere. -pub trait NamespacedBatchWriter { - /// Writes out each table batch to a destination associated with the given - /// namespace ID. - fn write_namespaced_table_batches( - &mut self, - ns: NamespaceId, - table_batches: HashMap, - ) -> Result<(), WriteError>; +/// The [`TableBatchWriter`] trait provides functionality to write table-ID +/// mutable batches to the implementation defined destination and format. +pub trait TableBatchWriter { + /// The bounds which [`TableBatchWriter`] implementors must adhere to when + /// returning errors for a failed write. + type WriteError: Error + Into; + + /// Write out `table_batches` to the implementation defined destination and format. + fn write_table_batches(&mut self, table_batches: B) -> Result<(), Self::WriteError> + where + B: Iterator; } -/// Provides namespaced write functionality from table-based mutable batches -/// to namespaced line protocol output. +/// NamespaceDemultiplexer is a delegator from [`NamespaceId`] to some namespaced +/// type, lazily initialising instances as required. #[derive(Debug)] -pub struct LineProtoWriter -where - W: Write, -{ - namespaced_output: HashMap, - new_write_sink: F, - - table_name_index: Option>, +pub struct NamespaceDemultiplexer { + // The map used to hold currently initialised `T` and lookup within. + demux_map: HashMap, + // Mechanism to initialise a new `T` when no entry is found in the + // `demux_map`. + init_new: F, } -impl LineProtoWriter +impl NamespaceDemultiplexer where - W: Write, + T: Send, + F: (Fn(NamespaceId) -> I) + Send + Sync, + I: Future> + Send, { - /// Performs a best effort flush of all write destinations opened by the [`LineProtoWriter`]. - pub fn flush(&mut self) -> Result<(), Vec> { - let mut errs = Vec::::new(); - for w in self.namespaced_output.values_mut() { - if let Err(e) = w.flush() { - errs.push(WriteError::IoError(e)); + /// Creates a [`NamespaceDemultiplexer`] that uses `F` to lazily initialise + /// instances of [`T`] when there is no entry in the map for a given [`NamespaceId`]. + pub fn new(init_new: F) -> Self { + Self { + demux_map: Default::default(), + init_new, + } + } + + /// Looks up the [`T`] corresponding to `namespace_id`, initialising a new + /// instance through the provided mechanism if no entry exists yet. + pub async fn get(&mut self, namespace_id: NamespaceId) -> Result<&mut T, E> { + match self.demux_map.entry(namespace_id) { + Entry::Occupied(entry) => Ok(entry.into_mut()), + Entry::Vacant(empty_entry) => { + let value = (self.init_new)(namespace_id).await?; + Ok(empty_entry.insert(value)) } } - if !errs.is_empty() { - return Err(errs); - } - Ok(()) } } -impl Drop for LineProtoWriter +/// The [`LineProtoWriter`] enables rewriting table-keyed mutable batches as +/// line protocol to a [`Write`] implementation. +#[derive(Debug)] +pub struct LineProtoWriter +where + W: Write, +{ + sink: W, + table_name_lookup: Option>, +} + +impl LineProtoWriter +where + W: Write, +{ + /// Constructs a new [`LineProtoWriter`] that writes batches of table writes + /// to `sink` as line protocol. + /// + /// If provided, `table_name_lookup` MUST contain an exhaustive mapping from + /// table ID to corresponding table name so that the correct measurement name + /// can be placed in the line proto. WAL entries store only table ID, not + /// name and thus cannot be inferred. + /// + /// If no lookup is given then measurement names are written as table IDs. + pub fn new(sink: W, table_name_lookup: Option>) -> Self { + Self { + sink, + table_name_lookup, + } + } +} + +impl Drop for LineProtoWriter where W: Write, { fn drop(&mut self) { - _ = self.flush() + _ = self.sink.flush() } } -impl LineProtoWriter +impl TableBatchWriter for LineProtoWriter where W: Write, - F: Fn(NamespaceId) -> Result, { - /// Constructs a new [`LineProtoWriter`] that uses `new_write_sink` to - /// get the destination for each line protocol write by its namespace ID. - /// - /// The optional `table_name_index` is used to provide a mapping for all table IDs - /// to the corresponding table name to recover the measurement name, as WAL write - /// entries do not contain this information. If supplied, the index MUST be exhaustive. - /// - /// If no index is given then measurement names are written as table IDs. - pub fn new(new_write_sink: F, table_name_index: Option>) -> Self { - Self { - namespaced_output: HashMap::new(), - new_write_sink, - table_name_index, - } - } -} + type WriteError = WriteError; -impl NamespacedBatchWriter for LineProtoWriter -where - W: Write, - F: Fn(NamespaceId) -> Result, -{ /// Writes the provided set of table batches as line protocol write entries /// to the destination for the provided namespace ID. - fn write_namespaced_table_batches( - &mut self, - ns: NamespaceId, - table_batches: HashMap, - ) -> Result<(), WriteError> { - let sink = self - .namespaced_output - .entry(ns) - .or_insert((self.new_write_sink)(ns)?); - - write_batches_as_line_proto( - sink, - self.table_name_index.as_ref(), - table_batches.into_iter(), - ) + fn write_table_batches(&mut self, table_batches: B) -> Result<(), Self::WriteError> + where + B: Iterator, + { + for (table_id, mb) in table_batches { + let schema = mb.schema(schema::Projection::All)?; + let record_batch = mb.to_arrow(schema::Projection::All)?; + let measurement_name = match &self.table_name_lookup { + Some(idx) => Cow::Borrowed(idx.get(&table_id).ok_or( + WriteError::RecordBatchTranslationFailure(format!( + "missing table name for id {}", + &table_id + )), + )?), + None => Cow::Owned(table_id.to_string()), + }; + self.sink.write_all( + convert_to_lines(&measurement_name, &schema, &record_batch) + .map_err(WriteError::RecordBatchTranslationFailure)? + .as_slice(), + )?; + } + Ok(()) } } -fn write_batches_as_line_proto( - sink: &mut W, - table_name_index: Option<&HashMap>, - table_batches: B, -) -> Result<(), WriteError> -where - W: Write, - B: Iterator, -{ - for (table_id, mb) in table_batches { - let schema = mb.schema(schema::Projection::All)?; - let record_batch = mb.to_arrow(schema::Projection::All)?; - let table_id = TableId::new(table_id); - let measurement_name = match table_name_index { - Some(idx) => Cow::Borrowed(idx.get(&table_id).ok_or( - WriteError::ConvertToLineProtocolFailed(format!( - "missing table name for id {}", - &table_id - )), - )?), - None => Cow::Owned(table_id.to_string()), - }; - sink.write_all( - convert_to_lines(&measurement_name, &schema, &record_batch) - .map_err(WriteError::ConvertToLineProtocolFailed)? - .as_slice(), - )?; - } - Ok(()) -} - #[cfg(test)] mod tests { use std::collections::BTreeMap; - use assert_matches::assert_matches; use data_types::TableId; use dml::DmlWrite; use generated_types::influxdata::{ @@ -229,7 +226,13 @@ mod tests { wal.reader_for_segment(closed.id()) .expect("failed to open reader for closed segment"), ); - let mut writer = LineProtoWriter::new(|_| Ok(Vec::::new()), Some(table_name_index)); + let mut namespace_demux = NamespaceDemultiplexer::new(|_namespace_id| async { + let result: Result>, WriteError> = Ok(LineProtoWriter::new( + Vec::::new(), + Some(table_name_index.clone()), + )); + result + }); let decoded_entries = decoder .into_iter() @@ -243,14 +246,13 @@ mod tests { assert_eq!(decoded_ops.len(), 3); for entry in decoded_ops { - writer - .write_namespaced_table_batches(entry.namespace, entry.table_batches) - .expect("batch write should not fail"); + namespace_demux + .get(entry.namespace) + .await + .expect("failed to get namespaced writer") + .write_table_batches(entry.table_batches.into_iter()) + .expect("should not fail to write table batches"); } - // The WAL has been given a single entry containing three write ops - - let results = &writer.namespaced_output; - // Assert that the namespaced writes contain ONLY the following: // // NamespaceId 1: @@ -262,20 +264,31 @@ mod tests { // // m2,t=bar v="arán" 1 // - assert_eq!(results.len(), 2); - assert_matches!(results.get(&NamespaceId::new(1)), Some(e) => { - assert_eq!( - String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n{}\n", line1, line3)); - }); - assert_matches!(results.get(&NamespaceId::new(2)), Some(e) => { - assert_eq!( - String::from_utf8(e.to_owned()).unwrap().as_str(), format!("{}\n", line2)); - }); + assert_eq!(namespace_demux.demux_map.len(), 2); + let ns_1_results = namespace_demux + .get(NamespaceId::new(1)) + .await + .expect("failed to get namespaced writer") + .sink + .clone(); + assert_eq!( + String::from_utf8(ns_1_results).unwrap().as_str(), + format!("{}\n{}\n", line1, line3) + ); + let ns_2_results = namespace_demux + .get(NamespaceId::new(2)) + .await + .expect("failed to get namespaced writer") + .sink + .clone(); + assert_eq!( + String::from_utf8(ns_2_results).unwrap().as_str(), + format!("{}\n", line2) + ); } #[test] fn write_line_proto_without_index() { - let mut sink = Vec::::new(); let batches = BTreeMap::from_iter( lines_to_batches( r#"m1,t=foo v=1i 1 @@ -287,14 +300,17 @@ m2,t=bar v="arán" 1"#, .collect::>() .into_iter() .enumerate() - .map(|(i, (_table_name, batch))| (i as i64, batch)), + .map(|(i, (_table_name, batch))| (TableId::new(i as i64), batch)), ); - write_batches_as_line_proto(&mut sink, None, batches.into_iter()) + let mut lp_writer = LineProtoWriter::new(Vec::new(), None); + + lp_writer + .write_table_batches(batches.into_iter()) .expect("write back to line proto should succeed"); assert_eq!( - String::from_utf8(sink).expect("invalid output"), + String::from_utf8(lp_writer.sink.clone()).expect("invalid output"), r#"0,t=foo v=1i 1 1,t=bar v="arán" 1 "#