From eaa5b75437cff88f309582ead87f621b7e804125 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Thu, 3 Jun 2021 09:09:09 -0400 Subject: [PATCH] refactor: Make it clear only partition_key and table name pruning happens in catalog (#1608) * refactor: Make it clear only partition_key and table name pruning is happening in catalog * fix: clippy * fix: Update server/src/db/catalog.rs Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> * refactor: use TableNameFilter enum rather than Option * docs: Add docstring to the `From` implementation * fix: Update server/src/db/catalog/partition.rs Co-authored-by: Edd Robinson Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> Co-authored-by: Edd Robinson --- server/src/db.rs | 18 ++++--- server/src/db/catalog.rs | 79 ++++++++++++++++++++++-------- server/src/db/catalog/partition.rs | 21 +++++--- 3 files changed, 83 insertions(+), 35 deletions(-) diff --git a/server/src/db.rs b/server/src/db.rs index eee13b5d6e..8536e13393 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -1,6 +1,8 @@ //! This module contains the main IOx Database object which has the //! 
instances of the mutable buffer, read buffer, and object store +use self::catalog::TableNameFilter; + use super::{ buffer::{self, Buffer}, JobRegistry, @@ -39,8 +41,7 @@ use parquet_file::{ }, storage::Storage, }; -use query::predicate::{Predicate, PredicateBuilder}; -use query::{exec::Executor, Database, DEFAULT_SCHEMA}; +use query::{exec::Executor, predicate::Predicate, Database, DEFAULT_SCHEMA}; use rand_distr::{Distribution, Poisson}; use read_buffer::{Chunk as ReadBufferChunk, ChunkMetrics as ReadBufferChunkMetrics}; use snafu::{ResultExt, Snafu}; @@ -880,10 +881,11 @@ impl Db { /// Return chunk summary information for all chunks in the specified /// partition across all storage systems pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec { - self.catalog.state().filtered_chunks( - &PredicateBuilder::new().partition_key(partition_key).build(), - CatalogChunk::summary, - ) + let partition_key = Some(partition_key); + let table_names = TableNameFilter::AllTables; + self.catalog + .state() + .filtered_chunks(partition_key, table_names, CatalogChunk::summary) } /// Return Summary information for all columns in all chunks in the @@ -1116,9 +1118,11 @@ impl Database for Db { /// Note there could/should be an error here (if the partition /// doesn't exist... but the trait doesn't have an error) fn chunks(&self, predicate: &Predicate) -> Vec> { + let partition_key = predicate.partition_key.as_deref(); + let table_names: TableNameFilter<'_> = predicate.table_names.as_ref().into(); self.catalog .state() - .filtered_chunks(predicate, DbChunk::snapshot) + .filtered_chunks(partition_key, table_names, DbChunk::snapshot) } fn partition_keys(&self) -> Result, Self::Error> { diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs index f0b2ee94bb..219039983a 100644 --- a/server/src/db/catalog.rs +++ b/server/src/db/catalog.rs @@ -1,5 +1,5 @@ //! 
This module contains the implementation of the InfluxDB IOx Metadata catalog -use std::any::Any; +use std::{any::Any, collections::BTreeSet}; use std::{ collections::{btree_map::Entry, BTreeMap}, sync::Arc, @@ -21,7 +21,6 @@ use internal_types::selection::Selection; use partition::Partition; use query::{ exec::stringset::StringSet, - predicate::Predicate, provider::{self, ProviderBuilder}, PartitionChunk, }; @@ -123,6 +122,34 @@ pub enum Error { } pub type Result = std::result::Result; +/// Specify which tables are to be matched when filtering +/// catalog chunks +#[derive(Debug, Clone, Copy)] +pub enum TableNameFilter<'a> { + /// Include all tables + AllTables, + /// Only include tables that appear in the named set + NamedTables(&'a BTreeSet), +} + +impl<'a> From>> for TableNameFilter<'a> { + /// Creates a [`TableNameFilter`] from an [`Option`]. + /// + /// If the Option is `None`, all table names will be included in + /// the results. + /// + /// If the Option is `Some(set)`, only table names which appear in + /// `set` will be included in the results. 
+ /// + /// Note `Some(empty set)` will not match anything + fn from(v: Option<&'a BTreeSet>) -> Self { + match v { + Some(names) => Self::NamedTables(names), + None => Self::AllTables, + } + } +} + /// InfluxDB IOx Metadata Catalog /// /// The Catalog stores information such as which chunks exist, what @@ -233,11 +260,15 @@ impl Catalog { } pub fn chunk_summaries(&self) -> Vec { - self.filtered_chunks(&Predicate::default(), Chunk::summary) + let partition_key = None; + let table_names = TableNameFilter::AllTables; + self.filtered_chunks(partition_key, table_names, Chunk::summary) } pub fn detailed_chunk_summaries(&self) -> Vec { - self.filtered_chunks(&Predicate::default(), Chunk::detailed_summary) + let partition_key = None; + let table_names = TableNameFilter::AllTables; + self.filtered_chunks(partition_key, table_names, Chunk::detailed_summary) } /// Returns all chunks within the catalog in an arbitrary order @@ -272,16 +303,25 @@ impl Catalog { chunks } - /// Calls `map` with every chunk matching `predicate` and returns a - /// collection of the results - pub fn filtered_chunks(&self, predicate: &Predicate, map: F) -> Vec + /// Calls `map` with every chunk and returns a collection of the results + /// + /// If `partition_key` is Some(partition_key) only returns chunks + /// from the specified partition. 
+ /// + /// `table_names` specifies which tables to include + pub fn filtered_chunks( + &self, + partition_key: Option<&str>, + table_names: TableNameFilter<'_>, + map: F, + ) -> Vec where F: Fn(&Chunk) -> C + Copy, { let mut chunks = Vec::new(); let partitions = self.partitions.read(); - let partitions = match &predicate.partition_key { + let partitions = match partition_key { None => itertools::Either::Left(partitions.values()), Some(partition_key) => { itertools::Either::Right(partitions.get(partition_key).into_iter()) @@ -290,9 +330,8 @@ impl Catalog { for partition in partitions { let partition = partition.read(); - chunks.extend(partition.filtered_chunks(predicate).map(|chunk| { + chunks.extend(partition.filtered_chunks(table_names).map(|chunk| { let chunk = chunk.read(); - // TODO: Filter chunks map(&chunk) })) } @@ -360,7 +399,6 @@ mod tests { use super::*; use data_types::server_id::ServerId; use entry::{test_helpers::lp_to_entry, ClockValue}; - use query::predicate::PredicateBuilder; use std::convert::TryFrom; fn create_open_chunk(partition: &Arc>, table: &str) { @@ -602,6 +640,7 @@ mod tests { #[test] fn filtered_chunks() { + use TableNameFilter::*; let catalog = Catalog::test(); let p1 = catalog.get_or_create_partition("p1"); @@ -610,23 +649,21 @@ mod tests { create_open_chunk(&p1, "table2"); create_open_chunk(&p2, "table2"); - let a = catalog.filtered_chunks(&Predicate::default(), |_| ()); + let a = catalog.filtered_chunks(None, AllTables, |_| ()); - let b = catalog.filtered_chunks(&PredicateBuilder::new().table("table1").build(), |_| ()); + let b = catalog.filtered_chunks(None, NamedTables(&make_set("table1")), |_| ()); - let c = catalog.filtered_chunks(&PredicateBuilder::new().table("table2").build(), |_| ()); + let c = catalog.filtered_chunks(None, NamedTables(&make_set("table2")), |_| ()); - let d = catalog.filtered_chunks( - &PredicateBuilder::new() - .table("table2") - .partition_key("p2") - .build(), - |_| (), - ); + let d = 
catalog.filtered_chunks(Some("p2"), NamedTables(&make_set("table2")), |_| ()); assert_eq!(a.len(), 3); assert_eq!(b.len(), 1); assert_eq!(c.len(), 2); assert_eq!(d.len(), 1); } + + fn make_set(s: impl Into) -> BTreeSet { + std::iter::once(s.into()).collect() + } } diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs index 41b73d7a29..579c9ceed6 100644 --- a/server/src/db/catalog/partition.rs +++ b/server/src/db/catalog/partition.rs @@ -9,14 +9,13 @@ use data_types::chunk_metadata::ChunkSummary; use data_types::partition_metadata::{ PartitionSummary, UnaggregatedPartitionSummary, UnaggregatedTableSummary, }; -use query::predicate::Predicate; use tracker::RwLock; use crate::db::catalog::metrics::PartitionMetrics; use super::{ chunk::{Chunk, ChunkStage}, - Error, Result, UnknownChunk, UnknownTable, + Error, Result, TableNameFilter, UnknownChunk, UnknownTable, }; /// IOx Catalog Partition @@ -231,16 +230,24 @@ impl Partition { self.tables.values().flat_map(|table| table.chunks.values()) } - /// Return an iterator over chunks in this partition that - /// may pass the provided predicate + /// Return an iterator over chunks in this partition + /// + /// `table_names` specifies which tables to include pub fn filtered_chunks<'a>( &'a self, - predicate: &'a Predicate, + table_names: TableNameFilter<'a>, ) -> impl Iterator>> + 'a { self.tables .iter() - .filter(move |(table_name, _)| predicate.should_include_table(table_name)) - .flat_map(|(_, table)| table.chunks.values()) + .filter_map( + move |(partition_table_name, partition_table)| match table_names { + TableNameFilter::AllTables => Some(partition_table.chunks.values()), + TableNameFilter::NamedTables(table_names) => table_names + .contains(partition_table_name) + .then(|| partition_table.chunks.values()), + }, + ) + .flatten() } /// Return the unaggregated chunk summary information for tables