From 3a150594ab4f3260aea22b47c42d42a3283e3ee2 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies <1781103+tustvold@users.noreply.github.com> Date: Thu, 1 Apr 2021 14:34:21 +0100 Subject: [PATCH] refactor: push chunk and partition summary into catalog (#1103) Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- server/src/db.rs | 54 ++++++++--------------------- server/src/db/catalog.rs | 10 ++++++ server/src/db/catalog/partition.rs | 20 +++++++++++ src/influxdb_ioxd/rpc/management.rs | 1 + 4 files changed, 45 insertions(+), 40 deletions(-) diff --git a/server/src/db.rs b/server/src/db.rs index 33ec4b1b53..058b20c12e 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -414,43 +414,23 @@ impl Db { /// Return Summary information for all chunks in the specified /// partition across all storage systems - pub fn partition_chunk_summaries( - &self, - partition_key: &str, - ) -> impl Iterator { - let chunks = match self.catalog.partition(partition_key) { - Some(partition) => { - let partition = partition.read(); - partition.chunks().cloned().collect::>() - } - None => vec![], - }; - - chunks.into_iter().map(|chunk| { - let chunk = chunk.read(); - chunk.summary() - }) + pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec { + self.catalog + .partition(partition_key) + .map(|partition| partition.read().chunk_summaries().collect()) + .unwrap_or_default() } /// Return Summary information for all columns in all chunks in the /// partition across all storage systems pub fn partition_summary(&self, partition_key: &str) -> PartitionSummary { - let table_summaries = self - .catalog + self.catalog .partition(partition_key) - .map(|partition| { - let partition = partition.read(); - partition - .chunks() - .flat_map(|chunk| { - let chunk = chunk.read(); - chunk.table_summaries() - }) - .collect::>() + .map(|partition| partition.read().summary()) + .unwrap_or_else(|| PartitionSummary { + key: partition_key.to_string(), + tables: vec![], }) - .unwrap_or_else(Vec::new); - - PartitionSummary::from_table_summaries(partition_key, table_summaries) } /// Returns the number of iterations of the background worker loop pub fn worker_iterations(&self) -> usize { @@ -561,13 +541,7 @@ impl Database for Db { } fn chunk_summaries(&self) -> Result> { - let summaries = self - .partition_keys()? - .into_iter() - .map(|partition_key| self.partition_chunk_summaries(&partition_key)) - .flatten() - .collect(); - Ok(summaries) + Ok(self.catalog.chunk_summaries()) } } @@ -969,9 +943,7 @@ mod tests { Arc::new(s.to_string()) } - let chunk_summaries = db - .partition_chunk_summaries("1970-01-05T15") - .collect::>(); + let chunk_summaries = db.partition_chunk_summaries("1970-01-05T15"); let chunk_summaries = normalize_summaries(chunk_summaries); let expected = vec![ChunkSummary::new_without_timestamps( @@ -1266,6 +1238,7 @@ mod tests { fn mutable_chunk_ids(db: &Db, partition_key: &str) -> Vec { let mut chunk_ids: Vec = db .partition_chunk_summaries(partition_key) + .into_iter() .filter_map(|chunk| match chunk.storage { ChunkStorage::OpenMutableBuffer | ChunkStorage::ClosedMutableBuffer => { Some(chunk.id) @@ -1280,6 +1253,7 @@ mod tests { fn read_buffer_chunk_ids(db: &Db, partition_key: &str) -> Vec { let mut chunk_ids: Vec = db .partition_chunk_summaries(partition_key) + .into_iter() .filter_map(|chunk| match chunk.storage { ChunkStorage::ReadBuffer => Some(chunk.id), _ => None, diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs index 62ff593adc..697d8fc6cb 100644 --- a/server/src/db/catalog.rs +++ b/server/src/db/catalog.rs @@ -10,6 +10,7 @@ use snafu::{OptionExt, Snafu}; use arrow_deps::datafusion::{catalog::schema::SchemaProvider, datasource::TableProvider}; use chunk::Chunk; +use data_types::chunk::ChunkSummary; use data_types::database_rules::{Order, Sort, SortOrder}; use data_types::error::ErrorLogger; use internal_types::selection::Selection; @@ -134,6 +135,15 @@ impl Catalog { chunks } + pub fn chunk_summaries(&self) -> Vec { + let mut summaries = Vec::new(); + for partition in self.partitions.read().values() { + let partition = partition.read(); + summaries.extend(partition.chunk_summaries()) + } + summaries + } + /// Returns the chunks in the requested sort order pub fn chunks_sorted_by(&self, sort_rules: &SortOrder) -> Vec>> { let mut chunks = self.chunks(); diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs index 0b9d786066..41d3550e8f 100644 --- a/server/src/db/catalog/partition.rs +++ b/server/src/db/catalog/partition.rs @@ -7,6 +7,8 @@ use super::{ Result, UnknownChunk, }; use chrono::{DateTime, Utc}; +use data_types::chunk::ChunkSummary; +use data_types::partition_metadata::PartitionSummary; use parking_lot::RwLock; use snafu::OptionExt; @@ -127,4 +129,22 @@ impl Partition { pub fn chunks(&self) -> impl Iterator>> { self.chunks.values() } + + /// Return a PartitionSummary for this partition + pub fn summary(&self) -> PartitionSummary { + let table_summaries = self + .chunks() + .flat_map(|chunk| { + let chunk = chunk.read(); + chunk.table_summaries() + }) + .collect(); + + PartitionSummary::from_table_summaries(&self.key, table_summaries) + } + + /// Return chunk summaries for all chunks in this partition + pub fn chunk_summaries(&self) -> impl Iterator + '_ { + self.chunks.values().map(|x| x.read().summary()) + } } diff --git a/src/influxdb_ioxd/rpc/management.rs b/src/influxdb_ioxd/rpc/management.rs index 02646616c1..61034e0886 100644 --- a/src/influxdb_ioxd/rpc/management.rs +++ b/src/influxdb_ioxd/rpc/management.rs @@ -252,6 +252,7 @@ where let chunks: Vec = db .partition_chunk_summaries(&partition_key) + .into_iter() .map(|summary| summary.into()) .collect();