refactor: Make it clear only partition_key and table name pruning happens in catalog (#1608)
* refactor: Make it clear only partition_key and table name pruning is happening in catalog * fix: clippy * fix: Update server/src/db/catalog.rs Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> * refactor: use TableNameFilter enum rather than Option * docs: Add docstring to the `From` implementation * fix: Update server/src/db/catalog/partition.rs Co-authored-by: Edd Robinson <me@edd.io> Co-authored-by: Carol (Nichols || Goulding) <193874+carols10cents@users.noreply.github.com> Co-authored-by: Edd Robinson <me@edd.io>pull/24376/head
parent
336b25c029
commit
eaa5b75437
|
@ -1,6 +1,8 @@
|
||||||
//! This module contains the main IOx Database object which has the
|
//! This module contains the main IOx Database object which has the
|
||||||
//! instances of the mutable buffer, read buffer, and object store
|
//! instances of the mutable buffer, read buffer, and object store
|
||||||
|
|
||||||
|
use self::catalog::TableNameFilter;
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
buffer::{self, Buffer},
|
buffer::{self, Buffer},
|
||||||
JobRegistry,
|
JobRegistry,
|
||||||
|
@ -39,8 +41,7 @@ use parquet_file::{
|
||||||
},
|
},
|
||||||
storage::Storage,
|
storage::Storage,
|
||||||
};
|
};
|
||||||
use query::predicate::{Predicate, PredicateBuilder};
|
use query::{exec::Executor, predicate::Predicate, Database, DEFAULT_SCHEMA};
|
||||||
use query::{exec::Executor, Database, DEFAULT_SCHEMA};
|
|
||||||
use rand_distr::{Distribution, Poisson};
|
use rand_distr::{Distribution, Poisson};
|
||||||
use read_buffer::{Chunk as ReadBufferChunk, ChunkMetrics as ReadBufferChunkMetrics};
|
use read_buffer::{Chunk as ReadBufferChunk, ChunkMetrics as ReadBufferChunkMetrics};
|
||||||
use snafu::{ResultExt, Snafu};
|
use snafu::{ResultExt, Snafu};
|
||||||
|
@ -880,10 +881,11 @@ impl Db {
|
||||||
/// Return chunk summary information for all chunks in the specified
|
/// Return chunk summary information for all chunks in the specified
|
||||||
/// partition across all storage systems
|
/// partition across all storage systems
|
||||||
pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> {
|
pub fn partition_chunk_summaries(&self, partition_key: &str) -> Vec<ChunkSummary> {
|
||||||
self.catalog.state().filtered_chunks(
|
let partition_key = Some(partition_key);
|
||||||
&PredicateBuilder::new().partition_key(partition_key).build(),
|
let table_names = TableNameFilter::AllTables;
|
||||||
CatalogChunk::summary,
|
self.catalog
|
||||||
)
|
.state()
|
||||||
|
.filtered_chunks(partition_key, table_names, CatalogChunk::summary)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return Summary information for all columns in all chunks in the
|
/// Return Summary information for all columns in all chunks in the
|
||||||
|
@ -1116,9 +1118,11 @@ impl Database for Db {
|
||||||
/// Note there could/should be an error here (if the partition
|
/// Note there could/should be an error here (if the partition
|
||||||
/// doesn't exist... but the trait doesn't have an error)
|
/// doesn't exist... but the trait doesn't have an error)
|
||||||
fn chunks(&self, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
|
fn chunks(&self, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
|
||||||
|
let partition_key = predicate.partition_key.as_deref();
|
||||||
|
let table_names: TableNameFilter<'_> = predicate.table_names.as_ref().into();
|
||||||
self.catalog
|
self.catalog
|
||||||
.state()
|
.state()
|
||||||
.filtered_chunks(predicate, DbChunk::snapshot)
|
.filtered_chunks(partition_key, table_names, DbChunk::snapshot)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
|
fn partition_keys(&self) -> Result<Vec<String>, Self::Error> {
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
//! This module contains the implementation of the InfluxDB IOx Metadata catalog
|
//! This module contains the implementation of the InfluxDB IOx Metadata catalog
|
||||||
use std::any::Any;
|
use std::{any::Any, collections::BTreeSet};
|
||||||
use std::{
|
use std::{
|
||||||
collections::{btree_map::Entry, BTreeMap},
|
collections::{btree_map::Entry, BTreeMap},
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
|
@ -21,7 +21,6 @@ use internal_types::selection::Selection;
|
||||||
use partition::Partition;
|
use partition::Partition;
|
||||||
use query::{
|
use query::{
|
||||||
exec::stringset::StringSet,
|
exec::stringset::StringSet,
|
||||||
predicate::Predicate,
|
|
||||||
provider::{self, ProviderBuilder},
|
provider::{self, ProviderBuilder},
|
||||||
PartitionChunk,
|
PartitionChunk,
|
||||||
};
|
};
|
||||||
|
@ -123,6 +122,34 @@ pub enum Error {
|
||||||
}
|
}
|
||||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
|
|
||||||
|
/// Specify which tables are to be matched when filtering
|
||||||
|
/// catalog chunks
|
||||||
|
#[derive(Debug, Clone, Copy)]
|
||||||
|
pub enum TableNameFilter<'a> {
|
||||||
|
/// Include all tables
|
||||||
|
AllTables,
|
||||||
|
/// Only include tables that appear in the named set
|
||||||
|
NamedTables(&'a BTreeSet<String>),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> From<Option<&'a BTreeSet<String>>> for TableNameFilter<'a> {
|
||||||
|
/// Creates a [`TableNameFilter`] from an [`Option`].
|
||||||
|
///
|
||||||
|
/// If the Option is `None`, all table names will be included in
|
||||||
|
/// the results.
|
||||||
|
///
|
||||||
|
/// If the Option is `Some(set)`, only table names which appear in
|
||||||
|
/// `set` will be included in the results.
|
||||||
|
///
|
||||||
|
/// Note `Some(empty set)` will not match anything
|
||||||
|
fn from(v: Option<&'a BTreeSet<String>>) -> Self {
|
||||||
|
match v {
|
||||||
|
Some(names) => Self::NamedTables(names),
|
||||||
|
None => Self::AllTables,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// InfluxDB IOx Metadata Catalog
|
/// InfluxDB IOx Metadata Catalog
|
||||||
///
|
///
|
||||||
/// The Catalog stores information such as which chunks exist, what
|
/// The Catalog stores information such as which chunks exist, what
|
||||||
|
@ -233,11 +260,15 @@ impl Catalog {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn chunk_summaries(&self) -> Vec<ChunkSummary> {
|
pub fn chunk_summaries(&self) -> Vec<ChunkSummary> {
|
||||||
self.filtered_chunks(&Predicate::default(), Chunk::summary)
|
let partition_key = None;
|
||||||
|
let table_names = TableNameFilter::AllTables;
|
||||||
|
self.filtered_chunks(partition_key, table_names, Chunk::summary)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn detailed_chunk_summaries(&self) -> Vec<DetailedChunkSummary> {
|
pub fn detailed_chunk_summaries(&self) -> Vec<DetailedChunkSummary> {
|
||||||
self.filtered_chunks(&Predicate::default(), Chunk::detailed_summary)
|
let partition_key = None;
|
||||||
|
let table_names = TableNameFilter::AllTables;
|
||||||
|
self.filtered_chunks(partition_key, table_names, Chunk::detailed_summary)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns all chunks within the catalog in an arbitrary order
|
/// Returns all chunks within the catalog in an arbitrary order
|
||||||
|
@ -272,16 +303,25 @@ impl Catalog {
|
||||||
chunks
|
chunks
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Calls `map` with every chunk matching `predicate` and returns a
|
/// Calls `map` with every chunk and returns a collection of the results
|
||||||
/// collection of the results
|
///
|
||||||
pub fn filtered_chunks<F, C>(&self, predicate: &Predicate, map: F) -> Vec<C>
|
/// If `partition_key` is Some(partition_key) only returns chunks
|
||||||
|
/// from the specified partition.
|
||||||
|
///
|
||||||
|
/// `table_names` specifies which tables to include
|
||||||
|
pub fn filtered_chunks<F, C>(
|
||||||
|
&self,
|
||||||
|
partition_key: Option<&str>,
|
||||||
|
table_names: TableNameFilter<'_>,
|
||||||
|
map: F,
|
||||||
|
) -> Vec<C>
|
||||||
where
|
where
|
||||||
F: Fn(&Chunk) -> C + Copy,
|
F: Fn(&Chunk) -> C + Copy,
|
||||||
{
|
{
|
||||||
let mut chunks = Vec::new();
|
let mut chunks = Vec::new();
|
||||||
let partitions = self.partitions.read();
|
let partitions = self.partitions.read();
|
||||||
|
|
||||||
let partitions = match &predicate.partition_key {
|
let partitions = match partition_key {
|
||||||
None => itertools::Either::Left(partitions.values()),
|
None => itertools::Either::Left(partitions.values()),
|
||||||
Some(partition_key) => {
|
Some(partition_key) => {
|
||||||
itertools::Either::Right(partitions.get(partition_key).into_iter())
|
itertools::Either::Right(partitions.get(partition_key).into_iter())
|
||||||
|
@ -290,9 +330,8 @@ impl Catalog {
|
||||||
|
|
||||||
for partition in partitions {
|
for partition in partitions {
|
||||||
let partition = partition.read();
|
let partition = partition.read();
|
||||||
chunks.extend(partition.filtered_chunks(predicate).map(|chunk| {
|
chunks.extend(partition.filtered_chunks(table_names).map(|chunk| {
|
||||||
let chunk = chunk.read();
|
let chunk = chunk.read();
|
||||||
// TODO: Filter chunks
|
|
||||||
map(&chunk)
|
map(&chunk)
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
@ -360,7 +399,6 @@ mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use data_types::server_id::ServerId;
|
use data_types::server_id::ServerId;
|
||||||
use entry::{test_helpers::lp_to_entry, ClockValue};
|
use entry::{test_helpers::lp_to_entry, ClockValue};
|
||||||
use query::predicate::PredicateBuilder;
|
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
fn create_open_chunk(partition: &Arc<RwLock<Partition>>, table: &str) {
|
fn create_open_chunk(partition: &Arc<RwLock<Partition>>, table: &str) {
|
||||||
|
@ -602,6 +640,7 @@ mod tests {
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn filtered_chunks() {
|
fn filtered_chunks() {
|
||||||
|
use TableNameFilter::*;
|
||||||
let catalog = Catalog::test();
|
let catalog = Catalog::test();
|
||||||
|
|
||||||
let p1 = catalog.get_or_create_partition("p1");
|
let p1 = catalog.get_or_create_partition("p1");
|
||||||
|
@ -610,23 +649,21 @@ mod tests {
|
||||||
create_open_chunk(&p1, "table2");
|
create_open_chunk(&p1, "table2");
|
||||||
create_open_chunk(&p2, "table2");
|
create_open_chunk(&p2, "table2");
|
||||||
|
|
||||||
let a = catalog.filtered_chunks(&Predicate::default(), |_| ());
|
let a = catalog.filtered_chunks(None, AllTables, |_| ());
|
||||||
|
|
||||||
let b = catalog.filtered_chunks(&PredicateBuilder::new().table("table1").build(), |_| ());
|
let b = catalog.filtered_chunks(None, NamedTables(&make_set("table1")), |_| ());
|
||||||
|
|
||||||
let c = catalog.filtered_chunks(&PredicateBuilder::new().table("table2").build(), |_| ());
|
let c = catalog.filtered_chunks(None, NamedTables(&make_set("table2")), |_| ());
|
||||||
|
|
||||||
let d = catalog.filtered_chunks(
|
let d = catalog.filtered_chunks(Some("p2"), NamedTables(&make_set("table2")), |_| ());
|
||||||
&PredicateBuilder::new()
|
|
||||||
.table("table2")
|
|
||||||
.partition_key("p2")
|
|
||||||
.build(),
|
|
||||||
|_| (),
|
|
||||||
);
|
|
||||||
|
|
||||||
assert_eq!(a.len(), 3);
|
assert_eq!(a.len(), 3);
|
||||||
assert_eq!(b.len(), 1);
|
assert_eq!(b.len(), 1);
|
||||||
assert_eq!(c.len(), 2);
|
assert_eq!(c.len(), 2);
|
||||||
assert_eq!(d.len(), 1);
|
assert_eq!(d.len(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn make_set(s: impl Into<String>) -> BTreeSet<String> {
|
||||||
|
std::iter::once(s.into()).collect()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,14 +9,13 @@ use data_types::chunk_metadata::ChunkSummary;
|
||||||
use data_types::partition_metadata::{
|
use data_types::partition_metadata::{
|
||||||
PartitionSummary, UnaggregatedPartitionSummary, UnaggregatedTableSummary,
|
PartitionSummary, UnaggregatedPartitionSummary, UnaggregatedTableSummary,
|
||||||
};
|
};
|
||||||
use query::predicate::Predicate;
|
|
||||||
use tracker::RwLock;
|
use tracker::RwLock;
|
||||||
|
|
||||||
use crate::db::catalog::metrics::PartitionMetrics;
|
use crate::db::catalog::metrics::PartitionMetrics;
|
||||||
|
|
||||||
use super::{
|
use super::{
|
||||||
chunk::{Chunk, ChunkStage},
|
chunk::{Chunk, ChunkStage},
|
||||||
Error, Result, UnknownChunk, UnknownTable,
|
Error, Result, TableNameFilter, UnknownChunk, UnknownTable,
|
||||||
};
|
};
|
||||||
|
|
||||||
/// IOx Catalog Partition
|
/// IOx Catalog Partition
|
||||||
|
@ -231,16 +230,24 @@ impl Partition {
|
||||||
self.tables.values().flat_map(|table| table.chunks.values())
|
self.tables.values().flat_map(|table| table.chunks.values())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return an iterator over chunks in this partition that
|
/// Return an iterator over chunks in this partition
|
||||||
/// may pass the provided predicate
|
///
|
||||||
|
/// `table_names` specifies which tables to include
|
||||||
pub fn filtered_chunks<'a>(
|
pub fn filtered_chunks<'a>(
|
||||||
&'a self,
|
&'a self,
|
||||||
predicate: &'a Predicate,
|
table_names: TableNameFilter<'a>,
|
||||||
) -> impl Iterator<Item = &Arc<RwLock<Chunk>>> + 'a {
|
) -> impl Iterator<Item = &Arc<RwLock<Chunk>>> + 'a {
|
||||||
self.tables
|
self.tables
|
||||||
.iter()
|
.iter()
|
||||||
.filter(move |(table_name, _)| predicate.should_include_table(table_name))
|
.filter_map(
|
||||||
.flat_map(|(_, table)| table.chunks.values())
|
move |(partition_table_name, partition_table)| match table_names {
|
||||||
|
TableNameFilter::AllTables => Some(partition_table.chunks.values()),
|
||||||
|
TableNameFilter::NamedTables(table_names) => table_names
|
||||||
|
.contains(partition_table_name)
|
||||||
|
.then(|| partition_table.chunks.values()),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
.flatten()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the unaggregated chunk summary information for tables
|
/// Return the unaggregated chunk summary information for tables
|
||||||
|
|
Loading…
Reference in New Issue