diff --git a/Cargo.lock b/Cargo.lock
index 402acad833..3173fce4b3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1174,6 +1174,7 @@ version = "0.1.0"
 dependencies = [
  "data_types",
  "influxdb_line_protocol",
+ "observability_deps",
  "ordered-float 3.0.0",
  "percent-encoding",
  "schema",
@@ -4522,6 +4523,7 @@ dependencies = [
  "criterion",
  "croaring",
  "data_types",
+ "data_types2",
  "datafusion 0.1.0",
  "either",
  "hashbrown 0.12.0",
diff --git a/data_types2/Cargo.toml b/data_types2/Cargo.toml
index 77dea8c888..84c58e76d8 100644
--- a/data_types2/Cargo.toml
+++ b/data_types2/Cargo.toml
@@ -7,6 +7,7 @@ description = "Shared data types in the IOx NG architecture"
 [dependencies]
 data_types = { path = "../data_types" }
 influxdb_line_protocol = { path = "../influxdb_line_protocol" }
+observability_deps = { path = "../observability_deps" }
 ordered-float = "3"
 percent-encoding = "2.1.0"
 schema = { path = "../schema" }
diff --git a/data_types2/src/lib.rs b/data_types2/src/lib.rs
index 07828173e9..55ef129408 100644
--- a/data_types2/src/lib.rs
+++ b/data_types2/src/lib.rs
@@ -11,24 +11,23 @@
 )]

 use influxdb_line_protocol::FieldValue;
+use observability_deps::tracing::warn;
 use percent_encoding::{utf8_percent_encode, NON_ALPHANUMERIC};
 use schema::{builder::SchemaBuilder, sort::SortKey, InfluxColumnType, InfluxFieldType, Schema};
 use snafu::{ResultExt, Snafu};
 use std::{
-    borrow::Cow,
+    borrow::{Borrow, Cow},
     collections::BTreeMap,
     convert::TryFrom,
     fmt::Write,
-    num::{FpCategory, NonZeroU32},
+    mem,
+    num::{FpCategory, NonZeroU32, NonZeroU64},
     ops::{Add, Deref, RangeInclusive, Sub},
     sync::Arc,
 };
 use uuid::Uuid;

 pub use data_types::{
-    partition_metadata::{
-        ColumnSummary, InfluxDbType, PartitionAddr, StatValues, Statistics, TableSummary,
-    },
     sequence::Sequence,
     timestamp::{TimestampMinMax, TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME},
 };
@@ -1407,6 +1406,523 @@ impl<'a> std::fmt::Display for DatabaseName<'a> {
     }
 }

+/// Column name and statistics which encode type information
+#[derive(Debug, PartialEq, Clone)]
+pub struct ColumnSummary {
+    /// Column name
+    pub name: String,
+
+    /// Column's Influx data model type (if any)
+    pub influxdb_type: Option<InfluxDbType>,
+
+    /// Per-column statistics
+    pub stats: Statistics,
+}
+
+impl ColumnSummary {
+    /// Returns the total number of rows (including nulls) in this column
+    pub fn total_count(&self) -> u64 {
+        self.stats.total_count()
+    }
+
+    /// Updates statistics from other if the same type, otherwise a noop
+    pub fn update_from(&mut self, other: &Self) {
+        match (&mut self.stats, &other.stats) {
+            (Statistics::F64(s), Statistics::F64(o)) => {
+                s.update_from(o);
+            }
+            (Statistics::I64(s), Statistics::I64(o)) => {
+                s.update_from(o);
+            }
+            (Statistics::Bool(s), Statistics::Bool(o)) => {
+                s.update_from(o);
+            }
+            (Statistics::String(s), Statistics::String(o)) => {
+                s.update_from(o);
+            }
+            (Statistics::U64(s), Statistics::U64(o)) => {
+                s.update_from(o);
+            }
+            // Use catch-alls for the specific types so that if a new type gets added, the
+            // compiler will complain.
+            (Statistics::F64(_), _) => unreachable!(),
+            (Statistics::I64(_), _) => unreachable!(),
+            (Statistics::U64(_), _) => unreachable!(),
+            (Statistics::Bool(_), _) => unreachable!(),
+            (Statistics::String(_), _) => unreachable!(),
+        }
+    }
+
+    /// Updates these statistics so that the total length of this
+    /// column is `len` rows, padding it with trailing NULLs if
+    /// necessary
+    pub fn update_to_total_count(&mut self, len: u64) {
+        let total_count = self.total_count();
+        assert!(
+            total_count <= len,
+            "trying to shrink column stats from {} to {}",
+            total_count,
+            len
+        );
+        let delta = len - total_count;
+        self.stats.update_for_nulls(delta);
+    }
+
+    /// Return size in bytes of this Column metadata (not the underlying column)
+    pub fn size(&self) -> usize {
+        mem::size_of::<Self>() + self.name.len() + self.stats.size()
+    }
+}
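
A minimal usage sketch of the padding path above (illustrative only, using just the types this patch defines; not part of the diff):

    let mut col = ColumnSummary {
        name: "tag1".to_string(),
        influxdb_type: Some(InfluxDbType::Tag),
        stats: Statistics::String(StatValues::new_non_null(
            Some("a".to_string()),
            Some("b".to_string()),
            2,
        )),
    };

    // Pad the column out to 5 rows: the 3 added rows are counted as nulls,
    // while the min/max bounds are left untouched.
    col.update_to_total_count(5);
    assert_eq!(col.total_count(), 5);
    assert_eq!(col.stats.null_count(), Some(3));
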
+// Replicate this enum here as it can't be derived from the existing statistics
+#[derive(Debug, PartialEq, Clone, Copy)]
+#[allow(missing_docs)]
+pub enum InfluxDbType {
+    Tag,
+    Field,
+    Timestamp,
+}
+
+/// Address of the chunk within the catalog
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct PartitionAddr {
+    /// Database name
+    pub db_name: Arc<str>,
+
+    /// What table does the chunk belong to?
+    pub table_name: Arc<str>,
+
+    /// What partition does the chunk belong to?
+    pub partition_key: Arc<str>,
+}
+
+/// Summary statistics for a column.
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct StatValues<T> {
+    /// minimum (non-NaN, non-NULL) value, if any
+    pub min: Option<T>,
+
+    /// maximum (non-NaN, non-NULL) value, if any
+    pub max: Option<T>,
+
+    /// total number of values in this column, including null values
+    pub total_count: u64,
+
+    /// number of null values in this column
+    pub null_count: Option<u64>,
+
+    /// number of distinct values in this column if known
+    ///
+    /// This includes NULLs and NANs
+    pub distinct_count: Option<NonZeroU64>,
+}
+
+/// Represents the result of comparing the min/max ranges of two [`StatValues`]
+#[derive(Debug, PartialEq, Copy, Clone)]
+pub enum StatOverlap {
+    /// There is at least one value that exists in both ranges
+    NonZero,
+
+    /// There are zero values that exist in both ranges
+    Zero,
+
+    /// It is not known if there are any intersections (e.g. because
+    /// one of the bounds is not known / is None)
+    Unknown,
+}
+
+impl<T> StatValues<T>
+where
+    T: PartialOrd,
+{
+    /// Returns information about the overlap between two `StatValues`
+    pub fn overlaps(&self, other: &Self) -> StatOverlap {
+        match (&self.min, &self.max, &other.min, &other.max) {
+            (Some(self_min), Some(self_max), Some(other_min), Some(other_max)) => {
+                if self_min <= other_max && self_max >= other_min {
+                    StatOverlap::NonZero
+                } else {
+                    StatOverlap::Zero
+                }
+            }
+            // At least one of the values was None
+            _ => StatOverlap::Unknown,
+        }
+    }
+}
+
+impl<T> Default for StatValues<T> {
+    fn default() -> Self {
+        Self {
+            min: None,
+            max: None,
+            total_count: 0,
+            null_count: None,
+            distinct_count: None,
+        }
+    }
+}
+
+impl<T> StatValues<T> {
+    /// Create new statistics with no values
+    pub fn new_empty() -> Self {
+        Self {
+            min: None,
+            max: None,
+            total_count: 0,
+            null_count: Some(0),
+            distinct_count: None,
+        }
+    }
+
+    /// Returns true if both the min and max values are None (aka not known)
+    pub fn is_none(&self) -> bool {
+        self.min.is_none() && self.max.is_none()
+    }
+
+    /// Update the statistics values to account for `num_nulls` additional null values
+    pub fn update_for_nulls(&mut self, num_nulls: u64) {
+        self.total_count += num_nulls;
+        self.null_count = self.null_count.map(|x| x + num_nulls);
+    }
+
+    /// Updates the statistics keeping the min, max and incrementing count.
+    ///
+    /// The type plumbing exists to allow calling with `&str` on a `StatValues<String>`.
+    pub fn update<U: ?Sized>(&mut self, other: &U)
+    where
+        T: Borrow<U>,
+        U: ToOwned<Owned = T> + PartialOrd + IsNan,
+    {
+        self.total_count += 1;
+        self.distinct_count = None;
+
+        if !other.is_nan() {
+            match &self.min {
+                None => self.min = Some(other.to_owned()),
+                Some(s) => {
+                    if s.borrow() > other {
+                        self.min = Some(other.to_owned());
+                    }
+                }
+            }
+
+            match &self.max {
+                None => {
+                    self.max = Some(other.to_owned());
+                }
+                Some(s) => {
+                    if other > s.borrow() {
+                        self.max = Some(other.to_owned());
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl<T> StatValues<T>
+where
+    T: Clone + PartialOrd,
+{
+    /// Updates statistics from other
+    pub fn update_from(&mut self, other: &Self) {
+        self.total_count += other.total_count;
+        self.null_count = self.null_count.zip(other.null_count).map(|(a, b)| a + b);
+
+        // No way to accurately aggregate counts
+        self.distinct_count = None;
+
+        match (&self.min, &other.min) {
+            (None, None) | (Some(_), None) => {}
+            (None, Some(o)) => self.min = Some(o.clone()),
+            (Some(s), Some(o)) => {
+                if s > o {
+                    self.min = Some(o.clone());
+                }
+            }
+        }
+
+        match (&self.max, &other.max) {
+            (None, None) | (Some(_), None) => {}
+            (None, Some(o)) => self.max = Some(o.clone()),
+            (Some(s), Some(o)) => {
+                if o > s {
+                    self.max = Some(o.clone());
+                }
+            }
+        };
+    }
+}
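
To make the semantics above concrete, a small sketch using only this patch's API (illustrative, not part of the diff):

    let mut a = StatValues::new_empty();
    a.update(&10i64);
    a.update(&20i64);

    let mut b = StatValues::new_empty();
    b.update(&15i64);

    // a covers [10, 20] and b covers [15, 15], so the ranges intersect.
    assert_eq!(a.overlaps(&b), StatOverlap::NonZero);

    // Merging b into a keeps the widest bounds and sums the row counts.
    a.update_from(&b);
    assert_eq!((a.min, a.max, a.total_count), (Some(10), Some(20), 3));
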
+impl<T> StatValues<T>
+where
+    T: IsNan + PartialOrd,
+{
+    /// Create new statistics with the specified count and null count
+    pub fn new(min: Option<T>, max: Option<T>, total_count: u64, null_count: Option<u64>) -> Self {
+        let distinct_count = None;
+        Self::new_with_distinct(min, max, total_count, null_count, distinct_count)
+    }
+
+    /// Create statistics for a column that only has nulls up to now
+    pub fn new_all_null(total_count: u64, distinct_count: Option<u64>) -> Self {
+        let min = None;
+        let max = None;
+        let null_count = Some(total_count);
+
+        if let Some(count) = distinct_count {
+            assert!(count > 0);
+        }
+        Self::new_with_distinct(
+            min,
+            max,
+            total_count,
+            null_count,
+            distinct_count.map(|c| NonZeroU64::new(c).unwrap()),
+        )
+    }
+
+    /// Create statistics for a column with zero nulls and unknown distinct count
+    pub fn new_non_null(min: Option<T>, max: Option<T>, total_count: u64) -> Self {
+        let null_count = Some(0);
+        let distinct_count = None;
+        Self::new_with_distinct(min, max, total_count, null_count, distinct_count)
+    }
+
+    /// Create new statistics with the specified count, null count, and distinct values
+    pub fn new_with_distinct(
+        min: Option<T>,
+        max: Option<T>,
+        total_count: u64,
+        null_count: Option<u64>,
+        distinct_count: Option<NonZeroU64>,
+    ) -> Self {
+        if let Some(min) = &min {
+            assert!(!min.is_nan());
+        }
+        if let Some(max) = &max {
+            assert!(!max.is_nan());
+        }
+        if let (Some(min), Some(max)) = (&min, &max) {
+            assert!(min <= max);
+        }
+
+        Self {
+            min,
+            max,
+            total_count,
+            null_count,
+            distinct_count,
+        }
+    }
+}
+
+/// Whether a type is NaN or not.
+pub trait IsNan {
+    /// Test for NaN-ness.
+    fn is_nan(&self) -> bool;
+}
+
+impl<T: IsNan> IsNan for &T {
+    fn is_nan(&self) -> bool {
+        (*self).is_nan()
+    }
+}
+
+macro_rules! impl_is_nan_false {
+    ($t:ty) => {
+        impl IsNan for $t {
+            fn is_nan(&self) -> bool {
+                false
+            }
+        }
+    };
+}
+
+impl_is_nan_false!(bool);
+impl_is_nan_false!(str);
+impl_is_nan_false!(String);
+impl_is_nan_false!(i8);
+impl_is_nan_false!(i16);
+impl_is_nan_false!(i32);
+impl_is_nan_false!(i64);
+impl_is_nan_false!(u8);
+impl_is_nan_false!(u16);
+impl_is_nan_false!(u32);
+impl_is_nan_false!(u64);
+
+impl IsNan for f64 {
+    fn is_nan(&self) -> bool {
+        Self::is_nan(*self)
+    }
+}
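
The `IsNan` plumbing is what lets `update` count a NaN row without ever letting it become a bound; for instance (sketch, not part of the diff):

    let mut stat = StatValues::<f64>::default();
    stat.update(&f64::NAN);
    // The NaN row is counted, but min/max stay unknown.
    assert_eq!((stat.total_count, stat.min, stat.max), (1, None, None));

    stat.update(&3.5);
    assert_eq!((stat.min, stat.max), (Some(3.5), Some(3.5)));
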
+/// Statistics and type information for a column.
+#[derive(Debug, PartialEq, Clone)]
+#[allow(missing_docs)]
+pub enum Statistics {
+    I64(StatValues<i64>),
+    U64(StatValues<u64>),
+    F64(StatValues<f64>),
+    Bool(StatValues<bool>),
+    String(StatValues<String>),
+}
+
+impl Statistics {
+    /// Returns the total number of rows in this column
+    pub fn total_count(&self) -> u64 {
+        match self {
+            Self::I64(s) => s.total_count,
+            Self::U64(s) => s.total_count,
+            Self::F64(s) => s.total_count,
+            Self::Bool(s) => s.total_count,
+            Self::String(s) => s.total_count,
+        }
+    }
+
+    /// Returns true if both the min and max values are None (aka not known)
+    pub fn is_none(&self) -> bool {
+        match self {
+            Self::I64(v) => v.is_none(),
+            Self::U64(v) => v.is_none(),
+            Self::F64(v) => v.is_none(),
+            Self::Bool(v) => v.is_none(),
+            Self::String(v) => v.is_none(),
+        }
+    }
+
+    /// Returns the number of null rows in this column
+    pub fn null_count(&self) -> Option<u64> {
+        match self {
+            Self::I64(s) => s.null_count,
+            Self::U64(s) => s.null_count,
+            Self::F64(s) => s.null_count,
+            Self::Bool(s) => s.null_count,
+            Self::String(s) => s.null_count,
+        }
+    }
+
+    /// Returns the distinct count if known
+    pub fn distinct_count(&self) -> Option<NonZeroU64> {
+        match self {
+            Self::I64(s) => s.distinct_count,
+            Self::U64(s) => s.distinct_count,
+            Self::F64(s) => s.distinct_count,
+            Self::Bool(s) => s.distinct_count,
+            Self::String(s) => s.distinct_count,
+        }
+    }
+
+    /// Update the statistics values to account for `num_nulls` additional null values
+    pub fn update_for_nulls(&mut self, num_nulls: u64) {
+        match self {
+            Self::I64(v) => v.update_for_nulls(num_nulls),
+            Self::U64(v) => v.update_for_nulls(num_nulls),
+            Self::F64(v) => v.update_for_nulls(num_nulls),
+            Self::Bool(v) => v.update_for_nulls(num_nulls),
+            Self::String(v) => v.update_for_nulls(num_nulls),
+        }
+    }
+
+    /// Return the size in bytes of this stats instance
+    pub fn size(&self) -> usize {
+        match self {
+            Self::String(v) => std::mem::size_of::<Self>() + v.string_size(),
+            _ => std::mem::size_of::<Self>(),
+        }
+    }
+
+    /// Return a human interpretable description of this type
+    pub fn type_name(&self) -> &'static str {
+        match self {
+            Self::I64(_) => "I64",
+            Self::U64(_) => "U64",
+            Self::F64(_) => "F64",
+            Self::Bool(_) => "Bool",
+            Self::String(_) => "String",
+        }
+    }
+}
+
+impl StatValues<String> {
+    /// Returns the bytes associated with storing min/max string values
+    pub fn string_size(&self) -> usize {
+        self.min.as_ref().map(|x| x.len()).unwrap_or(0)
+            + self.max.as_ref().map(|x| x.len()).unwrap_or(0)
+    }
+}
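
A quick sketch of the type-erased accessors (illustrative only):

    let stats = Statistics::String(StatValues::new_non_null(
        Some("aaa".to_string()),
        Some("zzz".to_string()),
        10,
    ));
    assert_eq!(stats.type_name(), "String");
    assert_eq!(stats.total_count(), 10);
    // String statistics also charge for the heap bytes of their bounds.
    assert!(stats.size() > std::mem::size_of::<Statistics>());
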
+/// Metadata and statistics information for a table. This can be
+/// either for the portion of a Table stored within a single chunk or
+/// aggregated across chunks.
+#[derive(Debug, PartialEq, Clone, Default)]
+pub struct TableSummary {
+    /// Per column statistics
+    pub columns: Vec<ColumnSummary>,
+}
+
+impl TableSummary {
+    /// Get the column summary by name.
+    pub fn column(&self, name: &str) -> Option<&ColumnSummary> {
+        self.columns.iter().find(|c| c.name == name)
+    }
+
+    /// Returns the total number of rows in the columns of this summary
+    pub fn total_count(&self) -> u64 {
+        // Assumes that all columns have the same number of rows, so
+        // pick the first one
+        let count = self.columns.get(0).map(|c| c.total_count()).unwrap_or(0);
+
+        // Validate that the counts are consistent across columns
+        for c in &self.columns {
+            // Restore to assert when https://github.com/influxdata/influxdb_iox/issues/2124 is fixed
+            if c.total_count() != count {
+                warn!(column_name=%c.name,
+                      column_count=c.total_count(), previous_count=count,
+                      "Mismatch in statistics count, see #2124");
+            }
+        }
+        count
+    }
+
+    /// Updates the table summary with combined stats from the other. Counts are
+    /// treated as non-overlapping so they're just added together. If the
+    /// type of a column differs between the two tables, no update is done
+    /// on that column. Columns that only exist in the other are cloned into
+    /// this table summary.
+    pub fn update_from(&mut self, other: &Self) {
+        let new_total_count = self.total_count() + other.total_count();
+
+        // update all existing columns
+        for col in &mut self.columns {
+            if let Some(other_col) = other.column(&col.name) {
+                col.update_from(other_col);
+            } else {
+                col.update_to_total_count(new_total_count);
+            }
+        }
+
+        // Add any columns that were new
+        for col in &other.columns {
+            if self.column(&col.name).is_none() {
+                let mut new_col = col.clone();
+                // ensure the count is consistent
+                new_col.update_to_total_count(new_total_count);
+                self.columns.push(new_col);
+            }
+        }
+    }
+
+    /// Total size of all ColumnSummaries that belong to this table, which includes
+    /// column names and their stats
+    pub fn size(&self) -> usize {
+        let size: usize = self.columns.iter().map(|c| c.size()).sum();
+        size + mem::size_of::<Self>() // Add size of this struct that points to
+                                      // table and ColumnSummary
+    }
+}
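
A sketch of how the merge behaves when the right-hand summary has an extra column (illustrative; the column names are made up):

    let mut summary = TableSummary {
        columns: vec![ColumnSummary {
            name: "temp".to_string(),
            influxdb_type: Some(InfluxDbType::Field),
            stats: Statistics::F64(StatValues::new_non_null(Some(1.0), Some(2.0), 4)),
        }],
    };
    let other = TableSummary {
        columns: vec![
            ColumnSummary {
                name: "temp".to_string(),
                influxdb_type: Some(InfluxDbType::Field),
                stats: Statistics::F64(StatValues::new_non_null(Some(0.5), Some(3.0), 6)),
            },
            ColumnSummary {
                name: "city".to_string(),
                influxdb_type: Some(InfluxDbType::Tag),
                stats: Statistics::String(StatValues::new_non_null(
                    Some("austin".to_string()),
                    Some("boston".to_string()),
                    6,
                )),
            },
        ],
    };

    summary.update_from(&other);
    // "temp" now covers [0.5, 3.0] over 10 rows; "city" was cloned in and
    // padded with 4 trailing nulls so every column reports 10 rows.
    assert_eq!(summary.total_count(), 10);
    assert_eq!(summary.column("city").unwrap().stats.null_count(), Some(4));
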
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -1735,4 +2251,619 @@ mod tests {
         let db = DatabaseName::new("my-example-db_with_underscores and spaces").unwrap();
         assert_eq!(&*db, "my-example-db_with_underscores and spaces");
     }
+
+    #[test]
+    fn statistics_new_non_null() {
+        let actual = StatValues::new_non_null(Some(-1i64), Some(1i64), 3);
+        let expected = StatValues {
+            min: Some(-1i64),
+            max: Some(1i64),
+            total_count: 3,
+            null_count: Some(0),
+            distinct_count: None,
+        };
+        assert_eq!(actual, expected);
+    }
+
+    #[test]
+    fn statistics_new_all_null() {
+        // i64 values do not have a distinct count
+        let actual = StatValues::<i64>::new_all_null(3, None);
+        let expected = StatValues {
+            min: None,
+            max: None,
+            total_count: 3,
+            null_count: Some(3),
+            distinct_count: None,
+        };
+        assert_eq!(actual, expected);
+
+        // string columns can have a distinct count
+        let actual = StatValues::<String>::new_all_null(3, Some(1_u64));
+        let expected = StatValues {
+            min: None,
+            max: None,
+            total_count: 3,
+            null_count: Some(3),
+            distinct_count: Some(NonZeroU64::try_from(1_u64).unwrap()),
+        };
+        assert_eq!(actual, expected);
+    }
+
+    impl<T> StatValues<T>
+    where
+        T: IsNan + PartialOrd + Clone,
+    {
+        fn new_with_value(starting_value: T) -> Self {
+            let starting_value = if starting_value.is_nan() {
+                None
+            } else {
+                Some(starting_value)
+            };
+
+            let min = starting_value.clone();
+            let max = starting_value;
+            let total_count = 1;
+            let null_count = Some(0);
+            let distinct_count = None;
+            Self::new_with_distinct(min, max, total_count, null_count, distinct_count)
+        }
+    }
+
+    impl Statistics {
+        /// Return the minimum value, if any, formatted as a string
+        fn min_as_str(&self) -> Option<Cow<'_, str>> {
+            match self {
+                Self::I64(v) => v.min.map(|x| Cow::Owned(x.to_string())),
+                Self::U64(v) => v.min.map(|x| Cow::Owned(x.to_string())),
+                Self::F64(v) => v.min.map(|x| Cow::Owned(x.to_string())),
+                Self::Bool(v) => v.min.map(|x| Cow::Owned(x.to_string())),
+                Self::String(v) => v.min.as_deref().map(Cow::Borrowed),
+            }
+        }
+
+        /// Return the maximum value, if any, formatted as a string
+        fn max_as_str(&self) -> Option<Cow<'_, str>> {
+            match self {
+                Self::I64(v) => v.max.map(|x| Cow::Owned(x.to_string())),
+                Self::U64(v) => v.max.map(|x| Cow::Owned(x.to_string())),
+                Self::F64(v) => v.max.map(|x| Cow::Owned(x.to_string())),
+                Self::Bool(v) => v.max.map(|x| Cow::Owned(x.to_string())),
+                Self::String(v) => v.max.as_deref().map(Cow::Borrowed),
+            }
+        }
+    }
+
+    #[test]
+    fn statistics_update() {
+        let mut stat = StatValues::new_with_value(23);
+        assert_eq!(stat.min, Some(23));
+        assert_eq!(stat.max, Some(23));
+        assert_eq!(stat.total_count, 1);
+
+        stat.update(&55);
+        assert_eq!(stat.min, Some(23));
+        assert_eq!(stat.max, Some(55));
+        assert_eq!(stat.total_count, 2);
+
+        stat.update(&6);
+        assert_eq!(stat.min, Some(6));
+        assert_eq!(stat.max, Some(55));
+        assert_eq!(stat.total_count, 3);
+
+        stat.update(&30);
+        assert_eq!(stat.min, Some(6));
+        assert_eq!(stat.max, Some(55));
+        assert_eq!(stat.total_count, 4);
+    }
+
+    #[test]
+    fn statistics_default() {
+        let mut stat = StatValues::default();
+        assert_eq!(stat.min, None);
+        assert_eq!(stat.max, None);
+        assert_eq!(stat.total_count, 0);
+
+        stat.update(&55);
+        assert_eq!(stat.min, Some(55));
+        assert_eq!(stat.max, Some(55));
+        assert_eq!(stat.total_count, 1);
+
+        let mut stat = StatValues::<String>::default();
+        assert_eq!(stat.min, None);
+        assert_eq!(stat.max, None);
+        assert_eq!(stat.total_count, 0);
+
+        stat.update("cupcakes");
+        assert_eq!(stat.min, Some("cupcakes".to_string()));
+        assert_eq!(stat.max, Some("cupcakes".to_string()));
+        assert_eq!(stat.total_count, 1);
+
+        stat.update("woo");
+        assert_eq!(stat.min, Some("cupcakes".to_string()));
+        assert_eq!(stat.max, Some("woo".to_string()));
+        assert_eq!(stat.total_count, 2);
+    }
+
+    #[test]
+    fn statistics_is_none() {
+        let mut stat = StatValues::default();
+        assert!(stat.is_none());
+        stat.min = Some(0);
+        assert!(!stat.is_none());
+        stat.max = Some(1);
+        assert!(!stat.is_none());
+    }
+
+    #[test]
+    fn statistics_overlaps() {
+        let stat1 = StatValues {
+            min: Some(10),
+            max: Some(20),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat1), StatOverlap::NonZero);
+
+        //    [--stat1--]
+        // [--stat2--]
+        let stat2 = StatValues {
+            min: Some(5),
+            max: Some(15),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat2), StatOverlap::NonZero);
+        assert_eq!(stat2.overlaps(&stat1), StatOverlap::NonZero);
+
+        // [--stat1--]
+        //      [--stat3--]
+        let stat3 = StatValues {
+            min: Some(15),
+            max: Some(25),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat3), StatOverlap::NonZero);
+        assert_eq!(stat3.overlaps(&stat1), StatOverlap::NonZero);
+
+        // [--stat1--]
+        //              [--stat4--]
+        let stat4 = StatValues {
+            min: Some(25),
+            max: Some(35),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat4), StatOverlap::Zero);
+        assert_eq!(stat4.overlaps(&stat1), StatOverlap::Zero);
+
+        //          [--stat1--]
+        // [--stat5--]
+        let stat5 = StatValues {
+            min: Some(0),
+            max: Some(5),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat5), StatOverlap::Zero);
+        assert_eq!(stat5.overlaps(&stat1), StatOverlap::Zero);
+    }
+
+    #[test]
+    fn statistics_overlaps_none() {
+        let stat1 = StatValues {
+            min: Some(10),
+            max: Some(20),
+            ..Default::default()
+        };
+
+        let stat2 = StatValues {
+            min: None,
+            max: Some(20),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat2), StatOverlap::Unknown);
+        assert_eq!(stat2.overlaps(&stat1), StatOverlap::Unknown);
+
+        let stat3 = StatValues {
+            min: Some(10),
+            max: None,
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat3), StatOverlap::Unknown);
+        assert_eq!(stat3.overlaps(&stat1), StatOverlap::Unknown);
+
+        let stat4 = StatValues {
+            min: None,
+            max: None,
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat4), StatOverlap::Unknown);
+        assert_eq!(stat4.overlaps(&stat1), StatOverlap::Unknown);
+    }
+
+    #[test]
+    fn statistics_overlaps_mixed_none() {
+        let stat1 = StatValues {
+            min: Some(10),
+            max: None,
+            ..Default::default()
+        };
+
+        let stat2 = StatValues {
+            min: None,
+            max: Some(5),
+            ..Default::default()
+        };
+        assert_eq!(stat1.overlaps(&stat2), StatOverlap::Unknown);
+        assert_eq!(stat2.overlaps(&stat1), StatOverlap::Unknown);
+    }
+
+    #[test]
+    fn update_string() {
+        let mut stat = StatValues::new_with_value("bbb".to_string());
+        assert_eq!(stat.min, Some("bbb".to_string()));
+        assert_eq!(stat.max, Some("bbb".to_string()));
+        assert_eq!(stat.total_count, 1);
+
+        stat.update("aaa");
+        assert_eq!(stat.min, Some("aaa".to_string()));
+        assert_eq!(stat.max, Some("bbb".to_string()));
+        assert_eq!(stat.total_count, 2);
+
+        stat.update("z");
+        assert_eq!(stat.min, Some("aaa".to_string()));
+        assert_eq!(stat.max, Some("z".to_string()));
+        assert_eq!(stat.total_count, 3);
+
+        stat.update("p");
+        assert_eq!(stat.min, Some("aaa".to_string()));
+        assert_eq!(stat.max, Some("z".to_string()));
+        assert_eq!(stat.total_count, 4);
+    }
+
+    #[test]
+    fn stats_is_none() {
+        let stat = Statistics::I64(StatValues::new_non_null(Some(-1), Some(100), 1));
+        assert!(!stat.is_none());
+
+        let stat = Statistics::I64(StatValues::new_non_null(None, Some(100), 1));
+        assert!(!stat.is_none());
+
+        let stat = Statistics::I64(StatValues::new_non_null(None, None, 0));
+        assert!(stat.is_none());
+    }
+
+    #[test]
+    fn stats_as_str_i64() {
+        let stat = Statistics::I64(StatValues::new_non_null(Some(-1), Some(100), 1));
+        assert_eq!(stat.min_as_str(), Some("-1".into()));
+        assert_eq!(stat.max_as_str(), Some("100".into()));
+
+        let stat = Statistics::I64(StatValues::new_non_null(None, None, 1));
+        assert_eq!(stat.min_as_str(), None);
+        assert_eq!(stat.max_as_str(), None);
+    }
+
+    #[test]
+    fn stats_as_str_u64() {
+        let stat = Statistics::U64(StatValues::new_non_null(Some(1), Some(100), 1));
+        assert_eq!(stat.min_as_str(), Some("1".into()));
+        assert_eq!(stat.max_as_str(), Some("100".into()));
+
+        let stat = Statistics::U64(StatValues::new_non_null(None, None, 1));
+        assert_eq!(stat.min_as_str(), None);
+        assert_eq!(stat.max_as_str(), None);
+    }
+
+    #[test]
+    fn stats_as_str_f64() {
+        let stat = Statistics::F64(StatValues::new_non_null(Some(99.0), Some(101.0), 1));
+        assert_eq!(stat.min_as_str(), Some("99".into()));
+        assert_eq!(stat.max_as_str(), Some("101".into()));
+
+        let stat = Statistics::F64(StatValues::new_non_null(None, None, 1));
+        assert_eq!(stat.min_as_str(), None);
+        assert_eq!(stat.max_as_str(), None);
+    }
+
+    #[test]
+    fn stats_as_str_bool() {
+        let stat = Statistics::Bool(StatValues::new_non_null(Some(false), Some(true), 1));
+        assert_eq!(stat.min_as_str(), Some("false".into()));
+        assert_eq!(stat.max_as_str(), Some("true".into()));
+
+        let stat = Statistics::Bool(StatValues::new_non_null(None, None, 1));
+        assert_eq!(stat.min_as_str(), None);
+        assert_eq!(stat.max_as_str(), None);
+    }
+
+    #[test]
+    fn stats_as_str_str() {
+        let stat = Statistics::String(StatValues::new_non_null(
+            Some("a".to_string()),
+            Some("zz".to_string()),
+            1,
+        ));
+        assert_eq!(stat.min_as_str(), Some("a".into()));
+        assert_eq!(stat.max_as_str(), Some("zz".into()));
+
+        let stat = Statistics::String(StatValues::new_non_null(None, None, 1));
+        assert_eq!(stat.min_as_str(), None);
+        assert_eq!(stat.max_as_str(), None);
+    }
+
+    #[test]
+    fn table_update_from() {
+        let mut string_stats = StatValues::new_with_value("foo".to_string());
+        string_stats.update("bar");
+        let string_col = ColumnSummary {
+            name: "string".to_string(),
+            influxdb_type: None,
+            stats: Statistics::String(string_stats),
+        };
+
+        let mut int_stats = StatValues::new_with_value(1);
+        int_stats.update(&5);
+        let int_col = ColumnSummary {
+            name: "int".to_string(),
+            influxdb_type: None,
+            stats: Statistics::I64(int_stats),
+        };
+
+        let mut float_stats = StatValues::new_with_value(9.1);
+        float_stats.update(&1.3);
+        let float_col = ColumnSummary {
+            name: "float".to_string(),
+            influxdb_type: None,
+            stats: Statistics::F64(float_stats),
+        };
+
+        let mut table_a = TableSummary {
+            columns: vec![string_col, int_col, float_col],
+        };
+
+        let mut string_stats = StatValues::new_with_value("aaa".to_string());
+        string_stats.update("zzz");
+        let string_col = ColumnSummary {
+            name: "string".to_string(),
+            influxdb_type: None,
+            stats: Statistics::String(string_stats),
+        };
+
+        let mut int_stats = StatValues::new_with_value(3);
+        int_stats.update(&9);
+        let int_col = ColumnSummary {
+            name: "int".to_string(),
+            influxdb_type: None,
+            stats: Statistics::I64(int_stats),
+        };
+
+        let mut table_b = TableSummary {
+            columns: vec![int_col, string_col],
+        };
+
+        // keep this to test joining the other way
+        let table_c = table_a.clone();
+
+        table_a.update_from(&table_b);
+        let col = table_a.column("string").unwrap();
+        assert_eq!(
+            col.stats,
+            Statistics::String(StatValues::new_non_null(
+                Some("aaa".to_string()),
+                Some("zzz".to_string()),
+                4,
+            ))
+        );
+
+        let col = table_a.column("int").unwrap();
+        assert_eq!(
+            col.stats,
+            Statistics::I64(StatValues::new_non_null(Some(1), Some(9), 4))
+        );
+
+        let col = table_a.column("float").unwrap();
+        assert_eq!(
+            col.stats,
+            Statistics::F64(StatValues::new(Some(1.3), Some(9.1), 4, Some(2)))
+        );
+
+        table_b.update_from(&table_c);
+        let col = table_b.column("string").unwrap();
+        assert_eq!(
+            col.stats,
+            Statistics::String(StatValues::new_non_null(
+                Some("aaa".to_string()),
+                Some("zzz".to_string()),
+                4,
+            ))
+        );
+
+        let col = table_b.column("int").unwrap();
+        assert_eq!(
+            col.stats,
+            Statistics::I64(StatValues::new_non_null(Some(1), Some(9), 4))
+        );
+
+        let col = table_b.column("float").unwrap();
+        assert_eq!(
+            col.stats,
+            Statistics::F64(StatValues::new(Some(1.3), Some(9.1), 4, Some(2)))
+        );
+    }
+
+    #[test]
+    fn table_update_from_new_column() {
+        let string_stats = StatValues::new_with_value("bar".to_string());
+        let string_col = ColumnSummary {
+            name: "string".to_string(),
+            influxdb_type: None,
+            stats: Statistics::String(string_stats),
+        };
+
+        let int_stats = StatValues::new_with_value(5);
+        let int_col = ColumnSummary {
+            name: "int".to_string(),
+            influxdb_type: None,
+            stats: Statistics::I64(int_stats),
+        };
+
+        // table summary that does not have the "string" col
+        let table1 = TableSummary {
+            columns: vec![int_col.clone()],
+        };
+
+        // table summary that has both columns
+        let table2 = TableSummary {
+            columns: vec![int_col, string_col],
+        };
+
+        // Statistics should be the same regardless of the order we update the stats
+
+        let expected_string_stats = Statistics::String(StatValues::new(
+            Some("bar".to_string()),
+            Some("bar".to_string()),
+            2,       // total count is 2 even though it did not appear in the update
+            Some(1), // 1 null
+        ));
+
+        let expected_int_stats = Statistics::I64(StatValues::new(
+            Some(5),
+            Some(5),
+            2,
+            Some(0), // no nulls
+        ));
+
+        // update table 1 with table 2
+        let mut table = table1.clone();
+        table.update_from(&table2);
+
+        assert_eq!(
+            &table.column("string").unwrap().stats,
+            &expected_string_stats
+        );
+
+        assert_eq!(&table.column("int").unwrap().stats, &expected_int_stats);
+
+        // update table 2 with table 1
+        let mut table = table2;
+        table.update_from(&table1);
+
+        assert_eq!(
+            &table.column("string").unwrap().stats,
+            &expected_string_stats
+        );
+
+        assert_eq!(&table.column("int").unwrap().stats, &expected_int_stats);
+    }
+
+    #[test]
+    fn column_update_from_boolean() {
+        let bool_false = ColumnSummary {
+            name: "b".to_string(),
+            influxdb_type: None,
+            stats: Statistics::Bool(StatValues::new(Some(false), Some(false), 1, Some(1))),
+        };
+        let bool_true = ColumnSummary {
+            name: "b".to_string(),
+            influxdb_type: None,
+            stats: Statistics::Bool(StatValues::new(Some(true), Some(true), 1, Some(2))),
+        };
+
+        let expected_stats = Statistics::Bool(StatValues::new(Some(false), Some(true), 2, Some(3)));
+
+        let mut b = bool_false.clone();
+        b.update_from(&bool_true);
+        assert_eq!(b.stats, expected_stats);
+
+        let mut b = bool_true;
+        b.update_from(&bool_false);
+        assert_eq!(b.stats, expected_stats);
+    }
+
+    #[test]
+    fn column_update_from_u64() {
+        let mut min = ColumnSummary {
+            name: "foo".to_string(),
+            influxdb_type: None,
+            stats: Statistics::U64(StatValues::new(Some(5), Some(23), 1, Some(1))),
+        };
+
+        let max = ColumnSummary {
+            name: "foo".to_string(),
+            influxdb_type: None,
+            stats: Statistics::U64(StatValues::new(Some(6), Some(506), 43, Some(2))),
+        };
+
+        min.update_from(&max);
+
+        let expected = Statistics::U64(StatValues::new(Some(5), Some(506), 44, Some(3)));
+        assert_eq!(min.stats, expected);
+    }
+
+    #[test]
+    fn nans() {
+        let mut stat = StatValues::default();
+        assert_eq!(stat.min, None);
+        assert_eq!(stat.max, None);
+        assert_eq!(stat.total_count, 0);
+
+        stat.update(&f64::NAN);
+        assert_eq!(stat.min, None);
+        assert_eq!(stat.max, None);
+        assert_eq!(stat.total_count, 1);
+
+        stat.update(&1.0);
+        assert_eq!(stat.min, Some(1.0));
+        assert_eq!(stat.max, Some(1.0));
+        assert_eq!(stat.total_count, 2);
+
+        stat.update(&2.0);
+        assert_eq!(stat.min, Some(1.0));
+        assert_eq!(stat.max, Some(2.0));
+        assert_eq!(stat.total_count, 3);
+
+        stat.update(&f64::INFINITY);
+        assert_eq!(stat.min, Some(1.0));
+        assert_eq!(stat.max, Some(f64::INFINITY));
+        assert_eq!(stat.total_count, 4);
+
+        stat.update(&-1.0);
+        assert_eq!(stat.min, Some(-1.0));
+        assert_eq!(stat.max, Some(f64::INFINITY));
+        assert_eq!(stat.total_count, 5);
+
+        // ===========
+
+        let mut stat = StatValues::new_with_value(2.0);
+        stat.update(&f64::INFINITY);
+        assert_eq!(stat.min, Some(2.0));
+        assert_eq!(stat.max, Some(f64::INFINITY));
+        assert_eq!(stat.total_count, 2);
+
+        stat.update(&f64::NAN);
+        assert_eq!(stat.min, Some(2.0));
+        assert_eq!(stat.max, Some(f64::INFINITY));
+        assert_eq!(stat.total_count, 3);
+
+        // ===========
+
+        let mut stat2 = StatValues::new_with_value(1.0);
+        stat2.update_from(&stat);
+        assert_eq!(stat2.min, Some(1.0));
+        assert_eq!(stat2.max, Some(f64::INFINITY));
+        assert_eq!(stat2.total_count, 4);
+
+        // ===========
+
+        let stat2 = StatValues::new_with_value(1.0);
+        stat.update_from(&stat2);
+        assert_eq!(stat.min, Some(1.0));
+        assert_eq!(stat.max, Some(f64::INFINITY));
+        assert_eq!(stat.total_count, 4);
+
+        // ===========
+
+        let stat = StatValues::new_with_value(f64::NAN);
+        assert_eq!(stat.min, None);
+        assert_eq!(stat.max, None);
+        assert_eq!(stat.total_count, 1);
+    }
 }
diff --git a/dml/src/lib.rs b/dml/src/lib.rs
index 2f33b45887..d818252ad4 100644
--- a/dml/src/lib.rs
+++ b/dml/src/lib.rs
@@ -12,11 +12,10 @@
 )]

 use data_types::{
-    partition_metadata::{StatValues, Statistics},
     router::{ShardConfig, ShardId},
     sequence::Sequence,
 };
-use data_types2::{DeletePredicate, NonEmptyString};
+use data_types2::{DeletePredicate, NonEmptyString, StatValues, Statistics};
 use hashbrown::HashMap;
 use iox_time::Time;
 use mutable_batch::MutableBatch;
diff --git a/mutable_batch/src/column.rs b/mutable_batch/src/column.rs
index 0f18038b99..e2631dcce2 100644
--- a/mutable_batch/src/column.rs
+++ b/mutable_batch/src/column.rs
@@ -1,23 +1,18 @@
 //! A [`Column`] stores the rows for a given column name

-use std::fmt::Formatter;
-use std::mem;
-use std::sync::Arc;
-
-use arrow::error::ArrowError;
 use arrow::{
     array::{
         ArrayDataBuilder, ArrayRef, BooleanArray, Float64Array, Int64Array,
         TimestampNanosecondArray, UInt64Array,
     },
     datatypes::DataType,
+    error::ArrowError,
 };
-use snafu::{ResultExt, Snafu};
-
-use arrow_util::bitset::BitSet;
-use arrow_util::string::PackedStringArray;
-use data_types::partition_metadata::{StatValues, Statistics};
+use arrow_util::{bitset::BitSet, string::PackedStringArray};
+use data_types2::{StatValues, Statistics};
 use schema::{InfluxColumnType, InfluxFieldType, TIME_DATA_TYPE};
+use snafu::{ResultExt, Snafu};
+use std::{fmt::Formatter, mem, sync::Arc};

 /// A "dictionary ID" (DID) is a compact numeric representation of an interned
 /// string in the dictionary. The same string always maps the same DID.
diff --git a/mutable_batch/src/writer.rs b/mutable_batch/src/writer.rs
index 90824cc898..c322594f2e 100644
--- a/mutable_batch/src/writer.rs
+++ b/mutable_batch/src/writer.rs
@@ -3,7 +3,7 @@
 use crate::column::{Column, ColumnData, INVALID_DID};
 use crate::MutableBatch;
 use arrow_util::bitset::{iter_set_positions, iter_set_positions_with_offset, BitSet};
-use data_types::partition_metadata::{IsNan, StatValues, Statistics};
+use data_types2::{IsNan, StatValues, Statistics};
 use schema::{InfluxColumnType, InfluxFieldType};
 use snafu::Snafu;
 use std::num::NonZeroU64;
diff --git a/mutable_batch/tests/extend.rs b/mutable_batch/tests/extend.rs
index 00b4fbffbf..42325fcdff 100644
--- a/mutable_batch/tests/extend.rs
+++ b/mutable_batch/tests/extend.rs
@@ -1,10 +1,8 @@
 use arrow_util::assert_batches_eq;
-use data_types::partition_metadata::{StatValues, Statistics};
-use mutable_batch::writer::Writer;
-use mutable_batch::MutableBatch;
+use data_types2::{StatValues, Statistics};
+use mutable_batch::{writer::Writer, MutableBatch};
 use schema::selection::Selection;
-use std::collections::BTreeMap;
-use std::num::NonZeroU64;
+use std::{collections::BTreeMap, num::NonZeroU64};

 #[test]
 fn test_extend() {
diff --git a/mutable_batch/tests/extend_range.rs b/mutable_batch/tests/extend_range.rs
index d37bd26a1b..68437141d5 100644
--- a/mutable_batch/tests/extend_range.rs
+++ b/mutable_batch/tests/extend_range.rs
@@ -1,10 +1,8 @@
 use arrow_util::assert_batches_eq;
-use data_types::partition_metadata::{StatValues, Statistics};
-use mutable_batch::writer::Writer;
-use mutable_batch::MutableBatch;
+use data_types2::{StatValues, Statistics};
+use mutable_batch::{writer::Writer, MutableBatch};
 use schema::selection::Selection;
-use std::collections::BTreeMap;
-use std::num::NonZeroU64;
+use std::{collections::BTreeMap, num::NonZeroU64};

 #[test]
 fn test_extend_range() {
diff --git a/mutable_batch/tests/writer.rs b/mutable_batch/tests/writer.rs
index 670a7638e0..20b97a702b 100644
--- a/mutable_batch/tests/writer.rs
+++ b/mutable_batch/tests/writer.rs
@@ -1,6 +1,6 @@
 use arrow_util::assert_batches_eq;
-use data_types::partition_metadata::{StatValues, Statistics};
 use data_types::write_summary::TimestampSummary;
+use data_types2::{StatValues, Statistics};
 use mutable_batch::writer::Writer;
 use mutable_batch::MutableBatch;
 use schema::selection::Selection;
diff --git a/mutable_batch/tests/writer_fuzz.rs b/mutable_batch/tests/writer_fuzz.rs
index e3bf11c6c9..6475faf9ad 100644
--- a/mutable_batch/tests/writer_fuzz.rs
+++ b/mutable_batch/tests/writer_fuzz.rs
@@ -1,3 +1,16 @@
+use arrow::{
+    array::{
+        ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, TimestampNanosecondArray,
+        UInt64Array,
+    },
+    record_batch::RecordBatch,
+};
+use arrow_util::bitset::BitSet;
+use data_types2::{IsNan, PartitionTemplate, StatValues, Statistics, TemplatePart};
+use hashbrown::HashSet;
+use mutable_batch::{writer::Writer, MutableBatch, PartitionWrite, WritePayload};
+use rand::prelude::*;
+use schema::selection::Selection;
 /// A fuzz test of the [`mutable_batch::Writer`] interface:
 ///
 /// - column writes - `write_i64`, `write_tag`, etc...
@@ -5,25 +18,7 @@
 /// - batch writes with ranges - `write_batch_ranges`
 ///
 /// Verifies that the rows and statistics are as expected after a number of interleaved writes
-use std::collections::BTreeMap;
-use std::num::NonZeroU64;
-use std::ops::Range;
-use std::sync::Arc;
-
-use arrow::array::{
-    ArrayRef, BooleanArray, Float64Array, Int64Array, StringArray, TimestampNanosecondArray,
-    UInt64Array,
-};
-use arrow::record_batch::RecordBatch;
-use hashbrown::HashSet;
-use rand::prelude::*;
-
-use arrow_util::bitset::BitSet;
-use data_types::partition_metadata::{IsNan, StatValues, Statistics};
-use data_types2::{PartitionTemplate, TemplatePart};
-use mutable_batch::writer::Writer;
-use mutable_batch::{MutableBatch, PartitionWrite, WritePayload};
-use schema::selection::Selection;
+use std::{collections::BTreeMap, num::NonZeroU64, ops::Range, sync::Arc};

 fn make_rng() -> StdRng {
     let seed = rand::rngs::OsRng::default().next_u64();
diff --git a/parquet_file/src/chunk.rs b/parquet_file/src/chunk.rs
index 1dee0f3d0b..3a9f2c749d 100644
--- a/parquet_file/src/chunk.rs
+++ b/parquet_file/src/chunk.rs
@@ -3,17 +3,14 @@ use crate::{
     storage::Storage,
     ParquetFilePath,
 };
-use data_types::{
-    partition_metadata::{Statistics, TableSummary},
-    timestamp::{TimestampMinMax, TimestampRange},
+use data_types2::{
+    ParquetFile, ParquetFileWithMetadata, Statistics, TableSummary, TimestampMinMax, TimestampRange,
 };
-use data_types2::{ParquetFile, ParquetFileWithMetadata};
 use datafusion::physical_plan::SendableRecordBatchStream;
 use object_store::DynObjectStore;
 use observability_deps::tracing::*;
 use predicate::Predicate;
-use schema::selection::Selection;
-use schema::{Schema, TIME_COLUMN_NAME};
+use schema::{selection::Selection, Schema, TIME_COLUMN_NAME};
 use snafu::{ResultExt, Snafu};
 use std::{collections::BTreeSet, mem, sync::Arc};
diff --git a/parquet_file/src/metadata.rs b/parquet_file/src/metadata.rs
index 92a09a868a..b359b3ab01 100644
--- a/parquet_file/src/metadata.rs
+++ b/parquet_file/src/metadata.rs
@@ -86,9 +86,9 @@
 //! [Apache Parquet]: https://parquet.apache.org/
 //! [Apache Thrift]: https://thrift.apache.org/
 //!
 //! [Thrift Compact Protocol]: https://github.com/apache/thrift/blob/master/doc/specs/thrift-compact-protocol.md
-use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics};
 use data_types2::{
-    NamespaceId, ParquetFileParams, PartitionId, SequenceNumber, SequencerId, TableId, Timestamp,
+    ColumnSummary, InfluxDbType, NamespaceId, ParquetFileParams, PartitionId, SequenceNumber,
+    SequencerId, StatValues, Statistics, TableId, Timestamp,
 };
 use generated_types::influxdata::iox::ingester::v1 as proto;
 use iox_time::Time;
@@ -106,8 +106,10 @@ use parquet::{
     schema::types::SchemaDescriptor as ParquetSchemaDescriptor,
 };
 use prost::Message;
-use schema::sort::{SortKey, SortKeyBuilder};
-use schema::{InfluxColumnType, InfluxFieldType, Schema};
+use schema::{
+    sort::{SortKey, SortKeyBuilder},
+    InfluxColumnType, InfluxFieldType, Schema,
+};
 use snafu::{ensure, OptionExt, ResultExt, Snafu};
 use std::{convert::TryInto, sync::Arc};
 use thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol, TOutputProtocol};
diff --git a/querier/src/chunk/query_access.rs b/querier/src/chunk/query_access.rs
index 6479344a1f..8d6d17da0a 100644
--- a/querier/src/chunk/query_access.rs
+++ b/querier/src/chunk/query_access.rs
@@ -1,7 +1,8 @@
 use std::sync::Arc;

-use data_types::timestamp::TimestampMinMax;
-use data_types2::{ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary};
+use data_types2::{
+    ChunkAddr, ChunkId, ChunkOrder, DeletePredicate, PartitionId, TableSummary, TimestampMinMax,
+};
 use observability_deps::tracing::debug;
 use predicate::PredicateMatch;
 use query::{QueryChunk, QueryChunkError, QueryChunkMeta};
diff --git a/query/src/provider/overlap.rs b/query/src/provider/overlap.rs
index fba36af4d0..ae08d22c7c 100644
--- a/query/src/provider/overlap.rs
+++ b/query/src/provider/overlap.rs
@@ -1,21 +1,18 @@
-//! Contains the algorithm to determine which chunks may contain
-//! "duplicate" primary keys (that is where data with the same
-//! combination of "tag" columns and timestamp in the InfluxDB
-//! DataModel have been written in via multiple distinct line protocol
-//! writes (and thus are stored in separate rows)
+//! Contains the algorithm to determine which chunks may contain "duplicate" primary keys (that is
+//! where data with the same combination of "tag" columns and timestamp in the InfluxDB DataModel
+//! have been written via multiple distinct line protocol writes (and thus are stored in
+//! separate rows)

-use data_types::{
-    partition_metadata::{ColumnSummary, StatOverlap, Statistics},
-    timestamp::TimestampMinMax,
+use crate::{QueryChunk, QueryChunkMeta};
+use data_types2::{
+    ColumnSummary, DeletePredicate, ParquetFileWithMetadata, PartitionId, StatOverlap, Statistics,
+    TableSummary, TimestampMinMax,
 };
-use data_types2::{DeletePredicate, ParquetFileWithMetadata, PartitionId, TableSummary};
 use observability_deps::tracing::debug;
 use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
 use snafu::Snafu;
 use std::{cmp::Ordering, sync::Arc};

-use crate::{QueryChunk, QueryChunkMeta};
-
 #[derive(Debug, Snafu)]
 pub enum Error {
     #[snafu(display(
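
The overlap module's core test reduces to per-column range intersection over these stats. A simplified sketch of that idea (hypothetical helper, not the module's actual API):

    /// Two chunks may share duplicate primary keys only if every key column's
    /// stat ranges intersect (or are unknown, which must be treated as overlap).
    fn may_contain_duplicates(a: &TableSummary, b: &TableSummary, key_cols: &[&str]) -> bool {
        key_cols.iter().all(|name| match (a.column(name), b.column(name)) {
            (Some(ca), Some(cb)) => match (&ca.stats, &cb.stats) {
                (Statistics::String(sa), Statistics::String(sb)) => {
                    sa.overlaps(sb) != StatOverlap::Zero
                }
                (Statistics::I64(sa), Statistics::I64(sb)) => {
                    sa.overlaps(sb) != StatOverlap::Zero
                }
                // Mismatched or other column types: stay conservative.
                _ => true,
            },
            // A key column missing from either summary: stay conservative.
            _ => true,
        })
    }
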
diff --git a/query/src/provider/physical.rs b/query/src/provider/physical.rs
index 165c148ace..98f661393f 100644
--- a/query/src/provider/physical.rs
+++ b/query/src/provider/physical.rs
@@ -1,9 +1,9 @@
 //! Implementation of a DataFusion PhysicalPlan node across partition chunks

-use std::{fmt, sync::Arc};
-
+use super::adapter::SchemaAdapterStream;
+use crate::{exec::IOxSessionContext, QueryChunk};
 use arrow::datatypes::SchemaRef;
-use data_types::partition_metadata::TableSummary;
+use data_types2::TableSummary;
 use datafusion::{
     error::DataFusionError,
     execution::context::TaskContext,
@@ -14,13 +14,9 @@ use datafusion::{
     },
 };
 use observability_deps::tracing::debug;
-use schema::selection::Selection;
-use schema::Schema;
-
-use crate::{exec::IOxSessionContext, QueryChunk};
 use predicate::Predicate;
-
-use super::adapter::SchemaAdapterStream;
+use schema::{selection::Selection, Schema};
+use std::{fmt, sync::Arc};

 /// Implements the DataFusion physical plan interface
 #[derive(Debug)]
diff --git a/query/src/pruning.rs b/query/src/pruning.rs
index 0274aa6eb3..d505b9b0ad 100644
--- a/query/src/pruning.rs
+++ b/query/src/pruning.rs
@@ -1,23 +1,22 @@
 //! Implementation of statistics based pruning

-use std::sync::Arc;
-
-use arrow::array::{
-    ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray, UInt64Array,
+use crate::QueryChunk;
+use arrow::{
+    array::{
+        ArrayRef, BooleanArray, DictionaryArray, Float64Array, Int64Array, StringArray,
+        UInt64Array,
+    },
+    datatypes::{DataType, Int32Type, TimeUnit},
 };
-use arrow::datatypes::{DataType, Int32Type, TimeUnit};
-
-use data_types::partition_metadata::{StatValues, Statistics};
+use data_types2::{StatValues, Statistics};
 use datafusion::{
     logical_plan::Column,
     physical_optimizer::pruning::{PruningPredicate, PruningStatistics},
 };
 use observability_deps::tracing::{debug, trace};
 use predicate::Predicate;
-use schema::Schema;
-
-use crate::QueryChunk;
 use query_functions::group_by::Aggregate;
+use schema::Schema;
+use std::sync::Arc;

 /// Something that cares to be notified when pruning of chunks occurs
 pub trait PruningObserver {
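
For orientation, the pruning idea in one function (hypothetical sketch; the real implementation delegates to DataFusion's PruningPredicate):

    /// Returns true if a chunk can be skipped for the predicate `col > threshold`,
    /// based solely on the column's max statistic.
    fn can_prune_gt(stats: &StatValues<i64>, threshold: i64) -> bool {
        match stats.max {
            // No row can exceed the threshold, so the chunk cannot match.
            Some(max) => max <= threshold,
            // Unknown bound: the chunk must be kept.
            None => false,
        }
    }
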
assert_nice_eq { ($actual:ident, $expected:ident) => { diff --git a/query/src/test.rs b/query/src/test.rs index 25222244be..8a2b85781c 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -283,7 +283,7 @@ impl TestChunk { Self { table_name, schema: Arc::new(SchemaBuilder::new().build().unwrap()), - table_summary: TableSummary::new(), + table_summary: TableSummary::default(), id: ChunkId::new_test(0), may_contain_pk_duplicates: Default::default(), predicates: Default::default(), diff --git a/read_buffer/Cargo.toml b/read_buffer/Cargo.toml index 815c98dc0c..c28185807c 100644 --- a/read_buffer/Cargo.toml +++ b/read_buffer/Cargo.toml @@ -15,6 +15,7 @@ arrow = { version = "13", features = ["prettyprint"] } arrow_util = { path = "../arrow_util" } croaring = "0.6" data_types = { path = "../data_types" } +data_types2 = { path = "../data_types2" } datafusion = { path = "../datafusion" } either = "1.6.1" hashbrown = "0.12" diff --git a/read_buffer/src/chunk.rs b/read_buffer/src/chunk.rs index 8fa6f8a8d1..f2412a2d35 100644 --- a/read_buffer/src/chunk.rs +++ b/read_buffer/src/chunk.rs @@ -5,11 +5,10 @@ use crate::{ table::{self, Table}, }; use arrow::{error::ArrowError, record_batch::RecordBatch}; -use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary}; - +use data_types::chunk_metadata::ChunkColumnSummary; +use data_types2::TableSummary; use observability_deps::tracing::debug; -use schema::selection::Selection; -use schema::{builder::Error as SchemaError, Schema}; +use schema::{builder::Error as SchemaError, selection::Selection, Schema}; use snafu::{ResultExt, Snafu}; use std::{ collections::{BTreeMap, BTreeSet}, @@ -527,11 +526,10 @@ mod test { Int32Type, }, }; - use data_types::partition_metadata::{ColumnSummary, InfluxDbType, StatValues, Statistics}; + use data_types2::{ColumnSummary, InfluxDbType, StatValues, Statistics}; use metric::{Attributes, MetricKind, Observation, ObservationSet, RawReporter}; use schema::builder::SchemaBuilder; - use std::iter::FromIterator; - use std::{num::NonZeroU64, sync::Arc}; + use std::{iter::FromIterator, num::NonZeroU64, sync::Arc}; // helper to make the `add_remove_tables` test simpler to read. 
diff --git a/read_buffer/src/schema.rs b/read_buffer/src/schema.rs
index 1ea39cb3eb..981dce42c6 100644
--- a/read_buffer/src/schema.rs
+++ b/read_buffer/src/schema.rs
@@ -241,8 +241,8 @@ impl ColumnType {
         }
     }

-    pub fn as_influxdb_type(&self) -> Option<data_types::partition_metadata::InfluxDbType> {
-        use data_types::partition_metadata::InfluxDbType;
+    pub fn as_influxdb_type(&self) -> Option<data_types2::InfluxDbType> {
+        use data_types2::InfluxDbType;
         match self {
             Self::Tag(_) => Some(InfluxDbType::Tag),
             Self::Field(_) => Some(InfluxDbType::Field),
diff --git a/read_buffer/src/table.rs b/read_buffer/src/table.rs
index addab2105e..b6fbbd6d4f 100644
--- a/read_buffer/src/table.rs
+++ b/read_buffer/src/table.rs
@@ -6,7 +6,8 @@ use crate::{
     BinaryExpr,
 };
 use arrow::record_batch::RecordBatch;
-use data_types::{chunk_metadata::ChunkColumnSummary, partition_metadata::TableSummary};
+use data_types::chunk_metadata::ChunkColumnSummary;
+use data_types2::TableSummary;
 use parking_lot::RwLock;
 use schema::selection::Selection;
 use snafu::{ensure, Snafu};
@@ -782,7 +783,7 @@ impl MetaData {
     }

     pub fn to_summary(&self) -> TableSummary {
-        use data_types::partition_metadata::{ColumnSummary, StatValues, Statistics};
+        use data_types2::{ColumnSummary, StatValues, Statistics};
         let columns = self
             .columns
             .iter()
@@ -850,8 +851,8 @@ impl MetaData {
 fn make_null_stats(
     total_count: u64,
     logical_data_type: &LogicalDataType,
-) -> data_types::partition_metadata::Statistics {
-    use data_types::partition_metadata::{StatValues, Statistics};
+) -> data_types2::Statistics {
+    use data_types2::{StatValues, Statistics};
     use LogicalDataType::*;

     match logical_data_type {
@@ -1103,9 +1104,6 @@ impl std::fmt::Display for DisplayReadAggregateResults<'_> {

 #[cfg(test)]
 mod test {
-    use arrow::array::BooleanArray;
-    use data_types::partition_metadata::{StatValues, Statistics};
-
     use super::*;
     use crate::{
         column::Column,
@@ -1113,6 +1111,8 @@ mod test {
         schema::{self, LogicalDataType},
         value::{AggregateVec, OwnedValue, Scalar},
     };
+    use arrow::array::BooleanArray;
+    use data_types2::{StatValues, Statistics};

     #[test]
     fn meta_data_update_with() {
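
With this change, read buffer callers build all-null statistics through the data_types2 constructors; roughly (sketch, assuming an integer column, since make_null_stats' match arms are not shown in this hunk):

    let stats = make_null_stats(12, &LogicalDataType::Integer);
    assert_eq!(stats.total_count(), 12);
    assert_eq!(stats.null_count(), Some(12));
    assert!(stats.is_none()); // no min/max bounds are known
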