refactor: address PR feedback

pull/24376/head
Edd Robinson 2020-06-22 18:52:38 +01:00
parent 844625d811
commit b3e78d712d
1 changed files with 35 additions and 56 deletions

View File

@ -47,18 +47,10 @@ impl<R: BufRead + Seek> Iterator for TSMMeasurementMapper<R> {
type Item = Result<MeasurementTable, TSMError>;
fn next(&mut self) -> Option<Self::Item> {
let entry = match self.iter.next() {
Some(entry) => match entry {
Ok(entry) => entry,
Err(e) => return Some(Err(e)),
},
None => return None, // End of index iteration.
};
// `None` indicates the end of index iteration.
let entry = try_or_some!(self.iter.next()?);
let parsed_key = match entry.parse_key() {
Ok(key) => key,
Err(e) => return Some(Err(e)),
};
let parsed_key = try_or_some!(entry.parse_key());
let mut measurement: MeasurementTable = MeasurementTable::new(parsed_key.measurement);
try_or_some!(measurement.add_series_data(
parsed_key.tagset,
@ -69,26 +61,22 @@ impl<R: BufRead + Seek> Iterator for TSMMeasurementMapper<R> {
// The first index entry for the item has been processed, next keep
// peeking at subsequent entries in the index until a yielded value is
// for a different measurement. At that point we will
// for a different measurement. At that point we will return the
// measurement.
while let Some(res) = self.iter.peek() {
match res {
Ok(entry) => {
match entry.parse_key() {
Ok(parsed_key) => {
if measurement.name != parsed_key.measurement {
// Next entry is for a different measurement.
return Some(Ok(measurement));
}
try_or_some!(measurement.add_series_data(
parsed_key.tagset,
parsed_key.field_key,
entry.block_type,
entry.block
));
}
Err(e) => return Some(Err(e)),
};
let parsed_key = try_or_some!(entry.parse_key());
if measurement.name != parsed_key.measurement {
// Next entry is for a different measurement.
return Some(Ok(measurement));
}
try_or_some!(measurement.add_series_data(
parsed_key.tagset,
parsed_key.field_key,
entry.block_type,
entry.block
));
}
Err(e) => return Some(Err(e.clone())),
}
@ -318,7 +306,7 @@ pub fn map_field_columns(
// columns.
// This buffer holds the next decoded block for each input field.
let mut input_block_buffer: BTreeMap<String, BlockData> = BTreeMap::new();
let mut input_block_buffer = BTreeMap::new();
refill_block_buffer(&mut decoder, field_blocks, &mut input_block_buffer)?;
// This buffer holds the head (ts, value) pair in each decoded input block
@ -328,22 +316,22 @@ pub fn map_field_columns(
refill_value_pair_buffer(&mut input_block_buffer, &mut block_value_buffer);
// Create output columns for each field.
let mut result: BTreeMap<String, ColumnData> = BTreeMap::new();
let mut result = BTreeMap::new();
for (field_key, block) in &input_block_buffer {
match block {
BlockData::Float { ts: _, values: _ } => {
BlockData::Float { .. } => {
result.insert(field_key.clone(), ColumnData::Float(vec![]));
}
BlockData::Integer { ts: _, values: _ } => {
BlockData::Integer { .. } => {
result.insert(field_key.clone(), ColumnData::Integer(vec![]));
}
BlockData::Bool { ts: _, values: _ } => {
BlockData::Bool { .. } => {
result.insert(field_key.clone(), ColumnData::Bool(vec![]));
}
BlockData::Str { ts: _, values: _ } => {
BlockData::Str { .. } => {
result.insert(field_key.clone(), ColumnData::Str(vec![]));
}
BlockData::Unsigned { ts: _, values: _ } => {
BlockData::Unsigned { .. } => {
result.insert(field_key.clone(), ColumnData::Unsigned(vec![]));
}
}
@ -356,7 +344,7 @@ pub fn map_field_columns(
//
// When all inputs have been drained there is no timestamp available to
// create a row with and iteration stops.
let mut timestamps: Vec<i64> = Vec::new(); // TODO(edd): get hint for pre-allocate
let mut timestamps = Vec::new(); // TODO(edd): get hint for pre-allocate
while let Some(min_ts) = map_blocks_to_columns(&mut block_value_buffer, &mut result) {
//
// TODO(edd): Convert nanoseconds into microseconds for Parquet support.
@ -380,10 +368,7 @@ fn map_blocks_to_columns(
) -> Option<i64> {
// First determine the minimum timestamp in any of the input blocks or return
// None if all of the blocks have been drained.
let min_ts = match blocks.iter().flatten().map(ValuePair::timestamp).min() {
Some(ts) => ts,
None => return None,
};
let min_ts = blocks.iter().flatten().map(ValuePair::timestamp).min()?;
for (i, column) in dst.values_mut().enumerate() {
match &mut blocks[i] {
@ -555,6 +540,8 @@ mod tests {
use std::io::Cursor;
use std::io::Read;
const TSM_FIXTURE_SIZE: usize = 4_222_248;
#[test]
fn map_tsm_index() {
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
@ -562,7 +549,8 @@ mod tests {
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
let reader = TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap();
let reader =
TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), TSM_FIXTURE_SIZE).unwrap();
let mapper = TSMMeasurementMapper::new(reader.peekable());
// Although there are over 2,000 series keys in the TSM file, there are
@ -570,16 +558,6 @@ mod tests {
assert_eq!(mapper.count(), 121);
}
struct MockDecoder {
blocks: Vec<BlockData>,
}
impl super::BlockDecoder for MockDecoder {
fn block_data(&mut self, _: &Block) -> Result<BlockData, TSMError> {
Ok(self.blocks.remove(0))
}
}
#[test]
fn map_field_columns_file() {
let file = File::open("../tests/fixtures/000000000000005-000000002.tsm.gz");
@ -588,13 +566,13 @@ mod tests {
decoder.read_to_end(&mut buf).unwrap();
let index_reader =
TSMIndexReader::try_new(BufReader::new(Cursor::new(&buf)), 4_222_248).unwrap();
TSMIndexReader::try_new(BufReader::new(Cursor::new(&buf)), TSM_FIXTURE_SIZE).unwrap();
let mut mapper = TSMMeasurementMapper::new(index_reader.peekable());
let mut block_reader = TSMBlockReader::new(BufReader::new(Cursor::new(&buf)));
let mut cpu = mapper
.find(|m| m.to_owned().unwrap().name == "cpu")
.find(|m| m.as_ref().unwrap().name == "cpu")
.unwrap()
.unwrap();
@ -615,7 +593,7 @@ mod tests {
for field_blocks in cpu.tag_set_fields_blocks.values_mut() {
let (_, field_cols) =
super::map_field_columns(&mut block_reader, field_blocks).unwrap();
let keys: Vec<&String> = field_cols.keys().collect();
let keys: Vec<_> = field_cols.keys().collect();
// Every mapping between field blocks should result in columns
// for every field.
@ -630,7 +608,8 @@ mod tests {
let mut buf = Vec::new();
decoder.read_to_end(&mut buf).unwrap();
let reader = TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), 4_222_248).unwrap();
let reader =
TSMIndexReader::try_new(BufReader::new(Cursor::new(buf)), TSM_FIXTURE_SIZE).unwrap();
let mut mapper = TSMMeasurementMapper::new(reader.peekable());
let cpu = mapper
@ -640,7 +619,7 @@ mod tests {
assert_eq!(cpu.tag_columns(), vec!["cpu", "host"]);
assert_eq!(
cpu.field_columns().keys().collect::<Vec<&String>>(),
cpu.field_columns().keys().collect::<Vec<_>>(),
vec![
"usage_guest",
"usage_guest_nice",