feat(wal_inspect): Make table_name_index optional for line proto writing

This will allow for decoding a WAL into line protocol without connecting
to a catalog service (or having an equivalent name lookup mechanism).
pull/24376/head
Fraser Savage 2023-05-17 10:46:31 +01:00
parent e7a0ed4b24
commit d530b728d4
No known key found for this signature in database
GPG Key ID: DE47C33CE8C5C446
1 changed files with 56 additions and 15 deletions

View File

@ -15,6 +15,7 @@
missing_debug_implementations, missing_debug_implementations,
missing_docs missing_docs
)] )]
use std::borrow::Cow;
use std::io::Write; use std::io::Write;
use data_types::{NamespaceId, TableId}; use data_types::{NamespaceId, TableId};
@ -62,7 +63,7 @@ where
namespaced_output: HashMap<NamespaceId, W>, namespaced_output: HashMap<NamespaceId, W>,
new_write_sink: F, new_write_sink: F,
table_name_index: HashMap<TableId, String>, table_name_index: Option<HashMap<TableId, String>>,
} }
impl<W, F> LineProtoWriter<W, F> impl<W, F> LineProtoWriter<W, F>
@ -100,10 +101,13 @@ where
{ {
/// Constructs a new [`LineProtoWriter`] that uses `new_write_sink` to /// Constructs a new [`LineProtoWriter`] that uses `new_write_sink` to
/// get the destination for each line protocol write by its namespace ID. /// get the destination for each line protocol write by its namespace ID.
/// The `table_name_index` must provide a mapping from all table IDs to ///
/// table name to recover the measurement name, as WAL write entries do /// The optional `table_name_index` is used to provide a mapping for all table IDs
/// not contain this information. /// to the corresponding table name to recover the measurement name, as WAL write
pub fn new(new_write_sink: F, table_name_index: HashMap<TableId, String>) -> Self { /// entries do not contain this information. If supplied, the index MUST be exhaustive.
///
/// If no index is given then measurement names are written as table IDs.
pub fn new(new_write_sink: F, table_name_index: Option<HashMap<TableId, String>>) -> Self {
Self { Self {
namespaced_output: HashMap::new(), namespaced_output: HashMap::new(),
new_write_sink, new_write_sink,
@ -135,7 +139,7 @@ where
fn write_batches_as_line_proto<W>( fn write_batches_as_line_proto<W>(
sink: &mut W, sink: &mut W,
table_name_index: &HashMap<TableId, String>, table_name_index: &Option<HashMap<TableId, String>>,
table_batches: HashMap<i64, MutableBatch>, table_batches: HashMap<i64, MutableBatch>,
) -> Result<(), WriteError> ) -> Result<(), WriteError>
where where
@ -144,14 +148,18 @@ where
for (table_id, mb) in table_batches { for (table_id, mb) in table_batches {
let schema = mb.schema(schema::Projection::All)?; let schema = mb.schema(schema::Projection::All)?;
let record_batch = mb.to_arrow(schema::Projection::All)?; let record_batch = mb.to_arrow(schema::Projection::All)?;
let measurement_name = table_name_index.get(&TableId::new(table_id)).ok_or( let table_id = TableId::new(table_id);
WriteError::ConvertToLineProtocolFailed(format!( let measurement_name = match table_name_index {
"missing table name for id {}", Some(idx) => Cow::Borrowed(idx.get(&table_id).ok_or(
&table_id WriteError::ConvertToLineProtocolFailed(format!(
)), "missing table name for id {}",
)?; &table_id
)),
)?),
None => Cow::Owned(table_id.to_string()),
};
sink.write_all( sink.write_all(
convert_to_lines(measurement_name, &schema, &record_batch) convert_to_lines(&measurement_name, &schema, &record_batch)
.map_err(WriteError::ConvertToLineProtocolFailed)? .map_err(WriteError::ConvertToLineProtocolFailed)?
.as_slice(), .as_slice(),
)?; )?;
@ -210,7 +218,7 @@ mod tests {
wal.reader_for_segment(closed.id()) wal.reader_for_segment(closed.id())
.expect("failed to open reader for closed segment"), .expect("failed to open reader for closed segment"),
); );
let mut writer = LineProtoWriter::new(|_| Ok(Vec::<u8>::new()), table_name_index); let mut writer = LineProtoWriter::new(|_| Ok(Vec::<u8>::new()), Some(table_name_index));
let decoded_entries = decoder let decoded_entries = decoder
.into_iter() .into_iter()
@ -315,7 +323,7 @@ mod tests {
wal.reader_for_segment(closed.id()) wal.reader_for_segment(closed.id())
.expect("failed to open reader for closed segment"), .expect("failed to open reader for closed segment"),
); );
let mut writer = LineProtoWriter::new(|_| Ok(Vec::<u8>::new()), table_name_index); let mut writer = LineProtoWriter::new(|_| Ok(Vec::<u8>::new()), Some(table_name_index));
// The translator should be able to read all 2 good entries containing 4 write ops // The translator should be able to read all 2 good entries containing 4 write ops
let decoded_entries = decoder let decoded_entries = decoder
@ -359,6 +367,39 @@ mod tests {
}); });
} }
#[test]
fn write_line_proto_without_index() {
let mut sink = Vec::<u8>::new();
let batches = lines_to_batches(
r#"m1,t=foo v=1i 1
m2,t=bar v="arán" 1"#,
0,
)
.expect("failed to create batches from line proto")
.into_iter()
// Trim the non-numeric characters from the batches to make ID derivation deterministic
// (map enumeration is not ordered).
.map(|(table_name, batch)| {
(
table_name
.trim_matches(|c| !char::is_numeric(c))
.parse::<i64>()
.expect("failed to convert table name to id"),
batch,
)
})
.collect();
write_batches_as_line_proto(&mut sink, &None, batches)
.expect("write back to line proto should succeed");
assert_eq!(
String::from_utf8(sink).expect("invalid output"),
r#"1,t=foo v=1i 1
2,t=bar v="arán" 1"#
);
}
fn build_indexes<'a>( fn build_indexes<'a>(
iter: impl IntoIterator<Item = (&'a str, TableId)>, iter: impl IntoIterator<Item = (&'a str, TableId)>,
) -> (HashMap<String, TableId>, HashMap<TableId, String>) { ) -> (HashMap<String, TableId>, HashMap<TableId, String>) {