Merge pull request #1997 from influxdata/cn/alt-table-summary

feat: Make a TableSummaryAndTimes type for incremental replacement of TableSummary
kodiakhq[bot] 2021-07-14 14:56:54 +00:00 committed by GitHub
commit cedd6269c7
4 changed files with 93 additions and 63 deletions

File 1 of 4

@@ -1,6 +1,7 @@
 //! This module contains structs that describe the metadata for a partition
 //! including schema, summary statistics, and file locations in storage.
+use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use std::{
     borrow::{Borrow, Cow},
@@ -54,6 +55,35 @@ impl FromIterator<Self> for TableSummary {
     }
 }
 
+/// Temporary transition struct that has times of first/last write. Will eventually replace
+/// TableSummary entirely.
+#[derive(Debug)]
+pub struct TableSummaryAndTimes {
+    /// Table name
+    pub name: String,
+
+    /// Per column statistics
+    pub columns: Vec<ColumnSummary>,
+
+    /// Time at which the first data was written into this table. Note
+    /// this is not the same as the timestamps on the data itself
+    pub time_of_first_write: DateTime<Utc>,
+
+    /// Most recent time at which data write was initiated into this
+    /// chunk. Note this is not the same as the timestamps on the data
+    /// itself
+    pub time_of_last_write: DateTime<Utc>,
+}
+
+impl From<TableSummaryAndTimes> for TableSummary {
+    fn from(other: TableSummaryAndTimes) -> Self {
+        Self {
+            name: other.name,
+            columns: other.columns,
+        }
+    }
+}
+
 /// Metadata and statistics information for a table. This can be
 /// either for the portion of a Table stored within a single chunk or
 /// aggregated across chunks.
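
As a reviewer aid, here is a minimal, hypothetical sketch of the transition type in use. It is not part of the diff; the field values are made up, and it assumes the data_types crate above is a dependency.

    // Hypothetical sketch -- not part of this commit.
    use chrono::Utc;
    use data_types::partition_metadata::{TableSummary, TableSummaryAndTimes};

    fn demo() {
        // Construct the transition type; in the real code,
        // MBChunk::table_summary (next file) produces this.
        let summary_and_times = TableSummaryAndTimes {
            name: "cpu".to_string(),
            columns: vec![], // per-column statistics elided for brevity
            time_of_first_write: Utc::now(),
            time_of_last_write: Utc::now(),
        };

        // Consumers that still expect the old type convert via the From
        // impl added above; the two write times are dropped.
        let summary: TableSummary = summary_and_times.into();
        assert_eq!(summary.name, "cpu");
    }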

File 2 of 4

@@ -4,7 +4,7 @@ use crate::{
 };
 use arrow::record_batch::RecordBatch;
 use chrono::{DateTime, Utc};
-use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummary};
+use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummaryAndTimes};
 use entry::{Sequence, TableBatch};
 use hashbrown::HashMap;
 use internal_types::{
@@ -237,7 +237,7 @@ impl MBChunk {
     }
 
     /// Returns a vec of the summary statistics of the tables in this chunk
-    pub fn table_summary(&self) -> TableSummary {
+    pub fn table_summary(&self) -> TableSummaryAndTimes {
         let mut columns: Vec<_> = self
             .columns
             .iter()
@@ -255,9 +255,11 @@ impl MBChunk {
         columns.sort_by(|a, b| a.name.cmp(&b.name));
 
-        TableSummary {
+        TableSummaryAndTimes {
             name: self.table_name.to_string(),
             columns,
+            time_of_first_write: self.time_of_first_write,
+            time_of_last_write: self.time_of_last_write,
         }
     }
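
Because MBChunk::table_summary now returns the richer type, call sites that still need a plain TableSummary convert with .into(), as the remaining files in this commit do. A short illustrative sketch, assuming chunk is an MBChunk:

    // Illustrative only; `chunk` stands in for an MBChunk as above.
    let summary_and_times = chunk.table_summary();
    let first = summary_and_times.time_of_first_write; // newly available
    let last = summary_and_times.time_of_last_write;

    // For legacy consumers, the annotated target type drives the
    // From-based conversion:
    let summary: TableSummary = chunk.table_summary().into();
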
@@ -521,55 +523,50 @@ mod tests {
         assert!(chunk.time_of_first_write < after_write);
 
         let summary = chunk.table_summary();
-        assert_eq!(
-            summary,
-            TableSummary {
-                name: "cpu".to_string(),
-                columns: vec![
-                    ColumnSummary {
-                        name: "env".to_string(),
-                        influxdb_type: Some(InfluxDbType::Tag),
-                        stats: Statistics::String(StatValues {
-                            min: Some("prod".to_string()),
-                            max: Some("stage".to_string()),
-                            count: 3,
-                            distinct_count: Some(NonZeroU64::new(3).unwrap())
-                        })
-                    },
-                    ColumnSummary {
-                        name: "host".to_string(),
-                        influxdb_type: Some(InfluxDbType::Tag),
-                        stats: Statistics::String(StatValues {
-                            min: Some("a".to_string()),
-                            max: Some("c".to_string()),
-                            count: 4,
-                            distinct_count: Some(NonZeroU64::new(3).unwrap())
-                        })
-                    },
-                    ColumnSummary {
-                        name: "time".to_string(),
-                        influxdb_type: Some(InfluxDbType::Timestamp),
-                        stats: Statistics::I64(StatValues {
-                            min: Some(1),
-                            max: Some(2),
-                            count: 4,
-                            distinct_count: None
-                        })
-                    },
-                    ColumnSummary {
-                        name: "val".to_string(),
-                        influxdb_type: Some(InfluxDbType::Field),
-                        stats: Statistics::F64(StatValues {
-                            min: Some(2.),
-                            max: Some(23.),
-                            count: 4,
-                            distinct_count: None
-                        })
-                    },
-                ]
-            }
-        )
+        assert_eq!(summary.name, "cpu");
+
+        let expected_column_summaries = vec![
+            ColumnSummary {
+                name: "env".to_string(),
+                influxdb_type: Some(InfluxDbType::Tag),
+                stats: Statistics::String(StatValues {
+                    min: Some("prod".to_string()),
+                    max: Some("stage".to_string()),
+                    count: 3,
+                    distinct_count: Some(NonZeroU64::new(3).unwrap()),
+                }),
+            },
+            ColumnSummary {
+                name: "host".to_string(),
+                influxdb_type: Some(InfluxDbType::Tag),
+                stats: Statistics::String(StatValues {
+                    min: Some("a".to_string()),
+                    max: Some("c".to_string()),
+                    count: 4,
+                    distinct_count: Some(NonZeroU64::new(3).unwrap()),
+                }),
+            },
+            ColumnSummary {
+                name: "time".to_string(),
+                influxdb_type: Some(InfluxDbType::Timestamp),
+                stats: Statistics::I64(StatValues {
+                    min: Some(1),
+                    max: Some(2),
+                    count: 4,
+                    distinct_count: None,
+                }),
+            },
+            ColumnSummary {
+                name: "val".to_string(),
+                influxdb_type: Some(InfluxDbType::Field),
+                stats: Statistics::F64(StatValues {
+                    min: Some(2.),
+                    max: Some(23.),
+                    count: 4,
+                    distinct_count: None,
+                }),
+            },
+        ];
+        assert_eq!(summary.columns, expected_column_summaries);
     }
 
     #[test]

File 3 of 4

@@ -238,31 +238,32 @@ impl ChunkMetrics {
 impl CatalogChunk {
     /// Creates a new open chunk from the provided MUB chunk.
     ///
-    /// Panics if the provided chunk is empty, otherwise creates a new open chunk and records a
-    /// write at the current time.
+    /// Panics if the provided chunk is empty, otherwise creates a new open chunk.
     pub(super) fn new_open(
         addr: ChunkAddr,
         chunk: mutable_buffer::chunk::MBChunk,
         metrics: ChunkMetrics,
     ) -> Self {
         assert_eq!(chunk.table_name(), &addr.table_name);
 
+        let first_write = chunk.table_summary().time_of_first_write;
+        let last_write = chunk.table_summary().time_of_last_write;
+
         let stage = ChunkStage::Open { mb_chunk: chunk };
 
         metrics
             .state
             .inc_with_labels(&[KeyValue::new("state", "open")]);
 
-        let mut chunk = Self {
+        Self {
             addr,
             stage,
             lifecycle_action: None,
             metrics,
-            time_of_first_write: None,
-            time_of_last_write: None,
+            time_of_first_write: Some(first_write),
+            time_of_last_write: Some(last_write),
             time_closed: None,
-        };
-        chunk.record_write();
-        chunk
+        }
     }
 
     /// Creates a new RUB chunk from the provided RUB chunk and metadata
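
Note that new_open above reads the summary twice. A sketch of an equivalent formulation that computes it once (illustrative only; not what was merged):

    // Hypothetical alternative, not part of the diff:
    let summary = chunk.table_summary();
    let (first_write, last_write) =
        (summary.time_of_first_write, summary.time_of_last_write);
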
@@ -475,7 +476,7 @@ impl CatalogChunk {
         match &self.stage {
             ChunkStage::Open { mb_chunk, .. } => {
                 // The stats for open chunks change so can't be cached
-                Arc::new(mb_chunk.table_summary())
+                Arc::new(mb_chunk.table_summary().into())
             }
             ChunkStage::Frozen { meta, .. } => Arc::clone(&meta.table_summary),
             ChunkStage::Persisted { meta, .. } => Arc::clone(&meta.table_summary),
@@ -533,7 +534,7 @@ impl CatalogChunk {
 
             // Cache table summary + schema
             let metadata = ChunkMetadata {
-                table_summary: Arc::new(mb_chunk.table_summary()),
+                table_summary: Arc::new(mb_chunk.table_summary().into()),
                 schema: s.full_schema(),
             };
@@ -836,6 +837,8 @@ mod tests {
         let mb_chunk = make_mb_chunk(&addr.table_name, sequencer_id);
         let chunk = CatalogChunk::new_open(addr, mb_chunk, ChunkMetrics::new_unregistered());
         assert!(matches!(chunk.stage(), &ChunkStage::Open { .. }));
+        assert!(chunk.time_of_first_write.is_some());
+        assert!(chunk.time_of_last_write.is_some());
     }
 
     #[tokio::test]

File 4 of 4

@@ -109,7 +109,7 @@ impl DbChunk {
                     chunk: Arc::clone(&snapshot),
                 };
                 let meta = ChunkMetadata {
-                    table_summary: Arc::new(mb_chunk.table_summary()),
+                    table_summary: Arc::new(mb_chunk.table_summary().into()),
                     schema: snapshot.full_schema(),
                 };
                 (state, Arc::new(meta))