diff --git a/wal_inspect/src/lib.rs b/wal_inspect/src/lib.rs index f09fd598af..b99cbed6b8 100644 --- a/wal_inspect/src/lib.rs +++ b/wal_inspect/src/lib.rs @@ -15,6 +15,7 @@ missing_debug_implementations, missing_docs )] +use std::borrow::Cow; use std::io::Write; use data_types::{NamespaceId, TableId}; @@ -62,7 +63,7 @@ where namespaced_output: HashMap, new_write_sink: F, - table_name_index: HashMap, + table_name_index: Option>, } impl LineProtoWriter @@ -100,10 +101,13 @@ where { /// Constructs a new [`LineProtoWriter`] that uses `new_write_sink` to /// get the destination for each line protocol write by its namespace ID. - /// The `table_name_index` must provide a mapping from all table IDs to - /// table name to recover the measurement name, as WAL write entries do - /// not contain this information. - pub fn new(new_write_sink: F, table_name_index: HashMap) -> Self { + /// + /// The optional `table_name_index` is used to provide a mapping for all table IDs + /// to the corresponding table name to recover the measurement name, as WAL write + /// entries do not contain this information. If supplied, the index MUST be exhaustive. + /// + /// If no index is given then measurement names are written as table IDs. + pub fn new(new_write_sink: F, table_name_index: Option>) -> Self { Self { namespaced_output: HashMap::new(), new_write_sink, @@ -135,7 +139,7 @@ where fn write_batches_as_line_proto( sink: &mut W, - table_name_index: &HashMap, + table_name_index: &Option>, table_batches: HashMap, ) -> Result<(), WriteError> where @@ -144,14 +148,18 @@ where for (table_id, mb) in table_batches { let schema = mb.schema(schema::Projection::All)?; let record_batch = mb.to_arrow(schema::Projection::All)?; - let measurement_name = table_name_index.get(&TableId::new(table_id)).ok_or( - WriteError::ConvertToLineProtocolFailed(format!( - "missing table name for id {}", - &table_id - )), - )?; + let table_id = TableId::new(table_id); + let measurement_name = match table_name_index { + Some(idx) => Cow::Borrowed(idx.get(&table_id).ok_or( + WriteError::ConvertToLineProtocolFailed(format!( + "missing table name for id {}", + &table_id + )), + )?), + None => Cow::Owned(table_id.to_string()), + }; sink.write_all( - convert_to_lines(measurement_name, &schema, &record_batch) + convert_to_lines(&measurement_name, &schema, &record_batch) .map_err(WriteError::ConvertToLineProtocolFailed)? .as_slice(), )?; @@ -210,7 +218,7 @@ mod tests { wal.reader_for_segment(closed.id()) .expect("failed to open reader for closed segment"), ); - let mut writer = LineProtoWriter::new(|_| Ok(Vec::::new()), table_name_index); + let mut writer = LineProtoWriter::new(|_| Ok(Vec::::new()), Some(table_name_index)); let decoded_entries = decoder .into_iter() @@ -315,7 +323,7 @@ mod tests { wal.reader_for_segment(closed.id()) .expect("failed to open reader for closed segment"), ); - let mut writer = LineProtoWriter::new(|_| Ok(Vec::::new()), table_name_index); + let mut writer = LineProtoWriter::new(|_| Ok(Vec::::new()), Some(table_name_index)); // The translator should be able to read all 2 good entries containing 4 write ops let decoded_entries = decoder @@ -359,6 +367,39 @@ mod tests { }); } + #[test] + fn write_line_proto_without_index() { + let mut sink = Vec::::new(); + let batches = lines_to_batches( + r#"m1,t=foo v=1i 1 +m2,t=bar v="arán" 1"#, + 0, + ) + .expect("failed to create batches from line proto") + .into_iter() + // Trim the non-numeric characters from the batches to make ID derivation deterministic + // (map enumeration is not ordered). + .map(|(table_name, batch)| { + ( + table_name + .trim_matches(|c| !char::is_numeric(c)) + .parse::() + .expect("failed to convert table name to id"), + batch, + ) + }) + .collect(); + + write_batches_as_line_proto(&mut sink, &None, batches) + .expect("write back to line proto should succeed"); + + assert_eq!( + String::from_utf8(sink).expect("invalid output"), + r#"1,t=foo v=1i 1 +2,t=bar v="arán" 1"# + ); + } + fn build_indexes<'a>( iter: impl IntoIterator, ) -> (HashMap, HashMap) {