feat: Add a TableSummaryAndTimes struct to slowly replace TableSummary
Use TableSummaryAndTimes with the mutable buffer chunks when turning them into catalog chunks. Switching everything that uses TableSummary over at once is proving too big a change, so this lets us switch over more incrementally.
pull/24376/head
parent b79d9eb0ab
commit 7ccbab8c90
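The migration pattern in this commit is small enough to show in isolation. Below is a minimal, self-contained sketch of it, using simplified stand-in field types (plain String columns and integer timestamps instead of the real ColumnSummary and chrono DateTime<Utc>, which are elided here): the transition struct carries the two extra write times, and a From impl lets any call site that still expects the old TableSummary convert with .into().

// Minimal sketch of the transition-struct pattern. Field types are
// simplified stand-ins; the real code uses Vec<ColumnSummary> and
// DateTime<Utc>.

/// Old type, still used widely across the codebase.
pub struct TableSummary {
    pub name: String,
    pub columns: Vec<String>, // stand-in for Vec<ColumnSummary>
}

/// New transition type: the old fields plus first/last write times.
pub struct TableSummaryAndTimes {
    pub name: String,
    pub columns: Vec<String>,     // stand-in for Vec<ColumnSummary>
    pub time_of_first_write: i64, // stand-in for DateTime<Utc>
    pub time_of_last_write: i64,  // stand-in for DateTime<Utc>
}

/// Converting to the old type just drops the timestamps, so
/// un-migrated call sites keep working with a single `.into()`.
impl From<TableSummaryAndTimes> for TableSummary {
    fn from(other: TableSummaryAndTimes) -> Self {
        Self {
            name: other.name,
            columns: other.columns,
        }
    }
}

fn main() {
    let summary_and_times = TableSummaryAndTimes {
        name: "cpu".to_string(),
        columns: vec!["host".to_string()],
        time_of_first_write: 1,
        time_of_last_write: 2,
    };
    // An old-style consumer only sees the fields it already knows about.
    let summary: TableSummary = summary_and_times.into();
    assert_eq!(summary.name, "cpu");
}

Because the conversion only ever discards the two timestamps, it is lossless for everything old consumers read, which is what makes migrating one call site at a time safe.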
@@ -1,6 +1,7 @@
 //! This module contains structs that describe the metadata for a partition
 //! including schema, summary statistics, and file locations in storage.

+use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
 use std::{
     borrow::{Borrow, Cow},

@@ -54,6 +55,35 @@ impl FromIterator<Self> for TableSummary {
     }
 }

+/// Temporary transition struct that has times of first/last write. Will eventually replace
+/// TableSummary entirely.
+#[derive(Debug)]
+pub struct TableSummaryAndTimes {
+    /// Table name
+    pub name: String,
+
+    /// Per column statistics
+    pub columns: Vec<ColumnSummary>,
+
+    /// Time at which the first data was written into this table. Note
+    /// this is not the same as the timestamps on the data itself
+    pub time_of_first_write: DateTime<Utc>,
+
+    /// Most recent time at which data write was initiated into this
+    /// chunk. Note this is not the same as the timestamps on the data
+    /// itself
+    pub time_of_last_write: DateTime<Utc>,
+}
+
+impl From<TableSummaryAndTimes> for TableSummary {
+    fn from(other: TableSummaryAndTimes) -> Self {
+        Self {
+            name: other.name,
+            columns: other.columns,
+        }
+    }
+}
+
 /// Metadata and statistics information for a table. This can be
 /// either for the portion of a Table stored within a single chunk or
 /// aggregated across chunks.

@@ -4,7 +4,7 @@ use crate::{
 };
 use arrow::record_batch::RecordBatch;
 use chrono::{DateTime, Utc};
-use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummary};
+use data_types::partition_metadata::{ColumnSummary, InfluxDbType, TableSummaryAndTimes};
 use entry::{Sequence, TableBatch};
 use hashbrown::HashMap;
 use internal_types::{

@@ -237,7 +237,7 @@ impl MBChunk {
     }

     /// Returns a vec of the summary statistics of the tables in this chunk
-    pub fn table_summary(&self) -> TableSummary {
+    pub fn table_summary(&self) -> TableSummaryAndTimes {
         let mut columns: Vec<_> = self
             .columns
             .iter()

@@ -255,9 +255,11 @@ impl MBChunk {

         columns.sort_by(|a, b| a.name.cmp(&b.name));

-        TableSummary {
+        TableSummaryAndTimes {
             name: self.table_name.to_string(),
             columns,
+            time_of_first_write: self.time_of_first_write,
+            time_of_last_write: self.time_of_last_write,
         }
     }

@@ -521,12 +523,8 @@ mod tests {
         assert!(chunk.time_of_first_write < after_write);

         let summary = chunk.table_summary();
-
-        assert_eq!(
-            summary,
-            TableSummary {
-                name: "cpu".to_string(),
-                columns: vec![
+        assert_eq!(summary.name, "cpu");
+        let expected_column_summaries = vec![
             ColumnSummary {
                 name: "env".to_string(),
                 influxdb_type: Some(InfluxDbType::Tag),

@@ -534,8 +532,8 @@ mod tests {
                     min: Some("prod".to_string()),
                     max: Some("stage".to_string()),
                     count: 3,
-                    distinct_count: Some(NonZeroU64::new(3).unwrap())
-                })
+                    distinct_count: Some(NonZeroU64::new(3).unwrap()),
+                }),
             },
             ColumnSummary {
                 name: "host".to_string(),
|
@ -544,8 +542,8 @@ mod tests {
|
|||
min: Some("a".to_string()),
|
||||
max: Some("c".to_string()),
|
||||
count: 4,
|
||||
distinct_count: Some(NonZeroU64::new(3).unwrap())
|
||||
})
|
||||
distinct_count: Some(NonZeroU64::new(3).unwrap()),
|
||||
}),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "time".to_string(),
|
||||
|
@ -554,8 +552,8 @@ mod tests {
|
|||
min: Some(1),
|
||||
max: Some(2),
|
||||
count: 4,
|
||||
distinct_count: None
|
||||
})
|
||||
distinct_count: None,
|
||||
}),
|
||||
},
|
||||
ColumnSummary {
|
||||
name: "val".to_string(),
|
||||
|
@ -564,12 +562,11 @@ mod tests {
|
|||
min: Some(2.),
|
||||
max: Some(23.),
|
||||
count: 4,
|
||||
distinct_count: None
|
||||
})
|
||||
distinct_count: None,
|
||||
}),
|
||||
},
|
||||
]
|
||||
}
|
||||
)
|
||||
];
|
||||
assert_eq!(summary.columns, expected_column_summaries);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|

@@ -246,6 +246,10 @@ impl CatalogChunk {
         metrics: ChunkMetrics,
     ) -> Self {
         assert_eq!(chunk.table_name(), &addr.table_name);
+
+        let first_write = chunk.table_summary().time_of_first_write;
+        let last_write = chunk.table_summary().time_of_last_write;
+
         let stage = ChunkStage::Open { mb_chunk: chunk };

         metrics

@@ -257,8 +261,8 @@ impl CatalogChunk {
             stage,
             lifecycle_action: None,
             metrics,
-            time_of_first_write: None,
-            time_of_last_write: None,
+            time_of_first_write: Some(first_write),
+            time_of_last_write: Some(last_write),
             time_closed: None,
         };
         chunk.record_write();

@@ -475,7 +479,7 @@ impl CatalogChunk {
         match &self.stage {
             ChunkStage::Open { mb_chunk, .. } => {
                 // The stats for open chunks change so can't be cached
-                Arc::new(mb_chunk.table_summary())
+                Arc::new(mb_chunk.table_summary().into())
             }
             ChunkStage::Frozen { meta, .. } => Arc::clone(&meta.table_summary),
             ChunkStage::Persisted { meta, .. } => Arc::clone(&meta.table_summary),

@@ -533,7 +537,7 @@ impl CatalogChunk {

                 // Cache table summary + schema
                 let metadata = ChunkMetadata {
-                    table_summary: Arc::new(mb_chunk.table_summary()),
+                    table_summary: Arc::new(mb_chunk.table_summary().into()),
                     schema: s.full_schema(),
                 };


@@ -836,6 +840,8 @@ mod tests {
         let mb_chunk = make_mb_chunk(&addr.table_name, sequencer_id);
         let chunk = CatalogChunk::new_open(addr, mb_chunk, ChunkMetrics::new_unregistered());
         assert!(matches!(chunk.stage(), &ChunkStage::Open { .. }));
+        assert!(chunk.time_of_first_write.is_some());
+        assert!(chunk.time_of_last_write.is_some());
     }

     #[tokio::test]

@@ -109,7 +109,7 @@ impl DbChunk {
             chunk: Arc::clone(&snapshot),
         };
         let meta = ChunkMetadata {
-            table_summary: Arc::new(mb_chunk.table_summary()),
+            table_summary: Arc::new(mb_chunk.table_summary().into()),
             schema: snapshot.full_schema(),
         };
         (state, Arc::new(meta))