perf: drain block data more efficiently
This commit reduces copying of block data by replacing an inefficient `remove` call on vectors by with an index tracking approach, leving the original vectors in place. It further refactors some of the mapping code DRYing things up. It improves performance of the `map_field_columns` function by 48%. ``` time: [137.11 us 137.50 us 137.92 us] change: [-49.095% -48.558% -48.033%] (p = 0.00 < 0.05) Performance has improved. ```pull/24376/head
parent
1d3adea394
commit
2be6385ade
|
@ -1,4 +1,5 @@
|
|||
use criterion::{criterion_group, criterion_main, Criterion};
|
||||
use delorean_tsm::reader::*;
|
||||
use delorean_tsm::*;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
|
@ -38,16 +39,19 @@ fn map_field_columns(c: &mut Criterion) {
|
|||
);
|
||||
|
||||
let block0 = BlockData::Float {
|
||||
i: 0,
|
||||
values: vec![100.0; 1000],
|
||||
ts: (0..1000).collect(),
|
||||
};
|
||||
|
||||
let block1 = BlockData::Float {
|
||||
i: 0,
|
||||
values: vec![200.0; 500],
|
||||
ts: (1000..1500).collect(),
|
||||
};
|
||||
|
||||
let block2 = BlockData::Integer {
|
||||
i: 0,
|
||||
values: vec![22; 800],
|
||||
ts: (1000..1800).collect(),
|
||||
};
|
||||
|
|
|
@ -1446,7 +1446,10 @@ mod delorean_ingest_tests {
|
|||
|
||||
#[test]
|
||||
fn process_measurement_table() -> Result<(), Box<dyn std::error::Error>> {
|
||||
use delorean_tsm::{Block, BlockData};
|
||||
use delorean_tsm::{
|
||||
reader::{BlockData, MockBlockDecoder},
|
||||
Block,
|
||||
};
|
||||
|
||||
// Input data - in line protocol format
|
||||
//
|
||||
|
@ -1546,6 +1549,7 @@ mod delorean_ingest_tests {
|
|||
block_map.insert(
|
||||
0,
|
||||
BlockData::Float {
|
||||
i: 0,
|
||||
ts: vec![0, 1000, 2000],
|
||||
values: vec![1.2, 1.2, 1.4],
|
||||
},
|
||||
|
@ -1553,6 +1557,7 @@ mod delorean_ingest_tests {
|
|||
block_map.insert(
|
||||
1,
|
||||
BlockData::Float {
|
||||
i: 0,
|
||||
ts: vec![0, 1000, 2000],
|
||||
values: vec![10.2, 10.2, 10.4],
|
||||
},
|
||||
|
@ -1560,6 +1565,7 @@ mod delorean_ingest_tests {
|
|||
block_map.insert(
|
||||
2,
|
||||
BlockData::Float {
|
||||
i: 0,
|
||||
ts: vec![2000, 3000, 4000],
|
||||
values: vec![100.2, 99.5, 100.3],
|
||||
},
|
||||
|
@ -1567,13 +1573,13 @@ mod delorean_ingest_tests {
|
|||
block_map.insert(
|
||||
3,
|
||||
BlockData::Unsigned {
|
||||
i: 0,
|
||||
ts: vec![3000, 4000, 5000],
|
||||
values: vec![1000, 2000, 3000],
|
||||
},
|
||||
);
|
||||
|
||||
let decoder = delorean_tsm::reader::MockBlockDecoder::new(block_map);
|
||||
|
||||
let decoder = MockBlockDecoder::new(block_map);
|
||||
let (schema, packers) = TSMFileConverter::process_measurement_table(decoder, &mut table)?;
|
||||
|
||||
let expected_defs = vec![
|
||||
|
|
|
@ -131,29 +131,6 @@ pub struct Block {
|
|||
// MAX_BLOCK_VALUES is the maximum number of values a TSM block can store.
|
||||
const MAX_BLOCK_VALUES: usize = 1000;
|
||||
|
||||
/// `BlockData` describes the various types of block data that can be held within
|
||||
/// a TSM file.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum BlockData {
|
||||
Float { ts: Vec<i64>, values: Vec<f64> },
|
||||
Integer { ts: Vec<i64>, values: Vec<i64> },
|
||||
Bool { ts: Vec<i64>, values: Vec<bool> },
|
||||
Str { ts: Vec<i64>, values: Vec<Vec<u8>> },
|
||||
Unsigned { ts: Vec<i64>, values: Vec<u64> },
|
||||
}
|
||||
|
||||
impl BlockData {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
match &self {
|
||||
Self::Float { ts, values: _ } => ts.is_empty(),
|
||||
Self::Integer { ts, values: _ } => ts.is_empty(),
|
||||
Self::Bool { ts, values: _ } => ts.is_empty(),
|
||||
Self::Str { ts, values: _ } => ts.is_empty(),
|
||||
Self::Unsigned { ts, values: _ } => ts.is_empty(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)]
|
||||
/// `InfluxID` represents an InfluxDB ID used in InfluxDB 2.x to represent
|
||||
/// organization and bucket identifiers.
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
///! Types for mapping and converting series data from TSM indexes produced by
|
||||
///! InfluxDB >= 2.x
|
||||
use crate::reader::{BlockDecoder, TSMIndexReader};
|
||||
use crate::{Block, BlockData, BlockType, TSMError};
|
||||
use crate::reader::{BlockData, BlockDecoder, TSMIndexReader, ValuePair};
|
||||
use crate::{Block, BlockType, TSMError};
|
||||
|
||||
use log::warn;
|
||||
|
||||
|
@ -220,29 +220,6 @@ pub enum ColumnData {
|
|||
Unsigned(Vec<Option<u64>>),
|
||||
}
|
||||
|
||||
// ValuePair represents a single timestamp-value pair from a TSM block.
|
||||
#[derive(Debug, PartialEq)]
|
||||
enum ValuePair {
|
||||
F64((i64, f64)),
|
||||
I64((i64, i64)),
|
||||
Bool((i64, bool)),
|
||||
Str((i64, Vec<u8>)),
|
||||
U64((i64, u64)),
|
||||
}
|
||||
|
||||
impl ValuePair {
|
||||
// returns the timestamp associated with the value pair.
|
||||
fn timestamp(&self) -> i64 {
|
||||
match *self {
|
||||
Self::F64((ts, _)) => ts,
|
||||
Self::I64((ts, _)) => ts,
|
||||
Self::Bool((ts, _)) => ts,
|
||||
Self::Str((ts, _)) => ts,
|
||||
Self::U64((ts, _)) => ts,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Maps multiple columnar field blocks to a single tablular representation.
|
||||
//
|
||||
// Given a set of field keys and a set of blocks for each key,
|
||||
|
@ -464,20 +441,21 @@ fn refill_block_buffer(
|
|||
// refilling.
|
||||
for (field, blocks) in field_blocks.iter_mut() {
|
||||
if blocks.is_empty() {
|
||||
continue; // drained input block
|
||||
continue; // finished with all blocks for this field
|
||||
}
|
||||
|
||||
// peek at any decoded block data for current block for field
|
||||
if let Some(dst_block) = dst.get(field) {
|
||||
if !dst_block.is_empty() {
|
||||
continue; // not ready to be refilled.
|
||||
continue; // not ready to be replaced with next block yet
|
||||
}
|
||||
};
|
||||
|
||||
// Either no block data for field in dst, or the block data that is
|
||||
// present has been drained.
|
||||
// either no block data for field in dst, or the block data that is
|
||||
// present has been drained - ready for the next block for field
|
||||
//
|
||||
// Pop the next input block for this field key, decode it and refill dst
|
||||
// with it.
|
||||
// pop the next input block for this field key, decode it and refill dst
|
||||
// with it
|
||||
let decoded_block = decoder.decode(&blocks.remove(0))?;
|
||||
dst.insert(field.clone(), decoded_block);
|
||||
}
|
||||
|
@ -491,43 +469,9 @@ fn refill_value_pair_buffer(
|
|||
dst: &mut Vec<Option<ValuePair>>,
|
||||
) {
|
||||
for (i, block) in blocks.values_mut().enumerate() {
|
||||
// TODO(edd): seems like this could be DRY'd up a bit??
|
||||
// TODO(edd): PERF - removing from vector will shift elements. Better off
|
||||
// tracking an index that's been read up to?
|
||||
match dst[i] {
|
||||
Some(_) => {}
|
||||
None => match block {
|
||||
BlockData::Float { ts, values } => {
|
||||
if ts.is_empty() {
|
||||
continue;
|
||||
}
|
||||
dst[i] = Some(ValuePair::F64((ts.remove(0), values.remove(0))))
|
||||
}
|
||||
BlockData::Integer { ts, values } => {
|
||||
if ts.is_empty() {
|
||||
continue;
|
||||
}
|
||||
dst[i] = Some(ValuePair::I64((ts.remove(0), values.remove(0))))
|
||||
}
|
||||
BlockData::Bool { ts, values } => {
|
||||
if ts.is_empty() {
|
||||
continue;
|
||||
}
|
||||
dst[i] = Some(ValuePair::Bool((ts.remove(0), values.remove(0))))
|
||||
}
|
||||
BlockData::Str { ts, values } => {
|
||||
if ts.is_empty() {
|
||||
continue;
|
||||
}
|
||||
dst[i] = Some(ValuePair::Str((ts.remove(0), values.remove(0))))
|
||||
}
|
||||
BlockData::Unsigned { ts, values } => {
|
||||
if ts.is_empty() {
|
||||
continue;
|
||||
}
|
||||
dst[i] = Some(ValuePair::U64((ts.remove(0), values.remove(0))))
|
||||
}
|
||||
},
|
||||
if dst[i].is_none() {
|
||||
// (ts, value) pair has been used - fetch next pair (if any).
|
||||
dst[i] = block.next_pair();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -691,6 +635,7 @@ mod tests {
|
|||
input.insert(
|
||||
"a".to_string(),
|
||||
BlockData::Integer {
|
||||
i: 0,
|
||||
ts: vec![1, 2],
|
||||
values: vec![1, 2],
|
||||
},
|
||||
|
@ -699,6 +644,7 @@ mod tests {
|
|||
input.insert(
|
||||
"b".to_string(),
|
||||
BlockData::Integer {
|
||||
i: 0,
|
||||
ts: vec![1, 2, 3],
|
||||
values: vec![10, 20, 30],
|
||||
},
|
||||
|
@ -707,6 +653,7 @@ mod tests {
|
|||
input.insert(
|
||||
"c".to_string(),
|
||||
BlockData::Integer {
|
||||
i: 0,
|
||||
ts: vec![1, 2, 3],
|
||||
values: vec![100, 200, 300],
|
||||
},
|
||||
|
|
|
@ -263,6 +263,106 @@ impl BlockDecoder for MockBlockDecoder {
|
|||
}
|
||||
}
|
||||
|
||||
/// `BlockData` describes the various types of block data that can be held within
|
||||
/// a TSM file.
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum BlockData {
|
||||
Float {
|
||||
i: usize,
|
||||
ts: Vec<i64>,
|
||||
values: Vec<f64>,
|
||||
},
|
||||
Integer {
|
||||
i: usize,
|
||||
ts: Vec<i64>,
|
||||
values: Vec<i64>,
|
||||
},
|
||||
Bool {
|
||||
i: usize,
|
||||
ts: Vec<i64>,
|
||||
values: Vec<bool>,
|
||||
},
|
||||
Str {
|
||||
i: usize,
|
||||
ts: Vec<i64>,
|
||||
values: Vec<Vec<u8>>,
|
||||
},
|
||||
Unsigned {
|
||||
i: usize,
|
||||
ts: Vec<i64>,
|
||||
values: Vec<u64>,
|
||||
},
|
||||
}
|
||||
|
||||
impl BlockData {
|
||||
pub fn next_pair(&mut self) -> Option<ValuePair> {
|
||||
if self.is_empty() {
|
||||
return None;
|
||||
}
|
||||
|
||||
match self {
|
||||
Self::Float { i, ts, values } => {
|
||||
let idx = *i;
|
||||
*i += 1;
|
||||
Some(ValuePair::F64((ts[idx], values[idx])))
|
||||
}
|
||||
Self::Integer { i, ts, values } => {
|
||||
let idx = *i;
|
||||
*i += 1;
|
||||
Some(ValuePair::I64((ts[idx], values[idx])))
|
||||
}
|
||||
Self::Bool { i, ts, values } => {
|
||||
let idx = *i;
|
||||
*i += 1;
|
||||
Some(ValuePair::Bool((ts[idx], values[idx])))
|
||||
}
|
||||
Self::Str { i, ts, values } => {
|
||||
let idx = *i;
|
||||
*i += 1;
|
||||
Some(ValuePair::Str((ts[idx], values[idx].clone()))) // TODO - figure out
|
||||
}
|
||||
Self::Unsigned { i, ts, values } => {
|
||||
let idx = *i;
|
||||
*i += 1;
|
||||
Some(ValuePair::U64((ts[idx], values[idx])))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
match &self {
|
||||
Self::Float { i, ts, values: _ } => *i == ts.len(),
|
||||
Self::Integer { i, ts, values: _ } => *i == ts.len(),
|
||||
Self::Bool { i, ts, values: _ } => *i == ts.len(),
|
||||
Self::Str { i, ts, values: _ } => *i == ts.len(),
|
||||
Self::Unsigned { i, ts, values: _ } => *i == ts.len(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ValuePair represents a single timestamp-value pair from a TSM block.
|
||||
#[derive(Debug, PartialEq)]
|
||||
pub enum ValuePair {
|
||||
F64((i64, f64)),
|
||||
I64((i64, i64)),
|
||||
Bool((i64, bool)),
|
||||
Str((i64, Vec<u8>)),
|
||||
U64((i64, u64)),
|
||||
}
|
||||
|
||||
impl ValuePair {
|
||||
// returns the timestamp associated with the value pair.
|
||||
pub fn timestamp(&self) -> i64 {
|
||||
match *self {
|
||||
Self::F64((ts, _)) => ts,
|
||||
Self::I64((ts, _)) => ts,
|
||||
Self::Bool((ts, _)) => ts,
|
||||
Self::Str((ts, _)) => ts,
|
||||
Self::U64((ts, _)) => ts,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// `TSMBlockReader` allows you to read and decode TSM blocks from within a TSM
|
||||
/// file.
|
||||
///
|
||||
|
@ -326,7 +426,7 @@ where
|
|||
}
|
||||
})?;
|
||||
|
||||
Ok(BlockData::Float { ts, values })
|
||||
Ok(BlockData::Float { i: 0, ts, values })
|
||||
}
|
||||
BlockType::Integer => {
|
||||
// values will be same length as time-stamps.
|
||||
|
@ -335,7 +435,7 @@ where
|
|||
description: e.to_string(),
|
||||
})?;
|
||||
|
||||
Ok(BlockData::Integer { ts, values })
|
||||
Ok(BlockData::Integer { i: 0, ts, values })
|
||||
}
|
||||
BlockType::Bool => {
|
||||
// values will be same length as time-stamps.
|
||||
|
@ -344,7 +444,7 @@ where
|
|||
description: e.to_string(),
|
||||
})?;
|
||||
|
||||
Ok(BlockData::Bool { ts, values })
|
||||
Ok(BlockData::Bool { i: 0, ts, values })
|
||||
}
|
||||
BlockType::Str => {
|
||||
// values will be same length as time-stamps.
|
||||
|
@ -352,7 +452,7 @@ where
|
|||
encoders::string::decode(&data[idx..], &mut values).map_err(|e| TSMError {
|
||||
description: e.to_string(),
|
||||
})?;
|
||||
Ok(BlockData::Str { ts, values })
|
||||
Ok(BlockData::Str { i: 0, ts, values })
|
||||
}
|
||||
BlockType::Unsigned => {
|
||||
// values will be same length as time-stamps.
|
||||
|
@ -360,7 +460,7 @@ where
|
|||
encoders::unsigned::decode(&data[idx..], &mut values).map_err(|e| TSMError {
|
||||
description: e.to_string(),
|
||||
})?;
|
||||
Ok(BlockData::Unsigned { ts, values })
|
||||
Ok(BlockData::Unsigned { i: 0, ts, values })
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -478,11 +578,11 @@ mod tests {
|
|||
for block in blocks {
|
||||
// The first integer block in the value should have 509 values in it.
|
||||
match block {
|
||||
BlockData::Float { ts, values } => {
|
||||
BlockData::Float { i: _, ts, values } => {
|
||||
assert_eq!(ts.len(), 507);
|
||||
assert_eq!(values.len(), 507);
|
||||
}
|
||||
BlockData::Integer { ts, values } => {
|
||||
BlockData::Integer { i: _, ts, values } => {
|
||||
assert_eq!(ts.len(), 509);
|
||||
assert_eq!(values.len(), 509);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue