diff --git a/query/src/frontend/influxrpc.rs b/query/src/frontend/influxrpc.rs index f08931a4d5..ed3894b583 100644 --- a/query/src/frontend/influxrpc.rs +++ b/query/src/frontend/influxrpc.rs @@ -16,7 +16,7 @@ use internal_types::{ selection::Selection, }; use observability_deps::tracing::{debug, trace}; -use snafu::{ensure, ResultExt, Snafu}; +use snafu::{ensure, OptionExt, ResultExt, Snafu}; use crate::{ exec::{field::FieldColumns, make_schema_pivot}, @@ -141,6 +141,9 @@ pub enum Error { #[snafu(display("Internal error: aggregate {:?} is not a selector", agg))] InternalAggregateNotSelector { agg: Aggregate }, + + #[snafu(display("Table was removed while planning query: {}", table_name))] + TableRemoved { table_name: String }, } pub type Result = std::result::Result; @@ -301,7 +304,10 @@ impl InfluxRpcPlanner { // were already known to have data (based on the contents of known_columns) for (table_name, chunks) in need_full_plans.into_iter() { - let plan = self.tag_keys_plan(&table_name, &predicate, chunks)?; + let schema = database.table_schema(&table_name).context(TableRemoved { + table_name: &table_name, + })?; + let plan = self.tag_keys_plan(&table_name, schema, &predicate, chunks)?; if let Some(plan) = plan { builder = builder.append(plan) @@ -422,7 +428,10 @@ impl InfluxRpcPlanner { // time in `known_columns`, and some tables in chunks that we // need to run a plan to find what values pass the predicate. for (table_name, chunks) in need_full_plans.into_iter() { - let scan_and_filter = self.scan_and_filter(&table_name, &predicate, chunks)?; + let schema = database.table_schema(&table_name).context(TableRemoved { + table_name: &table_name, + })?; + let scan_and_filter = self.scan_and_filter(&table_name, schema, &predicate, chunks)?; // if we have any data to scan, make a plan! if let Some(TableScanAndFilter { @@ -483,7 +492,10 @@ impl InfluxRpcPlanner { let mut field_list_plan = FieldListPlan::new(); for (table_name, chunks) in table_chunks { - if let Some(plan) = self.field_columns_plan(&table_name, &predicate, chunks)? { + let schema = database.table_schema(&table_name).context(TableRemoved { + table_name: &table_name, + })?; + if let Some(plan) = self.field_columns_plan(&table_name, schema, &predicate, chunks)? { field_list_plan = field_list_plan.append(plan); } } @@ -524,8 +536,12 @@ impl InfluxRpcPlanner { let mut ss_plans = Vec::with_capacity(table_chunks.len()); for (table_name, chunks) in table_chunks { let prefix_columns: Option<&[&str]> = None; + let schema = database.table_schema(&table_name).context(TableRemoved { + table_name: &table_name, + })?; - let ss_plan = self.read_filter_plan(table_name, prefix_columns, &predicate, chunks)?; + let ss_plan = + self.read_filter_plan(table_name, schema, prefix_columns, &predicate, chunks)?; // If we have to do real work, add it to the list of plans if let Some(ss_plan) = ss_plan { ss_plans.push(ss_plan); @@ -559,11 +575,25 @@ impl InfluxRpcPlanner { // now, build up plans for each table let mut ss_plans = Vec::with_capacity(table_chunks.len()); for (table_name, chunks) in table_chunks { + let schema = database.table_schema(&table_name).context(TableRemoved { + table_name: &table_name, + })?; let ss_plan = match agg { - Aggregate::None => { - self.read_filter_plan(table_name, Some(group_columns), &predicate, chunks)? 
- } - _ => self.read_group_plan(table_name, &predicate, agg, group_columns, chunks)?, + Aggregate::None => self.read_filter_plan( + table_name, + Arc::clone(&schema), + Some(group_columns), + &predicate, + chunks, + )?, + _ => self.read_group_plan( + table_name, + schema, + &predicate, + agg, + group_columns, + chunks, + )?, }; // If we have to do real work, add it to the list of plans @@ -604,8 +634,12 @@ impl InfluxRpcPlanner { // now, build up plans for each table let mut ss_plans = Vec::with_capacity(table_chunks.len()); for (table_name, chunks) in table_chunks { - let ss_plan = self - .read_window_aggregate_plan(table_name, &predicate, agg, &every, &offset, chunks)?; + let schema = database.table_schema(&table_name).context(TableRemoved { + table_name: &table_name, + })?; + let ss_plan = self.read_window_aggregate_plan( + table_name, schema, &predicate, agg, &every, &offset, chunks, + )?; // If we have to do real work, add it to the list of plans if let Some(ss_plan) = ss_plan { ss_plans.push(ss_plan); @@ -665,13 +699,14 @@ impl InfluxRpcPlanner { fn tag_keys_plan( &self, table_name: &str, + schema: Arc, predicate: &Predicate, chunks: Vec>, ) -> Result> where C: QueryChunk + 'static, { - let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?; let TableScanAndFilter { plan_builder, @@ -727,13 +762,14 @@ impl InfluxRpcPlanner { fn field_columns_plan( &self, table_name: &str, + schema: Arc, predicate: &Predicate, chunks: Vec>, ) -> Result> where C: QueryChunk + 'static, { - let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?; let TableScanAndFilter { plan_builder, schema, @@ -777,6 +813,7 @@ impl InfluxRpcPlanner { fn read_filter_plan( &self, table_name: impl AsRef, + schema: Arc, prefix_columns: Option<&[impl AsRef]>, predicate: &Predicate, chunks: Vec>, @@ -785,7 +822,7 @@ impl InfluxRpcPlanner { C: QueryChunk + 'static, { let table_name = table_name.as_ref(); - let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?; let TableScanAndFilter { plan_builder, @@ -897,6 +934,7 @@ impl InfluxRpcPlanner { fn read_group_plan( &self, table_name: impl Into, + schema: Arc, predicate: &Predicate, agg: Aggregate, group_columns: &[impl AsRef], @@ -906,7 +944,7 @@ impl InfluxRpcPlanner { C: QueryChunk + 'static, { let table_name = table_name.into(); - let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?; let TableScanAndFilter { plan_builder, @@ -984,9 +1022,11 @@ impl InfluxRpcPlanner { /// GroupBy(gby: tag columns, window_function; agg: aggregate(field)) /// Filter(predicate) /// Scan + #[allow(clippy::too_many_arguments)] fn read_window_aggregate_plan( &self, table_name: impl Into, + schema: Arc, predicate: &Predicate, agg: Aggregate, every: &WindowDuration, @@ -997,7 +1037,7 @@ impl InfluxRpcPlanner { C: QueryChunk + 'static, { let table_name = table_name.into(); - let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?; + let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?; let TableScanAndFilter { plan_builder, @@ -1074,6 +1114,7 @@ impl InfluxRpcPlanner { fn scan_and_filter( &self, table_name: &str, + schema: Arc, 
predicate: &Predicate, chunks: Vec>, ) -> Result> @@ -1085,7 +1126,7 @@ impl InfluxRpcPlanner { let projection = None; // Prepare the scan of the table - let mut builder = ProviderBuilder::new(table_name); + let mut builder = ProviderBuilder::new(table_name, schema); // Since the entire predicate is used in the call to // `database.chunks()` there will not be any additional @@ -1106,9 +1147,7 @@ impl InfluxRpcPlanner { chunk.id(), ); - builder = builder - .add_chunk(chunk) - .context(CreatingProvider { table_name })?; + builder = builder.add_chunk(chunk); } let provider = builder.build().context(CreatingProvider { table_name })?; @@ -1217,7 +1256,7 @@ struct TableScanAndFilter { /// Represents plan that scans a table and applies optional filtering plan_builder: LogicalPlanBuilder, /// The IOx schema of the result - schema: Schema, + schema: Arc, } /// Reorders tag_columns so that its prefix matches exactly diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs index 069273f9f9..366e2fe852 100644 --- a/query/src/frontend/reorg.rs +++ b/query/src/frontend/reorg.rs @@ -63,9 +63,10 @@ impl ReorgPlanner { /// (Scan chunks) <-- any needed deduplication happens here pub fn compact_plan( &self, + schema: Arc, chunks: I, output_sort: SortKey<'_>, - ) -> Result<(Schema, LogicalPlan)> + ) -> Result<(Arc, LogicalPlan)> where C: QueryChunk + 'static, I: IntoIterator>, @@ -73,13 +74,21 @@ impl ReorgPlanner { let ScanPlan { plan_builder, provider, - } = self.scan_and_sort_plan(chunks, output_sort.clone())?; + } = self.scan_and_sort_plan(schema, chunks, output_sort.clone())?; let mut schema = provider.iox_schema(); // Set the sort_key of the schema to the compacted chunk's sort key + // Try to do this only if the sort key changes so we avoid unnecessary schema copies. 
trace!(input_schema=?schema, "Setting sort key on schema"); - schema.set_sort_key(&output_sort); + if schema + .sort_key() + .map_or(true, |existing_key| existing_key != output_sort) + { + let mut schema_cloned = schema.as_ref().clone(); + schema_cloned.set_sort_key(&output_sort); + schema = Arc::new(schema_cloned); + } trace!(output_schema=?schema, "Setting sort key on schema"); let plan = plan_builder.build().context(BuildingPlan)?; @@ -136,10 +145,11 @@ impl ReorgPlanner { /// ``` pub fn split_plan( &self, + schema: Arc, chunks: I, output_sort: SortKey<'_>, split_time: i64, - ) -> Result<(Schema, LogicalPlan)> + ) -> Result<(Arc, LogicalPlan)> where C: QueryChunk + 'static, I: IntoIterator>, @@ -147,7 +157,7 @@ impl ReorgPlanner { let ScanPlan { plan_builder, provider, - } = self.scan_and_sort_plan(chunks, output_sort)?; + } = self.scan_and_sort_plan(schema, chunks, output_sort)?; // TODO: Set sort key on schema let schema = provider.iox_schema(); @@ -176,7 +186,12 @@ impl ReorgPlanner { /// /// (Sort on output_sort) /// (Scan chunks) <-- any needed deduplication happens here - fn scan_and_sort_plan(&self, chunks: I, output_sort: SortKey<'_>) -> Result> + fn scan_and_sort_plan( + &self, + schema: Arc, + chunks: I, + output_sort: SortKey<'_>, + ) -> Result> where C: QueryChunk + 'static, I: IntoIterator>, @@ -189,7 +204,7 @@ impl ReorgPlanner { let table_name = &table_name; // Prepare the plan for the table - let mut builder = ProviderBuilder::new(table_name); + let mut builder = ProviderBuilder::new(table_name, schema); // There are no predicates in these plans, so no need to prune them builder = builder.add_no_op_pruner(); @@ -203,9 +218,7 @@ impl ReorgPlanner { chunk.id(), ); - builder = builder - .add_chunk(chunk) - .context(CreatingProvider { table_name })?; + builder = builder.add_chunk(chunk); } let provider = builder.build().context(CreatingProvider { table_name })?; @@ -244,16 +257,17 @@ struct ScanPlan { #[cfg(test)] mod test { use arrow_util::assert_batches_eq; - use internal_types::schema::sort::SortOptions; + use internal_types::schema::{merge::SchemaMerger, sort::SortOptions}; use crate::{ exec::{Executor, ExecutorType}, test::{raw_data, TestChunk}, + QueryChunkMeta, }; use super::*; - async fn get_test_chunks() -> Vec> { + async fn get_test_chunks() -> (Arc, Vec>) { // Chunk 1 with 5 rows of data on 2 tags let chunk1 = Arc::new( TestChunk::new(1) @@ -298,12 +312,19 @@ mod test { ]; assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk2)]).await); - vec![chunk1, chunk2] + let schema = SchemaMerger::new() + .merge(&chunk1.schema()) + .unwrap() + .merge(&chunk2.schema()) + .unwrap() + .build(); + + (Arc::new(schema), vec![chunk1, chunk2]) } #[tokio::test] async fn test_compact_plan() { - let chunks = get_test_chunks().await; + let (schema, chunks) = get_test_chunks().await; let mut sort_key = SortKey::with_capacity(2); sort_key.push( @@ -322,7 +343,7 @@ mod test { ); let (_, compact_plan) = ReorgPlanner::new() - .compact_plan(chunks, sort_key) + .compact_plan(schema, chunks, sort_key) .expect("created compact plan"); let executor = Executor::new(1); @@ -363,7 +384,7 @@ mod test { test_helpers::maybe_start_logging(); // validate that the plumbing is all hooked up. The logic of // the operator is tested in its own module. 
- let chunks = get_test_chunks().await; + let (schema, chunks) = get_test_chunks().await; let mut sort_key = SortKey::with_capacity(1); sort_key.push( @@ -376,7 +397,7 @@ mod test { // split on 1000 should have timestamps 1000, 5000, and 7000 let (_, split_plan) = ReorgPlanner::new() - .split_plan(chunks, sort_key, 1000) + .split_plan(schema, chunks, sort_key, 1000) .expect("created compact plan"); let executor = Executor::new(1); diff --git a/query/src/lib.rs b/query/src/lib.rs index 8e991144b0..e8faec56b0 100644 --- a/query/src/lib.rs +++ b/query/src/lib.rs @@ -55,6 +55,9 @@ pub trait QueryDatabase: Debug + Send + Sync { /// Return the partition keys for data in this DB fn partition_keys(&self) -> Result, Self::Error>; + /// Schema for a specific table if the table exists. + fn table_schema(&self, table_name: &str) -> Option>; + /// Returns a set of chunks within the partition with data that may match /// the provided predicate. If possible, chunks which have no rows that can /// possibly match the predicate may be omitted. diff --git a/query/src/provider.rs b/query/src/provider.rs index 37a1b923f2..a7dbeeac12 100644 --- a/query/src/provider.rs +++ b/query/src/provider.rs @@ -40,26 +40,12 @@ use self::{ #[derive(Debug, Snafu)] pub enum Error { - #[snafu(display("Chunk schema not compatible for table '{}': {}", table_name, source))] - ChunkSchemaNotCompatible { - table_name: String, - source: internal_types::schema::merge::Error, - }, - #[snafu(display( "Internal error: no chunk pruner provided to builder for {}", table_name, ))] InternalNoChunkPruner { table_name: String }, - #[snafu(display("Internal error: No rows found in table '{}'", table_name))] - InternalNoRowsInTable { table_name: String }, - - #[snafu(display("Internal error: Cannot verify the push-down predicate '{}'", source,))] - InternalPushdownPredicate { - source: datafusion::error::DataFusionError, - }, - #[snafu(display("Internal error: Cannot create projection select expr '{}'", source,))] InternalSelectExpr { source: datafusion::error::DataFusionError, @@ -106,35 +92,25 @@ pub trait ChunkPruner: Sync + Send + std::fmt::Debug { #[derive(Debug)] pub struct ProviderBuilder { table_name: Arc, - schema_merger: SchemaMerger, + schema: Arc, chunk_pruner: Option>>, chunks: Vec>, } impl ProviderBuilder { - pub fn new(table_name: impl AsRef) -> Self { + pub fn new(table_name: impl AsRef, schema: Arc) -> Self { Self { table_name: Arc::from(table_name.as_ref()), - schema_merger: SchemaMerger::new(), + schema, chunk_pruner: None, chunks: Vec::new(), } } /// Add a new chunk to this provider - pub fn add_chunk(mut self, chunk: Arc) -> Result { - let chunk_table_schema = chunk.schema(); - - self.schema_merger = self - .schema_merger - .merge(&chunk_table_schema.as_ref()) - .context(ChunkSchemaNotCompatible { - table_name: self.table_name.as_ref(), - })?; - + pub fn add_chunk(mut self, chunk: Arc) -> Self { self.chunks.push(chunk); - - Ok(self) + self } /// Specify a `ChunkPruner` for the provider that will apply @@ -161,16 +137,6 @@ impl ProviderBuilder { /// Create the Provider pub fn build(self) -> Result> { - let iox_schema = self.schema_merger.build(); - - // if the table was reported to exist, it should not be empty - if self.chunks.is_empty() { - return InternalNoRowsInTable { - table_name: self.table_name.as_ref(), - } - .fail(); - } - let chunk_pruner = match self.chunk_pruner { Some(chunk_pruner) => chunk_pruner, None => { @@ -182,7 +148,7 @@ impl ProviderBuilder { }; Ok(ChunkTableProvider { - iox_schema, + iox_schema: 
self.schema, chunk_pruner, table_name: self.table_name, chunks: self.chunks, @@ -198,7 +164,7 @@ impl ProviderBuilder { pub struct ChunkTableProvider { table_name: Arc, /// The IOx schema (wrapper around Arrow Schemaref) for this table - iox_schema: Schema, + iox_schema: Arc, /// Something that can prune chunks chunk_pruner: Arc>, // The chunks @@ -207,8 +173,8 @@ pub struct ChunkTableProvider { impl ChunkTableProvider { /// Return the IOx schema view for the data provided by this provider - pub fn iox_schema(&self) -> Schema { - self.iox_schema.clone() + pub fn iox_schema(&self) -> Arc { + Arc::clone(&self.iox_schema) } /// Return the Arrow schema view for the data provided by this provider @@ -255,8 +221,8 @@ impl TableProvider for ChunkTableProvider { // Figure out the schema of the requested output let scan_schema = match projection { - Some(indicies) => self.iox_schema.select_by_indices(indicies), - None => self.iox_schema.clone(), + Some(indicies) => Arc::new(self.iox_schema.select_by_indices(indicies)), + None => Arc::clone(&self.iox_schema), }; // This debug shows the self.arrow_schema() includes all columns in all chunks @@ -266,7 +232,7 @@ impl TableProvider for ChunkTableProvider { let mut deduplicate = Deduplicater::new(); let plan = deduplicate.build_scan_plan( Arc::clone(&self.table_name), - Arc::new(scan_schema), + scan_schema, chunks, predicate, )?; diff --git a/query/src/test.rs b/query/src/test.rs index d9ca63ad22..218d083ee6 100644 --- a/query/src/test.rs +++ b/query/src/test.rs @@ -116,6 +116,23 @@ impl QueryDatabase for TestDatabase { fn chunk_summaries(&self) -> Result, Self::Error> { unimplemented!("summaries not implemented TestDatabase") } + + fn table_schema(&self, table_name: &str) -> Option> { + let mut merger = SchemaMerger::new(); + let mut found_one = false; + + let partitions = self.partitions.lock(); + for partition in partitions.values() { + for chunk in partition.values() { + if chunk.table_name() == table_name { + merger = merger.merge(&chunk.schema()).expect("consistent schemas"); + found_one = true; + } + } + } + + found_one.then(|| Arc::new(merger.build())) + } } #[derive(Debug, Default)] diff --git a/query_tests/cases/in/all_chunks_dropped.expected b/query_tests/cases/in/all_chunks_dropped.expected new file mode 100644 index 0000000000..5febb4d2e9 --- /dev/null +++ b/query_tests/cases/in/all_chunks_dropped.expected @@ -0,0 +1,25 @@ +-- Test Setup: OneMeasurementAllChunksDropped +-- SQL: SELECT * from information_schema.tables; ++---------------+--------------------+---------------+------------+ +| table_catalog | table_schema | table_name | table_type | ++---------------+--------------------+---------------+------------+ +| public | iox | h2o | BASE TABLE | +| public | system | chunks | BASE TABLE | +| public | system | columns | BASE TABLE | +| public | system | chunk_columns | BASE TABLE | +| public | system | operations | BASE TABLE | +| public | information_schema | tables | VIEW | +| public | information_schema | columns | VIEW | ++---------------+--------------------+---------------+------------+ +-- SQL: SHOW TABLES; ++---------------+--------------------+---------------+------------+ +| table_catalog | table_schema | table_name | table_type | ++---------------+--------------------+---------------+------------+ +| public | iox | h2o | BASE TABLE | +| public | system | chunks | BASE TABLE | +| public | system | columns | BASE TABLE | +| public | system | chunk_columns | BASE TABLE | +| public | system | operations | BASE TABLE | +| public | 
information_schema | tables        | VIEW       |
+| public        | information_schema | columns       | VIEW       |
++---------------+--------------------+---------------+------------+
diff --git a/query_tests/cases/in/all_chunks_dropped.sql b/query_tests/cases/in/all_chunks_dropped.sql
new file mode 100644
index 0000000000..ff66029f2e
--- /dev/null
+++ b/query_tests/cases/in/all_chunks_dropped.sql
@@ -0,0 +1,8 @@
+-- Test that tables stay visible in the information schema after all their chunks have been dropped
+-- IOX_SETUP: OneMeasurementAllChunksDropped
+
+-- list information schema
+SELECT * from information_schema.tables;
+
+-- same but shorter
+SHOW TABLES;
diff --git a/query_tests/src/cases.rs b/query_tests/src/cases.rs
index 87861c16a8..6a4bbf342a 100644
--- a/query_tests/src/cases.rs
+++ b/query_tests/src/cases.rs
@@ -4,6 +4,20 @@ use std::path::Path;
 
 use crate::runner::Runner;
 
+#[tokio::test]
+// Tests from "all_chunks_dropped.sql",
+async fn test_cases_all_chunks_dropped_sql() {
+    let input_path = Path::new("cases").join("in").join("all_chunks_dropped.sql");
+    let mut runner = Runner::new();
+    runner
+        .run(input_path)
+        .await
+        .expect("test failed");
+    runner
+        .flush()
+        .expect("flush worked");
+}
+
 #[tokio::test]
 // Tests from "duplicates.sql",
 async fn test_cases_duplicates_sql() {
diff --git a/query_tests/src/runner.rs b/query_tests/src/runner.rs
index 16147f4ada..6edf05ce56 100644
--- a/query_tests/src/runner.rs
+++ b/query_tests/src/runner.rs
@@ -294,6 +294,8 @@ impl Runner {
 }
 
 /// Return output path for input path.
+///
+/// This converts `some/prefix/in/foo.sql` (or other file extensions) to `some/prefix/out/foo.out`.
 fn make_output_path(input: &Path) -> Result<PathBuf> {
     let stem = input.file_stem().context(NoFileStem { path: input })?;
 
@@ -306,6 +308,10 @@ fn make_output_path(input: &Path) -> Result<PathBuf> {
     out.push("out");
 
     // set file name and ext
+    // The PathBuf API is somewhat confusing: `set_file_name` will replace the last component (which at this point is
+    // the "out"). However, we want to create a file out of the stem and the extension. So as a somewhat hackish
+    // workaround, first push a placeholder that is then replaced.
+    out.push("placeholder");
     out.set_file_name(stem);
     out.set_extension("out");
 
@@ -417,8 +423,11 @@ SELECT * from disk;
         let in_dir = dir.path().join("in");
         std::fs::create_dir(&in_dir).expect("create in-dir");
 
+        let out_dir = dir.path().join("out");
+        std::fs::create_dir(&out_dir).expect("create out-dir");
+
         let mut file = in_dir;
-        file.set_file_name("foo.sql");
+        file.push("foo.sql");
         std::fs::write(&file, contents).expect("writing data to temp file");
 
         (dir, file)
diff --git a/query_tests/src/scenarios.rs b/query_tests/src/scenarios.rs
index 7c31dfe445..9e83c62608 100644
--- a/query_tests/src/scenarios.rs
+++ b/query_tests/src/scenarios.rs
@@ -49,6 +49,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
         register_setup!(TwoMeasurements),
         register_setup!(TwoMeasurementsPredicatePushDown),
         register_setup!(OneMeasurementThreeChunksWithDuplicates),
+        register_setup!(OneMeasurementAllChunksDropped),
     ]
     .into_iter()
    .map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>))
@@ -676,6 +677,34 @@ pub(crate) async fn make_one_chunk_rub_scenario(
     vec![scenario]
 }
 
+/// This creates a chunk and then drops it again. The table should be kept even though it no longer has any chunks.
+#[derive(Debug)] +pub struct OneMeasurementAllChunksDropped {} +#[async_trait] +impl DbSetup for OneMeasurementAllChunksDropped { + async fn make(&self) -> Vec { + let db = make_db().await.db; + + let partition_key = "1970-01-01T00"; + let table_name = "h2o"; + + let lp_lines = vec!["h2o,state=MA temp=70.4 50"]; + write_lp(&db, &lp_lines.join("\n")).await; + db.rollover_partition(table_name, partition_key) + .await + .unwrap(); + db.move_chunk_to_read_buffer(table_name, partition_key, 0) + .await + .unwrap(); + db.drop_chunk(table_name, partition_key, 0).unwrap(); + + vec![DbScenario { + scenario_name: "one measurement but all chunks are dropped".into(), + db, + }] + } +} + /// This function loads one chunk of lp data into different scenarios that simulates /// the data life cycle. /// diff --git a/server/src/db.rs b/server/src/db.rs index 28e0fb607b..0368661712 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -26,6 +26,7 @@ use data_types::{ use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider}; use entry::{Entry, SequencedEntry}; use futures::{stream::BoxStream, StreamExt}; +use internal_types::schema::Schema; use metrics::KeyValue; use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk}; use object_store::{path::parsed::DirsAndFileName, ObjectStore}; @@ -850,6 +851,10 @@ impl QueryDatabase for Db { fn chunk_summaries(&self) -> Result> { self.catalog_access.chunk_summaries() } + + fn table_schema(&self, table_name: &str) -> Option> { + self.catalog_access.table_schema(table_name) + } } /// Convenience implementation of `CatalogProvider` so the rest of the @@ -980,6 +985,7 @@ mod tests { convert::TryFrom, iter::Iterator, num::{NonZeroU64, NonZeroUsize}, + ops::Deref, str, time::{Duration, Instant}, }; @@ -2831,16 +2837,9 @@ mod tests { assert_eq!(paths_actual, paths_expected); // ==================== do: remember table schema ==================== - let mut table_schemas: HashMap = Default::default(); + let mut table_schemas: HashMap> = Default::default(); for (table_name, _partition_key, _chunk_id) in &chunks { - // TODO: use official `db.table_schema` interface later - let schema = db - .catalog - .table(table_name) - .unwrap() - .schema() - .read() - .clone(); + let schema = db.table_schema(table_name).unwrap(); table_schemas.insert(table_name.clone(), schema); } @@ -2868,16 +2867,9 @@ mod tests { } )); } - for (table_name, schema) in table_schemas { - // TODO: use official `db.table_schema` interface later - let schema2 = db - .catalog - .table(table_name) - .unwrap() - .schema() - .read() - .clone(); - assert_eq!(schema2, schema); + for (table_name, schema) in &table_schemas { + let schema2 = db.table_schema(table_name).unwrap(); + assert_eq!(schema2.deref(), schema.deref()); } // ==================== check: DB still writable ==================== diff --git a/server/src/db/access.rs b/server/src/db/access.rs index 8ba4253395..19ded7a481 100644 --- a/server/src/db/access.rs +++ b/server/src/db/access.rs @@ -11,16 +11,17 @@ use super::{ }; use async_trait::async_trait; -use data_types::{chunk_metadata::ChunkSummary, error::ErrorLogger}; +use data_types::chunk_metadata::ChunkSummary; use datafusion::{ catalog::{catalog::CatalogProvider, schema::SchemaProvider}, datasource::TableProvider, }; +use internal_types::schema::Schema; use metrics::{Counter, KeyValue, MetricRegistry}; use observability_deps::tracing::debug; use query::{ predicate::{Predicate, PredicateBuilder}, - provider::{self, ChunkPruner, ProviderBuilder}, + 
provider::{ChunkPruner, ProviderBuilder}, QueryChunk, QueryChunkMeta, DEFAULT_SCHEMA, }; use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA}; @@ -195,6 +196,13 @@ impl QueryDatabase for QueryCatalogAccess { fn chunk_summaries(&self) -> Result> { Ok(self.catalog.chunk_summaries()) } + + fn table_schema(&self, table_name: &str) -> Option> { + self.catalog + .table(table_name) + .ok() + .map(|table| Arc::clone(&table.schema().read())) + } } // Datafusion catalog provider interface @@ -249,27 +257,23 @@ impl SchemaProvider for DbSchemaProvider { /// Create a table provider for the named table fn table(&self, table_name: &str) -> Option> { - let mut builder = ProviderBuilder::new(table_name); + let schema = { + let table = self.catalog.table(table_name).ok()?; + Arc::clone(&table.schema().read()) + }; + + let mut builder = ProviderBuilder::new(table_name, schema); builder = builder.add_pruner(Arc::clone(&self.chunk_access) as Arc>); let predicate = PredicateBuilder::new().table(table_name).build(); for chunk in self.chunk_access.candidate_chunks(&predicate) { - // This is unfortunate - a table with incompatible chunks ceases to - // be visible to the query engine - // - // It is also potentially ill-formed as continuing to use the builder - // after it has errored may not yield entirely sensible results - builder = builder - .add_chunk(chunk) - .log_if_error("Adding chunks to table") - .ok()?; + builder = builder.add_chunk(chunk); } match builder.build() { Ok(provider) => Some(Arc::new(provider)), - Err(provider::Error::InternalNoRowsInTable { .. }) => None, Err(e) => panic!("unexpected error: {:?}", e), } } diff --git a/server/src/db/catalog.rs b/server/src/db/catalog.rs index e7e7fe8ab6..6a06b06731 100644 --- a/server/src/db/catalog.rs +++ b/server/src/db/catalog.rs @@ -199,7 +199,7 @@ impl Catalog { &self, table_name: impl AsRef, partition_key: impl AsRef, - ) -> (Arc>, Arc>) { + ) -> (Arc>, Arc>>) { let mut tables = self.tables.write(); let (_, table) = tables .raw_entry_mut() diff --git a/server/src/db/catalog/chunk.rs b/server/src/db/catalog/chunk.rs index 9367745505..b2f03e37c2 100644 --- a/server/src/db/catalog/chunk.rs +++ b/server/src/db/catalog/chunk.rs @@ -271,7 +271,7 @@ impl CatalogChunk { pub(super) fn new_rub_chunk( addr: ChunkAddr, chunk: read_buffer::RBChunk, - schema: Schema, + schema: Arc, metrics: ChunkMetrics, ) -> Self { // TODO: Move RUB to single table (#1295) @@ -282,7 +282,7 @@ impl CatalogChunk { let stage = ChunkStage::Frozen { meta: Arc::new(ChunkMetadata { table_summary: Arc::new(summary), - schema: Arc::new(schema), + schema, }), representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)), }; @@ -603,7 +603,7 @@ impl CatalogChunk { /// Set the chunk in the Moved state, setting the underlying storage handle to db, and /// discarding the underlying mutable buffer storage. 
-    pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Schema) -> Result<()> {
+    pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Arc<Schema>) -> Result<()> {
         match &mut self.stage {
             ChunkStage::Frozen {
                 meta,
@@ -613,7 +613,7 @@ impl CatalogChunk {
                 // after moved, the chunk is sorted and its schema needs to get updated
                 *meta = Arc::new(ChunkMetadata {
                     table_summary: Arc::clone(&meta.table_summary),
-                    schema: Arc::new(schema),
+                    schema,
                 });
 
                 match &representation {
diff --git a/server/src/db/catalog/partition.rs b/server/src/db/catalog/partition.rs
index 871b3cac50..761514f86a 100644
--- a/server/src/db/catalog/partition.rs
+++ b/server/src/db/catalog/partition.rs
@@ -166,7 +166,7 @@ impl Partition {
     pub fn create_rub_chunk(
         &mut self,
         chunk: read_buffer::RBChunk,
-        schema: Schema,
+        schema: Arc<Schema>,
     ) -> Arc<RwLock<CatalogChunk>> {
         let chunk_id = self.next_chunk_id;
         assert_ne!(self.next_chunk_id, u32::MAX, "Chunk ID Overflow");
diff --git a/server/src/db/catalog/table.rs b/server/src/db/catalog/table.rs
index b91d8a9724..dec44152df 100644
--- a/server/src/db/catalog/table.rs
+++ b/server/src/db/catalog/table.rs
@@ -26,7 +26,11 @@ pub struct Table {
     metrics: TableMetrics,
 
     /// Table-wide schema.
-    schema: Arc<RwLock<Schema>>,
+    ///
+    /// Notes on the type:
+    /// - the outer `Arc<RwLock<...>>` is so that we can reference the locked schema w/o a lifetime to the table
+    /// - the inner `Arc<Schema>` is a schema that we don't need to copy when moving it around the query stack
+    schema: Arc<RwLock<Arc<Schema>>>,
 }
 
 impl Table {
@@ -40,7 +44,7 @@ impl Table {
         let mut builder = SchemaBuilder::new();
         builder.measurement(table_name.as_ref());
         let schema = builder.build().expect("cannot build empty schema");
-        let schema = Arc::new(metrics.new_table_lock(schema));
+        let schema = Arc::new(metrics.new_table_lock(Arc::new(schema)));
 
         Self {
             db_name,
@@ -93,7 +97,7 @@ impl Table {
         self.partitions.values().map(|x| x.read().summary())
     }
 
-    pub fn schema(&self) -> Arc<RwLock<Schema>> {
+    pub fn schema(&self) -> Arc<RwLock<Arc<Schema>>> {
         Arc::clone(&self.schema)
     }
 }
@@ -104,12 +108,12 @@ impl Table {
 enum TableSchemaUpsertHandleInner<'a> {
     /// Schema will not be changed.
     NoChange {
-        table_schema_read: RwLockReadGuard<'a, Schema>,
+        table_schema_read: RwLockReadGuard<'a, Arc<Schema>>,
     },
 
     /// Schema might change (if write to mutable buffer is successful).
     MightChange {
-        table_schema_write: RwLockWriteGuard<'a, Schema>,
+        table_schema_write: RwLockWriteGuard<'a, Arc<Schema>>,
         merged_schema: Schema,
     },
 }
@@ -122,7 +126,7 @@ pub struct TableSchemaUpsertHandle<'a> {
 
 impl<'a> TableSchemaUpsertHandle<'a> {
     pub(crate) fn new(
-        table_schema: &'a RwLock<Schema>,
+        table_schema: &'a RwLock<Arc<Schema>>,
         new_schema: &Schema,
     ) -> Result<Self> {
         // Be optimistic and only get a read lock. It is rather rare that the schema will change when new data arrives
@@ -134,7 +138,7 @@ impl<'a> TableSchemaUpsertHandle<'a> {
         let merged_schema = Self::try_merge(&table_schema_read, new_schema)?;
 
         // Now check if this would actually change the schema:
-        if &merged_schema == table_schema_read.deref() {
+        if &merged_schema == table_schema_read.deref().deref() {
             // Optimism paid off and we get away with the read lock.
Ok(Self { inner: TableSchemaUpsertHandleInner::NoChange { table_schema_read }, @@ -181,7 +185,7 @@ impl<'a> TableSchemaUpsertHandle<'a> { merged_schema, } => { // Commit new schema and drop write guard; - *table_schema_write = merged_schema; + *table_schema_write = Arc::new(merged_schema); drop(table_schema_write); } } @@ -204,7 +208,7 @@ mod tests { .influx_column("tag2", InfluxColumnType::Tag) .build() .unwrap(); - let table_schema = lock_tracker.new_lock(table_schema_orig.clone()); + let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig.clone())); // writing with the same schema must not trigger a change let schema1 = SchemaBuilder::new() @@ -218,9 +222,9 @@ mod tests { handle.inner, TableSchemaUpsertHandleInner::NoChange { .. } )); - assert_eq!(table_schema.read().deref(), &table_schema_orig); + assert_eq!(table_schema.read().deref().deref(), &table_schema_orig); handle.commit(); - assert_eq!(table_schema.read().deref(), &table_schema_orig); + assert_eq!(table_schema.read().deref().deref(), &table_schema_orig); // writing with different column order must not trigger a change let schema2 = SchemaBuilder::new() @@ -234,9 +238,9 @@ mod tests { handle.inner, TableSchemaUpsertHandleInner::NoChange { .. } )); - assert_eq!(table_schema.read().deref(), &table_schema_orig); + assert_eq!(table_schema.read().deref().deref(), &table_schema_orig); handle.commit(); - assert_eq!(table_schema.read().deref(), &table_schema_orig); + assert_eq!(table_schema.read().deref().deref(), &table_schema_orig); // writing with a column subset must not trigger a change let schema3 = SchemaBuilder::new() @@ -249,9 +253,9 @@ mod tests { handle.inner, TableSchemaUpsertHandleInner::NoChange { .. } )); - assert_eq!(table_schema.read().deref(), &table_schema_orig); + assert_eq!(table_schema.read().deref().deref(), &table_schema_orig); handle.commit(); - assert_eq!(table_schema.read().deref(), &table_schema_orig); + assert_eq!(table_schema.read().deref().deref(), &table_schema_orig); } #[test] @@ -263,7 +267,7 @@ mod tests { .influx_column("tag2", InfluxColumnType::Tag) .build() .unwrap(); - let table_schema = lock_tracker.new_lock(table_schema_orig); + let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig)); let new_schema = SchemaBuilder::new() .measurement("m1") @@ -288,7 +292,7 @@ mod tests { .influx_column("tag3", InfluxColumnType::Tag) .build() .unwrap(); - assert_eq!(table_schema.read().deref(), &table_schema_expected); + assert_eq!(table_schema.read().deref().deref(), &table_schema_expected); } #[test] @@ -300,7 +304,7 @@ mod tests { .influx_column("tag2", InfluxColumnType::Tag) .build() .unwrap(); - let table_schema = lock_tracker.new_lock(table_schema_orig.clone()); + let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig.clone())); let schema1 = SchemaBuilder::new() .measurement("m1") @@ -311,6 +315,6 @@ mod tests { assert!(TableSchemaUpsertHandle::new(&table_schema, &schema1).is_err()); // schema did not change - assert_eq!(table_schema.read().deref(), &table_schema_orig); + assert_eq!(table_schema.read().deref().deref(), &table_schema_orig); } } diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs index 51604298e5..817ac5c638 100644 --- a/server/src/db/lifecycle/compact.rs +++ b/server/src/db/lifecycle/compact.rs @@ -4,6 +4,7 @@ use std::future::Future; use std::sync::Arc; use data_types::job::Job; +use internal_types::schema::merge::SchemaMerger; use lifecycle::LifecycleWriteGuard; use observability_deps::tracing::info; use 
query::exec::ExecutorType; @@ -58,6 +59,17 @@ pub(crate) fn compact_chunks( }) .collect::>>()?; + // build schema + // Note: we only use the merged schema from the to-be-compacted chunks - not the table-wide schema, since we don't + // need to bother with other columns (e.g. ones that only exist in other partitions). + let mut merger = SchemaMerger::new(); + for db_chunk in &query_chunks { + merger = merger + .merge(&db_chunk.schema()) + .expect("schemas compatible"); + } + let schema = Arc::new(merger.build()); + // drop partition lock let partition = partition.unwrap().partition; @@ -79,7 +91,7 @@ pub(crate) fn compact_chunks( // Cannot move query_chunks as the sort key borrows the column names let (schema, plan) = - ReorgPlanner::new().compact_plan(query_chunks.iter().map(Arc::clone), key)?; + ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?; let physical_plan = ctx.prepare_plan(&plan)?; let stream = ctx.execute(physical_plan).await?; diff --git a/server/src/db/lifecycle/move_chunk.rs b/server/src/db/lifecycle/move_chunk.rs index 27eaa4d13c..e3abefd713 100644 --- a/server/src/db/lifecycle/move_chunk.rs +++ b/server/src/db/lifecycle/move_chunk.rs @@ -37,7 +37,11 @@ pub fn move_chunk_to_read_buffer( let table_summary = guard.table_summary(); // snapshot the data - let query_chunks = vec![DbChunk::snapshot(&*guard)]; + // Note: we can just use the chunk-specific schema here since there is only a single chunk and this is somewhat a + // local operation that should only need to deal with the columns that are really present. + let db_chunk = DbChunk::snapshot(&*guard); + let schema = db_chunk.schema(); + let query_chunks = vec![db_chunk]; // Drop locks let chunk = guard.unwrap().chunk; @@ -52,7 +56,7 @@ pub fn move_chunk_to_read_buffer( // Cannot move query_chunks as the sort key borrows the column names let (schema, plan) = - ReorgPlanner::new().compact_plan(query_chunks.iter().map(Arc::clone), key)?; + ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?; let physical_plan = ctx.prepare_plan(&plan)?; let stream = ctx.execute(physical_plan).await?;