refactor: push chunk and partition summary into catalog (#1103)
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
parent
a59a6edbb8
commit
3a150594ab
|
@ -414,43 +414,23 @@ impl Db {
|
|||
|
||||
/// Return Summary information for all chunks in the specified
|
||||
/// partition across all storage systems
|
||||
pub fn partition_chunk_summaries(
|
||||
&self,
|
||||
partition_key: &str,
|
||||
) -> impl Iterator<Item = ChunkSummary> {
|
||||
let chunks = match self.catalog.partition(partition_key) {
|
||||
Some(partition) => {
|
||||
let partition = partition.read();
|
||||
partition.chunks().cloned().collect::<Vec<_>>()
|
||||
}
|
||||
None => vec![],
|
||||
};
|
||||
|
||||
chunks.into_iter().map(|chunk| {
|
||||
let chunk = chunk.read();
|
||||
chunk.summary()
|
||||
})
|
||||
pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> {
    match self.catalog.partition(partition_key) {
        Some(partition) => {
            // Hold the partition's read lock only while the summaries are
            // gathered into an owned Vec.
            let guard = partition.read();
            guard.chunk_summaries().collect()
        }
        // A partition key that is not in the catalog has no chunks.
        None => Vec::new(),
    }
}
|
||||
|
||||
/// Return Summary information for all columns in all chunks in the
|
||||
/// partition across all storage systems
|
||||
pub fn partition_summary(&self, partition_key: &str) -> PartitionSummary {
|
||||
let table_summaries = self
|
||||
.catalog
|
||||
self.catalog
|
||||
.partition(partition_key)
|
||||
.map(|partition| {
|
||||
let partition = partition.read();
|
||||
partition
|
||||
.chunks()
|
||||
.flat_map(|chunk| {
|
||||
let chunk = chunk.read();
|
||||
chunk.table_summaries()
|
||||
})
|
||||
.collect::<Vec<_>>()
|
||||
.map(|partition| partition.read().summary())
|
||||
.unwrap_or_else(|| PartitionSummary {
|
||||
key: partition_key.to_string(),
|
||||
tables: vec![],
|
||||
})
|
||||
.unwrap_or_else(Vec::new);
|
||||
|
||||
PartitionSummary::from_table_summaries(partition_key, table_summaries)
|
||||
}
|
||||
/// Returns the number of iterations of the background worker loop
|
||||
pub fn worker_iterations(&self) -> usize {
|
||||
|
@ -561,13 +541,7 @@ impl Database for Db {
|
|||
}
|
||||
|
||||
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> {
|
||||
let summaries = self
|
||||
.partition_keys()?
|
||||
.into_iter()
|
||||
.map(|partition_key| self.partition_chunk_summaries(&partition_key))
|
||||
.flatten()
|
||||
.collect();
|
||||
Ok(summaries)
|
||||
Ok(self.catalog.chunk_summaries())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -969,9 +943,7 @@ mod tests {
|
|||
Arc::new(s.to_string())
|
||||
}
|
||||
|
||||
let chunk_summaries = db
|
||||
.partition_chunk_summaries("1970-01-05T15")
|
||||
.collect::<Vec<_>>();
|
||||
let chunk_summaries = db.partition_chunk_summaries("1970-01-05T15");
|
||||
let chunk_summaries = normalize_summaries(chunk_summaries);
|
||||
|
||||
let expected = vec![ChunkSummary::new_without_timestamps(
|
||||
|
@ -1266,6 +1238,7 @@ mod tests {
|
|||
fn mutable_chunk_ids(db: &Db, partition_key: &str) -> Vec<u32> {
|
||||
let mut chunk_ids: Vec<u32> = db
|
||||
.partition_chunk_summaries(partition_key)
|
||||
.into_iter()
|
||||
.filter_map(|chunk| match chunk.storage {
|
||||
ChunkStorage::OpenMutableBuffer | ChunkStorage::ClosedMutableBuffer => {
|
||||
Some(chunk.id)
|
||||
|
@ -1280,6 +1253,7 @@ mod tests {
|
|||
fn read_buffer_chunk_ids(db: &Db, partition_key: &str) -> Vec<u32> {
|
||||
let mut chunk_ids: Vec<u32> = db
|
||||
.partition_chunk_summaries(partition_key)
|
||||
.into_iter()
|
||||
.filter_map(|chunk| match chunk.storage {
|
||||
ChunkStorage::ReadBuffer => Some(chunk.id),
|
||||
_ => None,
|
||||
|
|
|
@ -10,6 +10,7 @@ use snafu::{OptionExt, Snafu};
|
|||
|
||||
use arrow_deps::datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider};
|
||||
use chunk::Chunk;
|
||||
use data_types::chunk::ChunkSummary;
|
||||
use data_types::database_rules::{Order, Sort, SortOrder};
|
||||
use data_types::error::ErrorLogger;
|
||||
use internal_types::selection::Selection;
|
||||
|
@ -134,6 +135,15 @@ impl Catalog {
|
|||
chunks
|
||||
}
|
||||
|
||||
/// Return a summary of every chunk in every partition of this catalog.
///
/// The partition map is read-locked for the duration of the call, and
/// each partition is read-locked in turn while its chunk summaries are
/// gathered.
pub fn chunk_summaries(&self) -> Vec<ChunkSummary> {
    self.partitions
        .read()
        .values()
        .flat_map(|partition| partition.read().chunk_summaries().collect::<Vec<_>>())
        .collect()
}
|
||||
|
||||
/// Returns the chunks in the requested sort order
|
||||
pub fn chunks_sorted_by(&self, sort_rules: &SortOrder) -> Vec<Arc<RwLock<Chunk>>> {
|
||||
let mut chunks = self.chunks();
|
||||
|
|
|
@ -7,6 +7,8 @@ use super::{
|
|||
Result, UnknownChunk,
|
||||
};
|
||||
use chrono::{DateTime, Utc};
|
||||
use data_types::chunk::ChunkSummary;
|
||||
use data_types::partition_metadata::PartitionSummary;
|
||||
use parking_lot::RwLock;
|
||||
use snafu::OptionExt;
|
||||
|
||||
|
@ -127,4 +129,22 @@ impl Partition {
|
|||
pub fn chunks(&self) -> impl Iterator<Item = &Arc<RwLock<Chunk>>> {
|
||||
self.chunks.values()
|
||||
}
|
||||
|
||||
/// Return a PartitionSummary for this partition
|
||||
pub fn summary(&self) -> PartitionSummary {
|
||||
let table_summaries = self
|
||||
.chunks()
|
||||
.flat_map(|chunk| {
|
||||
let chunk = chunk.read();
|
||||
chunk.table_summaries()
|
||||
})
|
||||
.collect();
|
||||
|
||||
PartitionSummary::from_table_summaries(&self.key, table_summaries)
|
||||
}
|
||||
|
||||
/// Return chunk summaries for all chunks in this partition
|
||||
pub fn chunk_summaries(&self) -> impl Iterator<Item = ChunkSummary> + '_ {
|
||||
self.chunks.values().map(|x| x.read().summary())
|
||||
}
|
||||
}
|
||||
|
|
|
@ -252,6 +252,7 @@ where
|
|||
|
||||
let chunks: Vec<Chunk> = db
|
||||
.partition_chunk_summaries(&partition_key)
|
||||
.into_iter()
|
||||
.map(|summary| summary.into())
|
||||
.collect();
|
||||
|
||||
|
|
Loading…
Reference in New Issue