From b3e78d712dfe00a3a84735f4c14ed91f1029bf3b Mon Sep 17 00:00:00 2001 From: Edd Robinson Date: Mon, 22 Jun 2020 18:52:38 +0100 Subject: [PATCH] refactor: address PR feedback --- delorean_tsm/src/mapper.rs | 91 +++++++++++++++----------------------- 1 file changed, 35 insertions(+), 56 deletions(-) diff --git a/delorean_tsm/src/mapper.rs b/delorean_tsm/src/mapper.rs index 662945d272..7577bb04f4 100644 --- a/delorean_tsm/src/mapper.rs +++ b/delorean_tsm/src/mapper.rs @@ -47,18 +47,10 @@ impl Iterator for TSMMeasurementMapper { type Item = Result; fn next(&mut self) -> Option { - let entry = match self.iter.next() { - Some(entry) => match entry { - Ok(entry) => entry, - Err(e) => return Some(Err(e)), - }, - None => return None, // End of index iteration. - }; + // `None` indicates the end of index iteration. + let entry = try_or_some!(self.iter.next()?); - let parsed_key = match entry.parse_key() { - Ok(key) => key, - Err(e) => return Some(Err(e)), - }; + let parsed_key = try_or_some!(entry.parse_key()); let mut measurement: MeasurementTable = MeasurementTable::new(parsed_key.measurement); try_or_some!(measurement.add_series_data( parsed_key.tagset, @@ -69,26 +61,22 @@ impl Iterator for TSMMeasurementMapper { // The first index entry for the item has been processed, next keep // peeking at subsequent entries in the index until a yielded value is - // for a different measurement. At that point we will + // for a different measurement. At that point we will return the + // measurement. while let Some(res) = self.iter.peek() { match res { Ok(entry) => { - match entry.parse_key() { - Ok(parsed_key) => { - if measurement.name != parsed_key.measurement { - // Next entry is for a different measurement. - return Some(Ok(measurement)); - } - - try_or_some!(measurement.add_series_data( - parsed_key.tagset, - parsed_key.field_key, - entry.block_type, - entry.block - )); - } - Err(e) => return Some(Err(e)), - }; + let parsed_key = try_or_some!(entry.parse_key()); + if measurement.name != parsed_key.measurement { + // Next entry is for a different measurement. + return Some(Ok(measurement)); + } + try_or_some!(measurement.add_series_data( + parsed_key.tagset, + parsed_key.field_key, + entry.block_type, + entry.block + )); } Err(e) => return Some(Err(e.clone())), } @@ -318,7 +306,7 @@ pub fn map_field_columns( // columns. // This buffer holds the next decoded block for each input field. - let mut input_block_buffer: BTreeMap = BTreeMap::new(); + let mut input_block_buffer = BTreeMap::new(); refill_block_buffer(&mut decoder, field_blocks, &mut input_block_buffer)?; // This buffer holds the head (ts, value) pair in each decoded input block @@ -328,22 +316,22 @@ pub fn map_field_columns( refill_value_pair_buffer(&mut input_block_buffer, &mut block_value_buffer); // Create output columns for each field. - let mut result: BTreeMap = BTreeMap::new(); + let mut result = BTreeMap::new(); for (field_key, block) in &input_block_buffer { match block { - BlockData::Float { ts: _, values: _ } => { + BlockData::Float { .. } => { result.insert(field_key.clone(), ColumnData::Float(vec![])); } - BlockData::Integer { ts: _, values: _ } => { + BlockData::Integer { .. } => { result.insert(field_key.clone(), ColumnData::Integer(vec![])); } - BlockData::Bool { ts: _, values: _ } => { + BlockData::Bool { .. } => { result.insert(field_key.clone(), ColumnData::Bool(vec![])); } - BlockData::Str { ts: _, values: _ } => { + BlockData::Str { .. } => { result.insert(field_key.clone(), ColumnData::Str(vec![])); } - BlockData::Unsigned { ts: _, values: _ } => { + BlockData::Unsigned { .. } => { result.insert(field_key.clone(), ColumnData::Unsigned(vec![])); } } @@ -356,7 +344,7 @@ pub fn map_field_columns( // // When all inputs have been drained there is no timestamp available to // create a row with and iteration stops. - let mut timestamps: Vec = Vec::new(); // TODO(edd): get hint for pre-allocate + let mut timestamps = Vec::new(); // TODO(edd): get hint for pre-allocate while let Some(min_ts) = map_blocks_to_columns(&mut block_value_buffer, &mut result) { // // TODO(edd): Convert nanoseconds into microseconds for Parquet support. @@ -380,10 +368,7 @@ fn map_blocks_to_columns( ) -> Option { // First determine the minimum timestamp in any of the input blocks or return // None if all of the blocks have been drained. - let min_ts = match blocks.iter().flatten().map(ValuePair::timestamp).min() { - Some(ts) => ts, - None => return None, - }; + let min_ts = blocks.iter().flatten().map(ValuePair::timestamp).min()?; for (i, column) in dst.values_mut().enumerate() { match &mut blocks[i] { @@ -555,6 +540,8 @@ mod tests { use std::io::Cursor; use std::io::Read; + const TSM_FIXTURE_SIZE: usize = 4_222_248; + #[test] fn map_tsm_index() { let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz"); @@ -562,7 +549,8 @@ mod tests { let mut buf = Vec::new(); decoder.read_to_end(&mut buf).unwrap(); - let reader = TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap(); + let reader = + TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), TSM_FIXTURE_SIZE).unwrap(); let mapper = TSMMeasurementMapper::new(reader.peekable()); // Although there are over 2,000 series keys in the TSM file, there are @@ -570,16 +558,6 @@ mod tests { assert_eq!(mapper.count(), 121); } - struct MockDecoder { - blocks: Vec, - } - - impl super::BlockDecoder for MockDecoder { - fn block_data(&mut self, _: &Block) -> Result { - Ok(self.blocks.remove(0)) - } - } - #[test] fn map_field_columns_file() { let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz"); @@ -588,13 +566,13 @@ mod tests { decoder.read_to_end(&mut buf).unwrap(); let index_reader = - TSMIndexReader::try_new(BufReader::new(Cursor::new(&buf)), 4_222_248).unwrap(); + TSMIndexReader::try_new(BufReader::new(Cursor::new(&buf)), TSM_FIXTURE_SIZE).unwrap(); let mut mapper = TSMMeasurementMapper::new(index_reader.peekable()); let mut block_reader = TSMBlockReader::new(BufReader::new(Cursor::new(&buf))); let mut cpu = mapper - .find(|m| m.to_owned().unwrap().name == "cpu") + .find(|m| m.as_ref().unwrap().name == "cpu") .unwrap() .unwrap(); @@ -615,7 +593,7 @@ mod tests { for field_blocks in cpu.tag_set_fields_blocks.values_mut() { let (_, field_cols) = super::map_field_columns(&mut block_reader, field_blocks).unwrap(); - let keys: Vec<&String> = field_cols.keys().collect(); + let keys: Vec<_> = field_cols.keys().collect(); // Every mapping between field blocks should result in columns // for every field. @@ -630,7 +608,8 @@ mod tests { let mut buf = Vec::new(); decoder.read_to_end(&mut buf).unwrap(); - let reader = TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap(); + let reader = + TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), TSM_FIXTURE_SIZE).unwrap(); let mut mapper = TSMMeasurementMapper::new(reader.peekable()); let cpu = mapper @@ -640,7 +619,7 @@ mod tests { assert_eq!(cpu.tag_columns(), vec!["cpu", "host"]); assert_eq!( - cpu.field_columns().keys().collect::>(), + cpu.field_columns().keys().collect::>(), vec![ "usage_guest", "usage_guest_nice",