feat: add distinct count to StatValues (#1568)
parent
d8f19348bf
commit
db432de137
|
@ -5,6 +5,7 @@ use std::{borrow::Cow, mem};
|
|||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::borrow::Borrow;
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
/// Describes the aggregated (across all chunks) summary
|
||||
/// statistics for each column in each table in a partition
|
||||
|
@ -301,6 +302,11 @@ pub struct StatValues<T> {
|
|||
|
||||
/// number of non-nil values in this column
|
||||
pub count: u64,
|
||||
|
||||
/// number of distinct values in this column if known
|
||||
///
|
||||
/// This includes NULLs and NaNs
|
||||
pub distinct_count: Option<NonZeroU64>,
|
||||
}
|
||||
|
||||
impl<T> Default for StatValues<T> {
|
||||
|
@ -309,6 +315,7 @@ impl<T> Default for StatValues<T> {
|
|||
min: None,
|
||||
max: None,
|
||||
count: 0,
|
||||
distinct_count: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -328,6 +335,7 @@ where
|
|||
min: starting_value.clone(),
|
||||
max: starting_value,
|
||||
count: 1,
|
||||
distinct_count: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -342,12 +350,20 @@ where
|
|||
assert!(min <= max);
|
||||
}
|
||||
|
||||
Self { min, max, count }
|
||||
Self {
|
||||
min,
|
||||
max,
|
||||
count,
|
||||
distinct_count: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_from(&mut self, other: &Self) {
|
||||
self.count += other.count;
|
||||
|
||||
// Distinct counts cannot be combined accurately: the two inputs may share values
|
||||
self.distinct_count = None;
|
||||
|
||||
match (&self.min, &other.min) {
|
||||
(None, None) | (Some(_), None) => {}
|
||||
(None, Some(o)) => self.min = Some(o.clone()),
|
||||
|
@ -647,62 +663,46 @@ mod tests {
|
|||
let col = table_a.column("string").unwrap();
|
||||
assert_eq!(
|
||||
col.stats,
|
||||
Statistics::String(StatValues {
|
||||
min: Some("aaa".to_string()),
|
||||
max: Some("zzz".to_string()),
|
||||
count: 4
|
||||
})
|
||||
Statistics::String(StatValues::new(
|
||||
Some("aaa".to_string()),
|
||||
Some("zzz".to_string()),
|
||||
4
|
||||
))
|
||||
);
|
||||
|
||||
let col = table_a.column("int").unwrap();
|
||||
assert_eq!(
|
||||
col.stats,
|
||||
Statistics::I64(StatValues {
|
||||
min: Some(1),
|
||||
max: Some(9),
|
||||
count: 4
|
||||
})
|
||||
Statistics::I64(StatValues::new(Some(1), Some(9), 4))
|
||||
);
|
||||
|
||||
let col = table_a.column("float").unwrap();
|
||||
assert_eq!(
|
||||
col.stats,
|
||||
Statistics::F64(StatValues {
|
||||
min: Some(1.3),
|
||||
max: Some(9.1),
|
||||
count: 2
|
||||
})
|
||||
Statistics::F64(StatValues::new(Some(1.3), Some(9.1), 2))
|
||||
);
|
||||
|
||||
table_b.update_from(&table_c);
|
||||
let col = table_b.column("string").unwrap();
|
||||
assert_eq!(
|
||||
col.stats,
|
||||
Statistics::String(StatValues {
|
||||
min: Some("aaa".to_string()),
|
||||
max: Some("zzz".to_string()),
|
||||
count: 4
|
||||
})
|
||||
Statistics::String(StatValues::new(
|
||||
Some("aaa".to_string()),
|
||||
Some("zzz".to_string()),
|
||||
4
|
||||
))
|
||||
);
|
||||
|
||||
let col = table_b.column("int").unwrap();
|
||||
assert_eq!(
|
||||
col.stats,
|
||||
Statistics::I64(StatValues {
|
||||
min: Some(1),
|
||||
max: Some(9),
|
||||
count: 4
|
||||
})
|
||||
Statistics::I64(StatValues::new(Some(1), Some(9), 4))
|
||||
);
|
||||
|
||||
let col = table_b.column("float").unwrap();
|
||||
assert_eq!(
|
||||
col.stats,
|
||||
Statistics::F64(StatValues {
|
||||
min: Some(1.3),
|
||||
max: Some(9.1),
|
||||
count: 2
|
||||
})
|
||||
Statistics::F64(StatValues::new(Some(1.3), Some(9.1), 2))
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -762,30 +762,22 @@ mod tests {
|
|||
let col = t.column("string").unwrap();
|
||||
assert_eq!(
|
||||
col.stats,
|
||||
Statistics::String(StatValues {
|
||||
min: Some("bar".to_string()),
|
||||
max: Some("foo".to_string()),
|
||||
count: 2
|
||||
})
|
||||
Statistics::String(StatValues::new(
|
||||
Some("bar".to_string()),
|
||||
Some("foo".to_string()),
|
||||
2
|
||||
))
|
||||
);
|
||||
let col = t.column("int").unwrap();
|
||||
assert_eq!(
|
||||
col.stats,
|
||||
Statistics::I64(StatValues {
|
||||
min: Some(1),
|
||||
max: Some(10),
|
||||
count: 3
|
||||
})
|
||||
Statistics::I64(StatValues::new(Some(1), Some(10), 3))
|
||||
);
|
||||
let t = partition.table("b").unwrap();
|
||||
let col = t.column("int").unwrap();
|
||||
assert_eq!(
|
||||
col.stats,
|
||||
Statistics::I64(StatValues {
|
||||
min: Some(10),
|
||||
max: Some(203),
|
||||
count: 2
|
||||
})
|
||||
Statistics::I64(StatValues::new(Some(10), Some(203), 2))
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -794,27 +786,15 @@ mod tests {
|
|||
let bool_false = ColumnSummary {
|
||||
name: "b".to_string(),
|
||||
influxdb_type: None,
|
||||
stats: Statistics::Bool(StatValues {
|
||||
min: Some(false),
|
||||
max: Some(false),
|
||||
count: 1,
|
||||
}),
|
||||
stats: Statistics::Bool(StatValues::new(Some(false), Some(false), 1)),
|
||||
};
|
||||
let bool_true = ColumnSummary {
|
||||
name: "b".to_string(),
|
||||
influxdb_type: None,
|
||||
stats: Statistics::Bool(StatValues {
|
||||
min: Some(true),
|
||||
max: Some(true),
|
||||
count: 1,
|
||||
}),
|
||||
stats: Statistics::Bool(StatValues::new(Some(true), Some(true), 1)),
|
||||
};
|
||||
|
||||
let expected_stats = Statistics::Bool(StatValues {
|
||||
min: Some(false),
|
||||
max: Some(true),
|
||||
count: 2,
|
||||
});
|
||||
let expected_stats = Statistics::Bool(StatValues::new(Some(false), Some(true), 2));
|
||||
|
||||
let mut b = bool_false.clone();
|
||||
b.update_from(&bool_true);
|
||||
|
@ -830,30 +810,18 @@ mod tests {
|
|||
let mut min = ColumnSummary {
|
||||
name: "foo".to_string(),
|
||||
influxdb_type: None,
|
||||
stats: Statistics::U64(StatValues {
|
||||
min: Some(5),
|
||||
max: Some(23),
|
||||
count: 1,
|
||||
}),
|
||||
stats: Statistics::U64(StatValues::new(Some(5), Some(23), 1)),
|
||||
};
|
||||
|
||||
let max = ColumnSummary {
|
||||
name: "foo".to_string(),
|
||||
influxdb_type: None,
|
||||
stats: Statistics::U64(StatValues {
|
||||
min: Some(6),
|
||||
max: Some(506),
|
||||
count: 43,
|
||||
}),
|
||||
stats: Statistics::U64(StatValues::new(Some(6), Some(506), 43)),
|
||||
};
|
||||
|
||||
min.update_from(&max);
|
||||
|
||||
let expected = Statistics::U64(StatValues {
|
||||
min: Some(5),
|
||||
max: Some(506),
|
||||
count: 44,
|
||||
});
|
||||
let expected = Statistics::U64(StatValues::new(Some(5), Some(506), 44));
|
||||
assert_eq!(min.stats, expected);
|
||||
}
|
||||
|
||||
|
|
|
@ -237,7 +237,10 @@ pub mod test_helpers {
|
|||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
use arrow_util::assert_batches_eq;
|
||||
use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
|
||||
|
||||
use super::test_helpers::write_lp_to_chunk;
|
||||
use super::*;
|
||||
|
@ -294,6 +297,68 @@ mod tests {
|
|||
);
|
||||
}
|
||||
|
||||
/// Verifies that `Chunk::table_summary` reports min, max and count for every
/// column, and a distinct count for the tag columns only.
#[test]
fn test_summary() {
    let lp = r#"
        cpu,host=a val=23 1
        cpu,host=b,env=prod val=2 1
        cpu,host=c,env=stage val=11 1
        cpu,host=a,env=prod val=14 2
    "#;
    let mut chunk = Chunk::new("cpu", ChunkMetrics::new_unregistered());
    write_lp_to_chunk(&lp, &mut chunk).unwrap();

    // `host` is set on every line ("a", "b", "c"). `env` is missing from the
    // first line, so its distinct values are "prod", "stage" and NULL.
    let three_distinct = NonZeroU64::new(3);

    let expected = TableSummary {
        name: "cpu".to_string(),
        columns: vec![
            ColumnSummary {
                name: "env".to_string(),
                influxdb_type: Some(InfluxDbType::Tag),
                stats: Statistics::String(StatValues {
                    min: Some("prod".to_string()),
                    max: Some("stage".to_string()),
                    count: 3,
                    distinct_count: three_distinct,
                }),
            },
            ColumnSummary {
                name: "host".to_string(),
                influxdb_type: Some(InfluxDbType::Tag),
                stats: Statistics::String(StatValues {
                    min: Some("a".to_string()),
                    max: Some("c".to_string()),
                    count: 4,
                    distinct_count: three_distinct,
                }),
            },
            ColumnSummary {
                name: "time".to_string(),
                influxdb_type: Some(InfluxDbType::Timestamp),
                stats: Statistics::I64(StatValues {
                    min: Some(1),
                    max: Some(2),
                    count: 4,
                    distinct_count: None,
                }),
            },
            ColumnSummary {
                name: "val".to_string(),
                influxdb_type: Some(InfluxDbType::Field),
                stats: Statistics::F64(StatValues {
                    min: Some(2.),
                    max: Some(23.),
                    count: 4,
                    distinct_count: None,
                }),
            },
        ],
    };

    let summary = chunk.table_summary();
    assert_eq!(summary, expected)
}
|
||||
|
||||
#[test]
|
||||
#[cfg(not(feature = "nocache"))]
|
||||
fn test_snapshot() {
|
||||
|
|
|
@ -17,6 +17,7 @@ use entry::Column as EntryColumn;
|
|||
use internal_types::schema::{InfluxColumnType, InfluxFieldType, TIME_DATA_TYPE};
|
||||
|
||||
use crate::dictionary::{Dictionary, DID, INVALID_DID};
|
||||
use std::convert::TryInto;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
#[allow(missing_copy_implementations)]
|
||||
|
@ -260,8 +261,17 @@ impl Column {
|
|||
ColumnData::I64(_, stats) => Statistics::I64(stats.clone()),
|
||||
ColumnData::U64(_, stats) => Statistics::U64(stats.clone()),
|
||||
ColumnData::Bool(_, stats) => Statistics::Bool(stats.clone()),
|
||||
ColumnData::String(_, stats) | ColumnData::Tag(_, _, stats) => {
|
||||
Statistics::String(stats.clone())
|
||||
ColumnData::String(_, stats) => Statistics::String(stats.clone()),
|
||||
ColumnData::Tag(keys, dictionary, stats) => {
|
||||
let mut distinct_count = dictionary.values().len() as u64;
|
||||
if keys.len() as u64 != stats.count {
|
||||
// Column contains NULLs
|
||||
distinct_count += 1;
|
||||
}
|
||||
|
||||
let mut stats = stats.clone();
|
||||
stats.distinct_count = distinct_count.try_into().ok();
|
||||
Statistics::String(stats)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,7 +14,8 @@ use internal_types::{
|
|||
selection::Selection,
|
||||
};
|
||||
|
||||
use crate::{column, column::Column};
|
||||
use crate::column;
|
||||
use crate::column::Column;
|
||||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
|
|
|
@ -355,6 +355,9 @@ fn extract_iox_statistics(
|
|||
Ok(Statistics::Bool(StatValues {
|
||||
min: Some(*stats.min()),
|
||||
max: Some(*stats.max()),
|
||||
distinct_count: parquet_stats
|
||||
.distinct_count()
|
||||
.and_then(|x| x.try_into().ok()),
|
||||
count,
|
||||
}))
|
||||
}
|
||||
|
@ -362,6 +365,9 @@ fn extract_iox_statistics(
|
|||
Ok(Statistics::I64(StatValues {
|
||||
min: Some(*stats.min()),
|
||||
max: Some(*stats.max()),
|
||||
distinct_count: parquet_stats
|
||||
.distinct_count()
|
||||
.and_then(|x| x.try_into().ok()),
|
||||
count,
|
||||
}))
|
||||
}
|
||||
|
@ -371,6 +377,9 @@ fn extract_iox_statistics(
|
|||
Ok(Statistics::U64(StatValues {
|
||||
min: Some(*stats.min() as u64),
|
||||
max: Some(*stats.max() as u64),
|
||||
distinct_count: parquet_stats
|
||||
.distinct_count()
|
||||
.and_then(|x| x.try_into().ok()),
|
||||
count,
|
||||
}))
|
||||
}
|
||||
|
@ -378,6 +387,9 @@ fn extract_iox_statistics(
|
|||
Ok(Statistics::F64(StatValues {
|
||||
min: Some(*stats.min()),
|
||||
max: Some(*stats.max()),
|
||||
distinct_count: parquet_stats
|
||||
.distinct_count()
|
||||
.and_then(|x| x.try_into().ok()),
|
||||
count,
|
||||
}))
|
||||
}
|
||||
|
@ -385,6 +397,9 @@ fn extract_iox_statistics(
|
|||
Ok(Statistics::I64(StatValues {
|
||||
min: Some(*stats.min()),
|
||||
max: Some(*stats.max()),
|
||||
distinct_count: parquet_stats
|
||||
.distinct_count()
|
||||
.and_then(|x| x.try_into().ok()),
|
||||
count,
|
||||
}))
|
||||
}
|
||||
|
@ -411,6 +426,9 @@ fn extract_iox_statistics(
|
|||
})?
|
||||
.to_string(),
|
||||
),
|
||||
distinct_count: parquet_stats
|
||||
.distinct_count()
|
||||
.and_then(|x| x.try_into().ok()),
|
||||
count,
|
||||
}))
|
||||
}
|
||||
|
|
|
@ -189,6 +189,7 @@ fn create_column_tag(
|
|||
min: Some(data.iter().flatten().min().unwrap().to_string()),
|
||||
max: Some(data.iter().flatten().max().unwrap().to_string()),
|
||||
count: data.iter().map(Vec::len).sum::<usize>() as u64,
|
||||
distinct_count: None,
|
||||
}),
|
||||
});
|
||||
|
||||
|
@ -208,10 +209,16 @@ fn create_column_field_string(
|
|||
arrow_cols,
|
||||
summaries,
|
||||
schema_builder,
|
||||
|StatValues { min, max, count }| {
|
||||
|StatValues {
|
||||
min,
|
||||
max,
|
||||
count,
|
||||
distinct_count,
|
||||
}| {
|
||||
Statistics::String(StatValues {
|
||||
min: Some(min.unwrap().to_string()),
|
||||
max: Some(max.unwrap().to_string()),
|
||||
distinct_count,
|
||||
count,
|
||||
})
|
||||
},
|
||||
|
@ -285,6 +292,7 @@ fn create_column_field_f64(
|
|||
.max_by(|a, b| a.partial_cmp(b).unwrap())
|
||||
.cloned(),
|
||||
count: data.iter().map(Vec::len).sum::<usize>() as u64,
|
||||
distinct_count: None,
|
||||
}),
|
||||
});
|
||||
|
||||
|
@ -338,6 +346,7 @@ where
|
|||
min: data.iter().flatten().min().cloned(),
|
||||
max: data.iter().flatten().max().cloned(),
|
||||
count: data.iter().map(Vec::len).sum::<usize>() as u64,
|
||||
distinct_count: None,
|
||||
}),
|
||||
});
|
||||
|
||||
|
@ -368,6 +377,7 @@ fn create_column_timestamp(
|
|||
min,
|
||||
max,
|
||||
count: data.iter().map(Vec::len).sum::<usize>() as u64,
|
||||
distinct_count: None,
|
||||
}),
|
||||
});
|
||||
|
||||
|
|
|
@ -750,6 +750,7 @@ mod test {
|
|||
};
|
||||
use arrow::array::DictionaryArray;
|
||||
use arrow::datatypes::Int32Type;
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
// helper to make the `add_remove_tables` test simpler to read.
|
||||
fn gen_recordbatch() -> RecordBatch {
|
||||
|
@ -1167,20 +1168,12 @@ mod test {
|
|||
ColumnSummary {
|
||||
name: "active".into(),
|
||||
influxdb_type: Some(InfluxDbType::Field),
|
||||
stats: Statistics::Bool(StatValues {
|
||||
min: Some(false),
|
||||
max: Some(true),
|
||||
count: 3,
|
||||
}),
|
||||
stats: Statistics::Bool(StatValues::new(Some(false), Some(true), 3)),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "counter".into(),
|
||||
influxdb_type: Some(InfluxDbType::Field),
|
||||
stats: Statistics::U64(StatValues {
|
||||
min: Some(1000),
|
||||
max: Some(5000),
|
||||
count: 3,
|
||||
}),
|
||||
stats: Statistics::U64(StatValues::new(Some(1000), Some(5000), 3)),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "env".into(),
|
||||
|
@ -1189,16 +1182,13 @@ mod test {
|
|||
min: Some("dev".into()),
|
||||
max: Some("prod".into()),
|
||||
count: 3,
|
||||
distinct_count: Some(NonZeroU64::new(2).unwrap()),
|
||||
}),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "icounter".into(),
|
||||
influxdb_type: Some(InfluxDbType::Field),
|
||||
stats: Statistics::I64(StatValues {
|
||||
min: Some(-1000),
|
||||
max: Some(4000),
|
||||
count: 3,
|
||||
}),
|
||||
stats: Statistics::I64(StatValues::new(Some(-1000), Some(4000), 3)),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "msg".into(),
|
||||
|
@ -1207,25 +1197,18 @@ mod test {
|
|||
min: Some("msg a".into()),
|
||||
max: Some("msg b".into()),
|
||||
count: 3,
|
||||
distinct_count: Some(NonZeroU64::new(3).unwrap()),
|
||||
}),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "temp".into(),
|
||||
influxdb_type: Some(InfluxDbType::Field),
|
||||
stats: Statistics::F64(StatValues {
|
||||
min: Some(10.0),
|
||||
max: Some(30000.0),
|
||||
count: 3,
|
||||
}),
|
||||
stats: Statistics::F64(StatValues::new(Some(10.0), Some(30000.0), 3)),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "time".into(),
|
||||
influxdb_type: Some(InfluxDbType::Timestamp),
|
||||
stats: Statistics::I64(StatValues {
|
||||
min: Some(3333),
|
||||
max: Some(11111111),
|
||||
count: 3,
|
||||
}),
|
||||
stats: Statistics::I64(StatValues::new(Some(3333), Some(11111111), 3)),
|
||||
},
|
||||
],
|
||||
}];
|
||||
|
|
|
@ -18,6 +18,8 @@ use boolean::BooleanEncoding;
|
|||
use encoding::bool;
|
||||
use float::FloatEncoding;
|
||||
use integer::IntegerEncoding;
|
||||
use std::convert::TryInto;
|
||||
use std::num::NonZeroU64;
|
||||
use string::StringEncoding;
|
||||
|
||||
/// The possible logical types that column values can have. All values in a
|
||||
|
@ -91,6 +93,14 @@ impl Column {
|
|||
}
|
||||
}
|
||||
|
||||
/// The number of distinct values if known
|
||||
pub fn cardinality(&self) -> Option<NonZeroU64> {
|
||||
match &self {
|
||||
Self::String(_, data) => (data.cardinality() as u64).try_into().ok(),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
/// The estimated size in bytes of the contents of the column if it was not
|
||||
/// compressed, and was stored as a contiguous collection of elements. This
|
||||
/// method can provide a good approximation for the size of the column at
|
||||
|
|
|
@ -355,6 +355,14 @@ impl StringEncoding {
|
|||
}
|
||||
}
|
||||
|
||||
/// The number of distinct logical values in this column encoding.
|
||||
pub fn cardinality(&self) -> u32 {
|
||||
match &self {
|
||||
Self::RleDictionary(c) => c.cardinality(),
|
||||
Self::Dictionary(c) => c.cardinality(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Calculate all row ids for each distinct value in the column.
|
||||
pub fn group_row_ids(&self) -> Either<Vec<&RowIDs>, Vec<RowIDs>> {
|
||||
match self {
|
||||
|
|
|
@ -29,6 +29,7 @@ use datafusion::{
|
|||
};
|
||||
use internal_types::schema::{InfluxColumnType, Schema};
|
||||
use internal_types::selection::Selection;
|
||||
use std::num::NonZeroU64;
|
||||
|
||||
/// The name used for a timestamp column.
|
||||
pub const TIME_COLUMN_NAME: &str = internal_types::schema::TIME_COLUMN_NAME;
|
||||
|
@ -83,6 +84,7 @@ impl RowGroup {
|
|||
schema::ColumnType::Tag(name.clone()),
|
||||
c.logical_datatype(),
|
||||
c.column_range(),
|
||||
c.cardinality(),
|
||||
);
|
||||
|
||||
all_columns_by_name.insert(name.clone(), all_columns.len());
|
||||
|
@ -97,6 +99,7 @@ impl RowGroup {
|
|||
schema::ColumnType::Field(name.clone()),
|
||||
c.logical_datatype(),
|
||||
c.column_range(),
|
||||
c.cardinality(),
|
||||
);
|
||||
all_columns_by_name.insert(name.clone(), all_columns.len());
|
||||
all_columns.push(c);
|
||||
|
@ -110,6 +113,7 @@ impl RowGroup {
|
|||
schema::ColumnType::Timestamp(name.clone()),
|
||||
c.logical_datatype(),
|
||||
c.column_range(),
|
||||
c.cardinality(),
|
||||
);
|
||||
|
||||
all_columns_by_name.insert(name.clone(), all_columns.len());
|
||||
|
@ -1449,7 +1453,7 @@ pub enum ColumnType {
|
|||
}
|
||||
|
||||
impl ColumnType {
|
||||
// The total size in bytes of the column
|
||||
/// The total size in bytes of the column
|
||||
pub fn size(&self) -> usize {
|
||||
match &self {
|
||||
Self::Tag(c) => c.size(),
|
||||
|
@ -1458,6 +1462,15 @@ impl ColumnType {
|
|||
}
|
||||
}
|
||||
|
||||
/// The number of distinct values if known
|
||||
pub fn distinct_count(&self) -> Option<NonZeroU64> {
|
||||
match &self {
|
||||
Self::Tag(c) => c.cardinality(),
|
||||
Self::Field(c) => c.cardinality(),
|
||||
Self::Time(c) => c.cardinality(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Helper function to construct a `Tag` column from a slice of `&str`
|
||||
pub fn create_tag(values: &[&str]) -> Self {
|
||||
Self::Tag(Column::from(values))
|
||||
|
@ -1474,6 +1487,7 @@ pub struct ColumnMeta {
|
|||
pub typ: crate::schema::ColumnType,
|
||||
pub logical_data_type: LogicalDataType,
|
||||
pub range: (OwnedValue, OwnedValue),
|
||||
pub distinct_count: Option<NonZeroU64>,
|
||||
}
|
||||
|
||||
impl ColumnMeta {
|
||||
|
@ -1603,6 +1617,7 @@ impl MetaData {
|
|||
col_type: schema::ColumnType,
|
||||
logical_data_type: LogicalDataType,
|
||||
range: (OwnedValue, OwnedValue),
|
||||
distinct_count: Option<NonZeroU64>,
|
||||
) {
|
||||
self.column_names.push(name.to_owned());
|
||||
self.columns.insert(
|
||||
|
@ -1611,6 +1626,7 @@ impl MetaData {
|
|||
typ: col_type,
|
||||
logical_data_type,
|
||||
range,
|
||||
distinct_count,
|
||||
},
|
||||
);
|
||||
self.columns_size += column_size;
|
||||
|
@ -3206,6 +3222,7 @@ west,host-c,pro,10,6
|
|||
OwnedValue::String("east".to_owned()),
|
||||
OwnedValue::String("west".to_owned()),
|
||||
),
|
||||
distinct_count: Some(NonZeroU64::new(233).unwrap()),
|
||||
};
|
||||
|
||||
let col2 = ColumnMeta {
|
||||
|
@ -3215,6 +3232,7 @@ west,host-c,pro,10,6
|
|||
OwnedValue::String("north".to_owned()),
|
||||
OwnedValue::String("west".to_owned()),
|
||||
),
|
||||
distinct_count: Some(NonZeroU64::new(233).unwrap()),
|
||||
};
|
||||
|
||||
let col3 = ColumnMeta {
|
||||
|
@ -3224,6 +3242,7 @@ west,host-c,pro,10,6
|
|||
OwnedValue::String("east".to_owned()),
|
||||
OwnedValue::String("west".to_owned()),
|
||||
),
|
||||
distinct_count: None,
|
||||
};
|
||||
|
||||
assert_eq!(col1, col2);
|
||||
|
|
|
@ -600,17 +600,17 @@ impl MetaData {
|
|||
// Update the table schema using the incoming row group schema
|
||||
for (column_name, column_meta) in &other.columns {
|
||||
let (column_range_min, column_range_max) = &column_meta.range;
|
||||
let mut curr_range = &mut this
|
||||
.columns
|
||||
.get_mut(&column_name.to_string())
|
||||
.unwrap()
|
||||
.range;
|
||||
if column_range_min < &curr_range.0 {
|
||||
curr_range.0 = column_range_min.clone();
|
||||
let mut curr_meta = this.columns.get_mut(&column_name.to_string()).unwrap();
|
||||
|
||||
// No way to accurately aggregate counts across RowGroups
|
||||
curr_meta.distinct_count = None;
|
||||
|
||||
if column_range_min < &curr_meta.range.0 {
|
||||
curr_meta.range.0 = column_range_min.clone();
|
||||
}
|
||||
|
||||
if column_range_max > &curr_range.1 {
|
||||
curr_range.1 = column_range_max.clone();
|
||||
if column_range_max > &curr_meta.range.1 {
|
||||
curr_meta.range.1 = column_range_max.clone();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -682,12 +682,14 @@ impl MetaData {
|
|||
.iter()
|
||||
.map(|(name, column_meta)| {
|
||||
let count = self.rows;
|
||||
let distinct_count = column_meta.distinct_count;
|
||||
|
||||
let stats = match &column_meta.range {
|
||||
(OwnedValue::String(min), OwnedValue::String(max)) => {
|
||||
Statistics::String(StatValues {
|
||||
min: Some(min.to_string()),
|
||||
max: Some(max.to_string()),
|
||||
distinct_count,
|
||||
count,
|
||||
})
|
||||
}
|
||||
|
@ -695,6 +697,7 @@ impl MetaData {
|
|||
Statistics::Bool(StatValues {
|
||||
min: Some(*min),
|
||||
max: Some(*max),
|
||||
distinct_count,
|
||||
count,
|
||||
})
|
||||
}
|
||||
|
@ -702,16 +705,19 @@ impl MetaData {
|
|||
(Scalar::I64(min), Scalar::I64(max)) => Statistics::I64(StatValues {
|
||||
min: Some(*min),
|
||||
max: Some(*max),
|
||||
distinct_count,
|
||||
count,
|
||||
}),
|
||||
(Scalar::U64(min), Scalar::U64(max)) => Statistics::U64(StatValues {
|
||||
min: Some(*min),
|
||||
max: Some(*max),
|
||||
distinct_count,
|
||||
count,
|
||||
}),
|
||||
(Scalar::F64(min), Scalar::F64(max)) => Statistics::F64(StatValues {
|
||||
min: Some(*min),
|
||||
max: Some(*max),
|
||||
distinct_count,
|
||||
count,
|
||||
}),
|
||||
_ => panic!(
|
||||
|
|
|
@ -1437,7 +1437,7 @@ mod tests {
|
|||
.eq(1.0)
|
||||
.unwrap();
|
||||
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1111)
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 1143)
|
||||
.unwrap();
|
||||
|
||||
db.load_chunk_to_read_buffer("1970-01-01T00", "cpu", 0, &Default::default())
|
||||
|
@ -1459,7 +1459,7 @@ mod tests {
|
|||
|
||||
// verify chunk size updated (chunk moved from closing to moving to moved)
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "mutable_buffer", 0).unwrap();
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1598).unwrap();
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1630).unwrap();
|
||||
|
||||
db.write_chunk_to_object_store("1970-01-01T00", "cpu", 0, &Default::default())
|
||||
.await
|
||||
|
@ -1478,8 +1478,8 @@ mod tests {
|
|||
.eq(1.0)
|
||||
.unwrap();
|
||||
|
||||
let expected_parquet_size = 727;
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1598).unwrap();
|
||||
let expected_parquet_size = 759;
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1630).unwrap();
|
||||
// now also in OS
|
||||
catalog_chunk_size_bytes_metric_eq(
|
||||
&test_db.metric_registry,
|
||||
|
@ -1649,7 +1649,7 @@ mod tests {
|
|||
.unwrap();
|
||||
|
||||
// verify chunk size updated (chunk moved from moved to writing to written)
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1598).unwrap();
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1630).unwrap();
|
||||
|
||||
// drop the chunk from the read buffer
|
||||
db.drop_chunk(partition_key, "cpu", mb_chunk.id()).unwrap();
|
||||
|
@ -1659,7 +1659,7 @@ mod tests {
|
|||
);
|
||||
|
||||
// verify size is reported until chunk dropped
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1598).unwrap();
|
||||
catalog_chunk_size_bytes_metric_eq(&test_db.metric_registry, "read_buffer", 1630).unwrap();
|
||||
std::mem::drop(rb_chunk);
|
||||
|
||||
// verify chunk size updated (chunk dropped from moved state)
|
||||
|
@ -1732,7 +1732,7 @@ mod tests {
|
|||
("svr_id", "1"),
|
||||
])
|
||||
.histogram()
|
||||
.sample_sum_eq(3167.0)
|
||||
.sample_sum_eq(3231.0)
|
||||
.unwrap();
|
||||
|
||||
let rb = collect_read_filter(&rb_chunk, "cpu").await;
|
||||
|
@ -1834,7 +1834,7 @@ mod tests {
|
|||
("svr_id", "10"),
|
||||
])
|
||||
.histogram()
|
||||
.sample_sum_eq(2325.0)
|
||||
.sample_sum_eq(2389.0)
|
||||
.unwrap();
|
||||
|
||||
// it should be the same chunk!
|
||||
|
@ -1942,7 +1942,7 @@ mod tests {
|
|||
("svr_id", "10"),
|
||||
])
|
||||
.histogram()
|
||||
.sample_sum_eq(2325.0)
|
||||
.sample_sum_eq(2389.0)
|
||||
.unwrap();
|
||||
|
||||
// Unload RB chunk but keep it in OS
|
||||
|
@ -1970,7 +1970,7 @@ mod tests {
|
|||
("svr_id", "10"),
|
||||
])
|
||||
.histogram()
|
||||
.sample_sum_eq(727.0)
|
||||
.sample_sum_eq(759.0)
|
||||
.unwrap();
|
||||
|
||||
// Verify data written to the parquet file in object store
|
||||
|
@ -2330,7 +2330,7 @@ mod tests {
|
|||
Arc::from("cpu"),
|
||||
0,
|
||||
ChunkStorage::ReadBufferAndObjectStore,
|
||||
2316, // size of RB and OS chunks
|
||||
2380, // size of RB and OS chunks
|
||||
1,
|
||||
),
|
||||
ChunkSummary::new_without_timestamps(
|
||||
|
@ -2346,7 +2346,7 @@ mod tests {
|
|||
Arc::from("cpu"),
|
||||
0,
|
||||
ChunkStorage::ClosedMutableBuffer,
|
||||
2126,
|
||||
2190,
|
||||
1,
|
||||
),
|
||||
ChunkSummary::new_without_timestamps(
|
||||
|
@ -2372,7 +2372,7 @@ mod tests {
|
|||
.memory()
|
||||
.mutable_buffer()
|
||||
.get_total(),
|
||||
64 + 2126 + 87
|
||||
64 + 2190 + 87
|
||||
);
|
||||
assert_eq!(
|
||||
db.catalog
|
||||
|
@ -2381,11 +2381,11 @@ mod tests {
|
|||
.memory()
|
||||
.read_buffer()
|
||||
.get_total(),
|
||||
1589
|
||||
1621
|
||||
);
|
||||
assert_eq!(
|
||||
db.catalog.state().metrics().memory().parquet().get_total(),
|
||||
727
|
||||
759
|
||||
);
|
||||
}
|
||||
|
||||
|
@ -2435,29 +2435,17 @@ mod tests {
|
|||
ColumnSummary {
|
||||
name: "bar".into(),
|
||||
influxdb_type: Some(InfluxDbType::Field),
|
||||
stats: Statistics::F64(StatValues {
|
||||
min: Some(1.0),
|
||||
max: Some(2.0),
|
||||
count: 2,
|
||||
}),
|
||||
stats: Statistics::F64(StatValues::new(Some(1.0), Some(2.0), 2)),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "time".into(),
|
||||
influxdb_type: Some(InfluxDbType::Timestamp),
|
||||
stats: Statistics::I64(StatValues {
|
||||
min: Some(1),
|
||||
max: Some(2),
|
||||
count: 2,
|
||||
}),
|
||||
stats: Statistics::I64(StatValues::new(Some(1), Some(2), 2)),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "baz".into(),
|
||||
influxdb_type: Some(InfluxDbType::Field),
|
||||
stats: Statistics::F64(StatValues {
|
||||
min: Some(3.0),
|
||||
max: Some(3.0),
|
||||
count: 1,
|
||||
}),
|
||||
stats: Statistics::F64(StatValues::new(Some(3.0), Some(3.0), 1)),
|
||||
},
|
||||
],
|
||||
},
|
||||
|
@ -2467,20 +2455,12 @@ mod tests {
|
|||
ColumnSummary {
|
||||
name: "foo".into(),
|
||||
influxdb_type: Some(InfluxDbType::Field),
|
||||
stats: Statistics::F64(StatValues {
|
||||
min: Some(1.0),
|
||||
max: Some(1.0),
|
||||
count: 1,
|
||||
}),
|
||||
stats: Statistics::F64(StatValues::new(Some(1.0), Some(1.0), 1)),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "time".into(),
|
||||
influxdb_type: Some(InfluxDbType::Timestamp),
|
||||
stats: Statistics::I64(StatValues {
|
||||
min: Some(1),
|
||||
max: Some(1),
|
||||
count: 1,
|
||||
}),
|
||||
stats: Statistics::I64(StatValues::new(Some(1), Some(1), 1)),
|
||||
},
|
||||
],
|
||||
},
|
||||
|
@ -2495,20 +2475,16 @@ mod tests {
|
|||
ColumnSummary {
|
||||
name: "bar".into(),
|
||||
influxdb_type: Some(InfluxDbType::Field),
|
||||
stats: Statistics::F64(StatValues {
|
||||
min: Some(1.0),
|
||||
max: Some(1.0),
|
||||
count: 1,
|
||||
}),
|
||||
stats: Statistics::F64(StatValues::new(Some(1.0), Some(1.0), 1)),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "time".into(),
|
||||
influxdb_type: Some(InfluxDbType::Timestamp),
|
||||
stats: Statistics::I64(StatValues {
|
||||
min: Some(400000000000000),
|
||||
max: Some(400000000000000),
|
||||
count: 1,
|
||||
}),
|
||||
stats: Statistics::I64(StatValues::new(
|
||||
Some(400000000000000),
|
||||
Some(400000000000000),
|
||||
1,
|
||||
)),
|
||||
},
|
||||
],
|
||||
},
|
||||
|
@ -2518,20 +2494,16 @@ mod tests {
|
|||
ColumnSummary {
|
||||
name: "frob".into(),
|
||||
influxdb_type: Some(InfluxDbType::Field),
|
||||
stats: Statistics::F64(StatValues {
|
||||
min: Some(3.0),
|
||||
max: Some(3.0),
|
||||
count: 1,
|
||||
}),
|
||||
stats: Statistics::F64(StatValues::new(Some(3.0), Some(3.0), 1)),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "time".into(),
|
||||
influxdb_type: Some(InfluxDbType::Timestamp),
|
||||
stats: Statistics::I64(StatValues {
|
||||
min: Some(400000000000001),
|
||||
max: Some(400000000000001),
|
||||
count: 1,
|
||||
}),
|
||||
stats: Statistics::I64(StatValues::new(
|
||||
Some(400000000000001),
|
||||
Some(400000000000001),
|
||||
1,
|
||||
)),
|
||||
},
|
||||
],
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue