From bd5d39f60c54626e1320642c5588e120628aaad3 Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Wed, 8 Jul 2020 21:17:00 +0100 Subject: [PATCH] refactor: address PR feedback --- benches/mapper.rs | 1 - delorean_ingest/src/lib.rs | 8 +------- delorean_tsm/src/lib.rs | 3 +++ delorean_tsm/src/mapper.rs | 7 +++++++ delorean_tsm/src/reader.rs | 2 +- src/commands/convert.rs | 17 ++++++----------- 6 files changed, 18 insertions(+), 20 deletions(-) diff --git a/benches/mapper.rs b/benches/mapper.rs index cea1ccc04c..fe402fca8c 100644 --- a/benches/mapper.rs +++ b/benches/mapper.rs @@ -9,7 +9,6 @@ fn map_field_columns(c: &mut Criterion) { let mut measurement_table = mapper::MeasurementTable::new("cpu".to_string(), 0); - // let mut field_blocks: BTreeMap> = BTreeMap::new(); measurement_table .add_series_data( vec![], diff --git a/delorean_ingest/src/lib.rs b/delorean_ingest/src/lib.rs index 535305cf92..9f21177332 100644 --- a/delorean_ingest/src/lib.rs +++ b/delorean_ingest/src/lib.rs @@ -676,15 +676,9 @@ impl TSMFileConverter { .collect::>(); // Process the measurement to build out a table. - // The MeasurementTable emits `TableSection`s, which are partial sections - // of the final table. Each section contains the data for all columns - // in the table, though not all of that data will necessarily be - // materialised. // - // `process` expects a closure that processes each table section as it's - // emitted from the `MeasurementTable`. + // The processing function we supply to `process` does the following: // - // As sections are emitted we do the following: // - Append the timestamp column to the packer timestamp column // - Materialise the same tag value for any tag key columns where the // emitted section has a none-null value for that column. 
diff --git a/delorean_tsm/src/lib.rs b/delorean_tsm/src/lib.rs index b76314d59e..55af29d66e 100644 --- a/delorean_tsm/src/lib.rs +++ b/delorean_tsm/src/lib.rs @@ -53,6 +53,9 @@ pub struct Block { pub offset: u64, pub size: u32, pub typ: BlockType, + + // This index is used to track an associated reader needed to decode the + // data this block holds. pub reader_idx: usize, } diff --git a/delorean_tsm/src/mapper.rs b/delorean_tsm/src/mapper.rs index 25182ddd70..ecec833503 100644 --- a/delorean_tsm/src/mapper.rs +++ b/delorean_tsm/src/mapper.rs @@ -193,6 +193,13 @@ impl MeasurementTable { } // Process the MeasurementTable in sections. + // + // Each call to `process` emits a `TableSection`, which is a partial section + // of the final table. Each section contains the data for all columns + // in the table, though not all of that data will necessarily be + // materialised. + // + // `process` expects a closure to process each section. pub fn process( &mut self, mut block_reader: impl BlockDecoder, diff --git a/delorean_tsm/src/reader.rs b/delorean_tsm/src/reader.rs index 10134fded3..54a7aaaa6f 100644 --- a/delorean_tsm/src/reader.rs +++ b/delorean_tsm/src/reader.rs @@ -482,7 +482,7 @@ where } } None => Err(TSMError { - description: "cannot decode block with no associated decoder".to_string(), + description: format!("cannot decode block {:?} with no associated decoder", block), }), } } diff --git a/src/commands/convert.rs b/src/commands/convert.rs index 8625520cf2..a1991d79b6 100644 --- a/src/commands/convert.rs +++ b/src/commands/convert.rs @@ -116,14 +116,14 @@ pub fn convert( let files: Vec<_> = fs::read_dir(input_path) .unwrap() .filter_map(Result::ok) - .filter(|filename| { - filename - .path() - .extension() - .map_or(false, |x| x == "tsm") - }) + .filter(|filename| filename.path().extension().map_or(false, |x| x == "tsm")) .collect(); + if files.is_empty() { + warn!("No TSM files found"); + return Ok(()); + } + let mut index_readers = 
Vec::with_capacity(files.len()); let mut block_readers = Vec::with_capacity(files.len()); for file in &files { @@ -135,11 +135,6 @@ pub fn convert( block_readers.push(BufReader::new(block_handle)); } - if block_readers.is_empty() { - warn!("No TSM files found"); - return Ok(()); - } - // setup writing let writer_source: Box = if is_directory(&output_path) { info!("Writing to output directory {:?}", output_path);