Merge pull request #1929 from influxdata/crepererum/fix_query_schema_unwrap

refactor: pass schema arcs from catalog to query engine (instead of creating them on-demand)
kodiakhq[bot] 2021-07-09 07:53:16 +00:00 committed by GitHub
commit 34dcd991d3
18 changed files with 292 additions and 145 deletions

View File

@ -16,7 +16,7 @@ use internal_types::{
selection::Selection, selection::Selection,
}; };
use observability_deps::tracing::{debug, trace}; use observability_deps::tracing::{debug, trace};
use snafu::{ensure, ResultExt, Snafu}; use snafu::{ensure, OptionExt, ResultExt, Snafu};
use crate::{ use crate::{
exec::{field::FieldColumns, make_schema_pivot}, exec::{field::FieldColumns, make_schema_pivot},
@ -141,6 +141,9 @@ pub enum Error {
#[snafu(display("Internal error: aggregate {:?} is not a selector", agg))] #[snafu(display("Internal error: aggregate {:?} is not a selector", agg))]
InternalAggregateNotSelector { agg: Aggregate }, InternalAggregateNotSelector { agg: Aggregate },
#[snafu(display("Table was removed while planning query: {}", table_name))]
TableRemoved { table_name: String },
} }
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -301,7 +304,10 @@ impl InfluxRpcPlanner {
// were already known to have data (based on the contents of known_columns) // were already known to have data (based on the contents of known_columns)
for (table_name, chunks) in need_full_plans.into_iter() { for (table_name, chunks) in need_full_plans.into_iter() {
let plan = self.tag_keys_plan(&table_name, &predicate, chunks)?; let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
let plan = self.tag_keys_plan(&table_name, schema, &predicate, chunks)?;
if let Some(plan) = plan { if let Some(plan) = plan {
builder = builder.append(plan) builder = builder.append(plan)
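
The lookups above convert a missing table schema into the new `TableRemoved` error via snafu's `OptionExt::context`. Below is a minimal, self-contained sketch of that conversion, assuming the snafu 0.6-era API used in this diff (where the context selector shares the variant's name); `lookup_schema` is a hypothetical stand-in for `database.table_schema(...)`.

```rust
use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Table was removed while planning query: {}", table_name))]
    TableRemoved { table_name: String },
}

// Hypothetical stand-in for `database.table_schema(table_name)`.
fn lookup_schema(table_name: &str) -> Option<String> {
    (table_name == "h2o").then(|| format!("schema of {}", table_name))
}

fn schema_or_error(table_name: &str) -> Result<String, Error> {
    // `OptionExt::context` maps `None` onto the `TableRemoved` variant,
    // mirroring the planner code in this hunk.
    lookup_schema(table_name).context(TableRemoved { table_name })
}

fn main() {
    assert!(schema_or_error("h2o").is_ok());
    assert!(matches!(
        schema_or_error("dropped"),
        Err(Error::TableRemoved { .. })
    ));
}
```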
@ -422,7 +428,10 @@ impl InfluxRpcPlanner {
// time in `known_columns`, and some tables in chunks that we // time in `known_columns`, and some tables in chunks that we
// need to run a plan to find what values pass the predicate. // need to run a plan to find what values pass the predicate.
for (table_name, chunks) in need_full_plans.into_iter() { for (table_name, chunks) in need_full_plans.into_iter() {
let scan_and_filter = self.scan_and_filter(&table_name, &predicate, chunks)?; let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
let scan_and_filter = self.scan_and_filter(&table_name, schema, &predicate, chunks)?;
// if we have any data to scan, make a plan! // if we have any data to scan, make a plan!
if let Some(TableScanAndFilter { if let Some(TableScanAndFilter {
@ -483,7 +492,10 @@ impl InfluxRpcPlanner {
let mut field_list_plan = FieldListPlan::new(); let mut field_list_plan = FieldListPlan::new();
for (table_name, chunks) in table_chunks { for (table_name, chunks) in table_chunks {
if let Some(plan) = self.field_columns_plan(&table_name, &predicate, chunks)? { let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
if let Some(plan) = self.field_columns_plan(&table_name, schema, &predicate, chunks)? {
field_list_plan = field_list_plan.append(plan); field_list_plan = field_list_plan.append(plan);
} }
} }
@ -524,8 +536,12 @@ impl InfluxRpcPlanner {
let mut ss_plans = Vec::with_capacity(table_chunks.len()); let mut ss_plans = Vec::with_capacity(table_chunks.len());
for (table_name, chunks) in table_chunks { for (table_name, chunks) in table_chunks {
let prefix_columns: Option<&[&str]> = None; let prefix_columns: Option<&[&str]> = None;
let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
let ss_plan = self.read_filter_plan(table_name, prefix_columns, &predicate, chunks)?; let ss_plan =
self.read_filter_plan(table_name, schema, prefix_columns, &predicate, chunks)?;
// If we have to do real work, add it to the list of plans // If we have to do real work, add it to the list of plans
if let Some(ss_plan) = ss_plan { if let Some(ss_plan) = ss_plan {
ss_plans.push(ss_plan); ss_plans.push(ss_plan);
@ -559,11 +575,25 @@ impl InfluxRpcPlanner {
// now, build up plans for each table // now, build up plans for each table
let mut ss_plans = Vec::with_capacity(table_chunks.len()); let mut ss_plans = Vec::with_capacity(table_chunks.len());
for (table_name, chunks) in table_chunks { for (table_name, chunks) in table_chunks {
let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
let ss_plan = match agg { let ss_plan = match agg {
Aggregate::None => { Aggregate::None => self.read_filter_plan(
self.read_filter_plan(table_name, Some(group_columns), &predicate, chunks)? table_name,
} Arc::clone(&schema),
_ => self.read_group_plan(table_name, &predicate, agg, group_columns, chunks)?, Some(group_columns),
&predicate,
chunks,
)?,
_ => self.read_group_plan(
table_name,
schema,
&predicate,
agg,
group_columns,
chunks,
)?,
}; };
// If we have to do real work, add it to the list of plans // If we have to do real work, add it to the list of plans
@ -604,8 +634,12 @@ impl InfluxRpcPlanner {
// now, build up plans for each table // now, build up plans for each table
let mut ss_plans = Vec::with_capacity(table_chunks.len()); let mut ss_plans = Vec::with_capacity(table_chunks.len());
for (table_name, chunks) in table_chunks { for (table_name, chunks) in table_chunks {
let ss_plan = self let schema = database.table_schema(&table_name).context(TableRemoved {
.read_window_aggregate_plan(table_name, &predicate, agg, &every, &offset, chunks)?; table_name: &table_name,
})?;
let ss_plan = self.read_window_aggregate_plan(
table_name, schema, &predicate, agg, &every, &offset, chunks,
)?;
// If we have to do real work, add it to the list of plans // If we have to do real work, add it to the list of plans
if let Some(ss_plan) = ss_plan { if let Some(ss_plan) = ss_plan {
ss_plans.push(ss_plan); ss_plans.push(ss_plan);
@ -665,13 +699,14 @@ impl InfluxRpcPlanner {
fn tag_keys_plan<C>( fn tag_keys_plan<C>(
&self, &self,
table_name: &str, table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate, predicate: &Predicate,
chunks: Vec<Arc<C>>, chunks: Vec<Arc<C>>,
) -> Result<Option<StringSetPlan>> ) -> Result<Option<StringSetPlan>>
where where
C: QueryChunk + 'static, C: QueryChunk + 'static,
{ {
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?; let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
let TableScanAndFilter { let TableScanAndFilter {
plan_builder, plan_builder,
@ -727,13 +762,14 @@ impl InfluxRpcPlanner {
fn field_columns_plan<C>( fn field_columns_plan<C>(
&self, &self,
table_name: &str, table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate, predicate: &Predicate,
chunks: Vec<Arc<C>>, chunks: Vec<Arc<C>>,
) -> Result<Option<LogicalPlan>> ) -> Result<Option<LogicalPlan>>
where where
C: QueryChunk + 'static, C: QueryChunk + 'static,
{ {
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?; let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
let TableScanAndFilter { let TableScanAndFilter {
plan_builder, plan_builder,
schema, schema,
@ -777,6 +813,7 @@ impl InfluxRpcPlanner {
fn read_filter_plan<C>( fn read_filter_plan<C>(
&self, &self,
table_name: impl AsRef<str>, table_name: impl AsRef<str>,
schema: Arc<Schema>,
prefix_columns: Option<&[impl AsRef<str>]>, prefix_columns: Option<&[impl AsRef<str>]>,
predicate: &Predicate, predicate: &Predicate,
chunks: Vec<Arc<C>>, chunks: Vec<Arc<C>>,
@ -785,7 +822,7 @@ impl InfluxRpcPlanner {
C: QueryChunk + 'static, C: QueryChunk + 'static,
{ {
let table_name = table_name.as_ref(); let table_name = table_name.as_ref();
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?; let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
let TableScanAndFilter { let TableScanAndFilter {
plan_builder, plan_builder,
@ -897,6 +934,7 @@ impl InfluxRpcPlanner {
fn read_group_plan<C>( fn read_group_plan<C>(
&self, &self,
table_name: impl Into<String>, table_name: impl Into<String>,
schema: Arc<Schema>,
predicate: &Predicate, predicate: &Predicate,
agg: Aggregate, agg: Aggregate,
group_columns: &[impl AsRef<str>], group_columns: &[impl AsRef<str>],
@ -906,7 +944,7 @@ impl InfluxRpcPlanner {
C: QueryChunk + 'static, C: QueryChunk + 'static,
{ {
let table_name = table_name.into(); let table_name = table_name.into();
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?; let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?;
let TableScanAndFilter { let TableScanAndFilter {
plan_builder, plan_builder,
@ -984,9 +1022,11 @@ impl InfluxRpcPlanner {
/// GroupBy(gby: tag columns, window_function; agg: aggregate(field)) /// GroupBy(gby: tag columns, window_function; agg: aggregate(field))
/// Filter(predicate) /// Filter(predicate)
/// Scan /// Scan
#[allow(clippy::too_many_arguments)]
fn read_window_aggregate_plan<C>( fn read_window_aggregate_plan<C>(
&self, &self,
table_name: impl Into<String>, table_name: impl Into<String>,
schema: Arc<Schema>,
predicate: &Predicate, predicate: &Predicate,
agg: Aggregate, agg: Aggregate,
every: &WindowDuration, every: &WindowDuration,
@ -997,7 +1037,7 @@ impl InfluxRpcPlanner {
C: QueryChunk + 'static, C: QueryChunk + 'static,
{ {
let table_name = table_name.into(); let table_name = table_name.into();
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?; let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?;
let TableScanAndFilter { let TableScanAndFilter {
plan_builder, plan_builder,
@ -1074,6 +1114,7 @@ impl InfluxRpcPlanner {
fn scan_and_filter<C>( fn scan_and_filter<C>(
&self, &self,
table_name: &str, table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate, predicate: &Predicate,
chunks: Vec<Arc<C>>, chunks: Vec<Arc<C>>,
) -> Result<Option<TableScanAndFilter>> ) -> Result<Option<TableScanAndFilter>>
@ -1085,7 +1126,7 @@ impl InfluxRpcPlanner {
let projection = None; let projection = None;
// Prepare the scan of the table // Prepare the scan of the table
let mut builder = ProviderBuilder::new(table_name); let mut builder = ProviderBuilder::new(table_name, schema);
// Since the entire predicate is used in the call to // Since the entire predicate is used in the call to
// `database.chunks()` there will not be any additional // `database.chunks()` there will not be any additional
@ -1106,9 +1147,7 @@ impl InfluxRpcPlanner {
chunk.id(), chunk.id(),
); );
builder = builder builder = builder.add_chunk(chunk);
.add_chunk(chunk)
.context(CreatingProvider { table_name })?;
} }
let provider = builder.build().context(CreatingProvider { table_name })?; let provider = builder.build().context(CreatingProvider { table_name })?;
@ -1217,7 +1256,7 @@ struct TableScanAndFilter {
/// Represents plan that scans a table and applies optional filtering /// Represents plan that scans a table and applies optional filtering
plan_builder: LogicalPlanBuilder, plan_builder: LogicalPlanBuilder,
/// The IOx schema of the result /// The IOx schema of the result
schema: Schema, schema: Arc<Schema>,
} }
/// Reorders tag_columns so that its prefix matches exactly /// Reorders tag_columns so that its prefix matches exactly

View File

@ -63,9 +63,10 @@ impl ReorgPlanner {
/// (Scan chunks) <-- any needed deduplication happens here /// (Scan chunks) <-- any needed deduplication happens here
pub fn compact_plan<C, I>( pub fn compact_plan<C, I>(
&self, &self,
schema: Arc<Schema>,
chunks: I, chunks: I,
output_sort: SortKey<'_>, output_sort: SortKey<'_>,
) -> Result<(Schema, LogicalPlan)> ) -> Result<(Arc<Schema>, LogicalPlan)>
where where
C: QueryChunk + 'static, C: QueryChunk + 'static,
I: IntoIterator<Item = Arc<C>>, I: IntoIterator<Item = Arc<C>>,
@ -73,13 +74,21 @@ impl ReorgPlanner {
let ScanPlan { let ScanPlan {
plan_builder, plan_builder,
provider, provider,
} = self.scan_and_sort_plan(chunks, output_sort.clone())?; } = self.scan_and_sort_plan(schema, chunks, output_sort.clone())?;
let mut schema = provider.iox_schema(); let mut schema = provider.iox_schema();
// Set the sort_key of the schema to the compacted chunk's sort key // Set the sort_key of the schema to the compacted chunk's sort key
// Try to do this only if the sort key changes so we avoid unnecessary schema copies.
trace!(input_schema=?schema, "Setting sort key on schema"); trace!(input_schema=?schema, "Setting sort key on schema");
schema.set_sort_key(&output_sort); if schema
.sort_key()
.map_or(true, |existing_key| existing_key != output_sort)
{
let mut schema_cloned = schema.as_ref().clone();
schema_cloned.set_sort_key(&output_sort);
schema = Arc::new(schema_cloned);
}
trace!(output_schema=?schema, "Setting sort key on schema"); trace!(output_schema=?schema, "Setting sort key on schema");
let plan = plan_builder.build().context(BuildingPlan)?; let plan = plan_builder.build().context(BuildingPlan)?;
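
Because the schema now travels as an `Arc<Schema>`, the sort key is only written back by cloning the schema when it actually differs. A runnable sketch of that copy-on-write step on a simplified stand-in type (the real type is `internal_types::schema::Schema` with a `SortKey`):

```rust
use std::sync::Arc;

// Simplified stand-in for the IOx schema; the real type carries Arrow fields too.
#[derive(Clone, Debug)]
struct Schema {
    sort_key: Option<Vec<String>>,
}

/// Return a schema whose sort key equals `wanted`, cloning the underlying data
/// only when the key actually changes; otherwise the same Arc is handed back.
fn with_sort_key(schema: Arc<Schema>, wanted: &[String]) -> Arc<Schema> {
    let needs_update = schema
        .sort_key
        .as_deref()
        .map_or(true, |existing| existing != wanted);
    if needs_update {
        let mut cloned = schema.as_ref().clone();
        cloned.sort_key = Some(wanted.to_vec());
        Arc::new(cloned)
    } else {
        schema
    }
}

fn main() {
    let key = vec!["tag1".to_string(), "time".to_string()];
    let schema = Arc::new(Schema { sort_key: None });

    let sorted = with_sort_key(Arc::clone(&schema), &key);
    assert!(!Arc::ptr_eq(&schema, &sorted)); // key changed -> one clone

    let same = with_sort_key(Arc::clone(&sorted), &key);
    assert!(Arc::ptr_eq(&sorted, &same)); // key unchanged -> no clone
}
```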
@ -136,10 +145,11 @@ impl ReorgPlanner {
/// ``` /// ```
pub fn split_plan<C, I>( pub fn split_plan<C, I>(
&self, &self,
schema: Arc<Schema>,
chunks: I, chunks: I,
output_sort: SortKey<'_>, output_sort: SortKey<'_>,
split_time: i64, split_time: i64,
) -> Result<(Schema, LogicalPlan)> ) -> Result<(Arc<Schema>, LogicalPlan)>
where where
C: QueryChunk + 'static, C: QueryChunk + 'static,
I: IntoIterator<Item = Arc<C>>, I: IntoIterator<Item = Arc<C>>,
@ -147,7 +157,7 @@ impl ReorgPlanner {
let ScanPlan { let ScanPlan {
plan_builder, plan_builder,
provider, provider,
} = self.scan_and_sort_plan(chunks, output_sort)?; } = self.scan_and_sort_plan(schema, chunks, output_sort)?;
// TODO: Set sort key on schema // TODO: Set sort key on schema
let schema = provider.iox_schema(); let schema = provider.iox_schema();
@ -176,7 +186,12 @@ impl ReorgPlanner {
/// ///
/// (Sort on output_sort) /// (Sort on output_sort)
/// (Scan chunks) <-- any needed deduplication happens here /// (Scan chunks) <-- any needed deduplication happens here
fn scan_and_sort_plan<C, I>(&self, chunks: I, output_sort: SortKey<'_>) -> Result<ScanPlan<C>> fn scan_and_sort_plan<C, I>(
&self,
schema: Arc<Schema>,
chunks: I,
output_sort: SortKey<'_>,
) -> Result<ScanPlan<C>>
where where
C: QueryChunk + 'static, C: QueryChunk + 'static,
I: IntoIterator<Item = Arc<C>>, I: IntoIterator<Item = Arc<C>>,
@ -189,7 +204,7 @@ impl ReorgPlanner {
let table_name = &table_name; let table_name = &table_name;
// Prepare the plan for the table // Prepare the plan for the table
let mut builder = ProviderBuilder::new(table_name); let mut builder = ProviderBuilder::new(table_name, schema);
// There are no predicates in these plans, so no need to prune them // There are no predicates in these plans, so no need to prune them
builder = builder.add_no_op_pruner(); builder = builder.add_no_op_pruner();
@ -203,9 +218,7 @@ impl ReorgPlanner {
chunk.id(), chunk.id(),
); );
builder = builder builder = builder.add_chunk(chunk);
.add_chunk(chunk)
.context(CreatingProvider { table_name })?;
} }
let provider = builder.build().context(CreatingProvider { table_name })?; let provider = builder.build().context(CreatingProvider { table_name })?;
@ -244,16 +257,17 @@ struct ScanPlan<C: QueryChunk + 'static> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use arrow_util::assert_batches_eq; use arrow_util::assert_batches_eq;
use internal_types::schema::sort::SortOptions; use internal_types::schema::{merge::SchemaMerger, sort::SortOptions};
use crate::{ use crate::{
exec::{Executor, ExecutorType}, exec::{Executor, ExecutorType},
test::{raw_data, TestChunk}, test::{raw_data, TestChunk},
QueryChunkMeta,
}; };
use super::*; use super::*;
async fn get_test_chunks() -> Vec<Arc<TestChunk>> { async fn get_test_chunks() -> (Arc<Schema>, Vec<Arc<TestChunk>>) {
// Chunk 1 with 5 rows of data on 2 tags // Chunk 1 with 5 rows of data on 2 tags
let chunk1 = Arc::new( let chunk1 = Arc::new(
TestChunk::new(1) TestChunk::new(1)
@ -298,12 +312,19 @@ mod test {
]; ];
assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk2)]).await); assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk2)]).await);
vec![chunk1, chunk2] let schema = SchemaMerger::new()
.merge(&chunk1.schema())
.unwrap()
.merge(&chunk2.schema())
.unwrap()
.build();
(Arc::new(schema), vec![chunk1, chunk2])
} }
#[tokio::test] #[tokio::test]
async fn test_compact_plan() { async fn test_compact_plan() {
let chunks = get_test_chunks().await; let (schema, chunks) = get_test_chunks().await;
let mut sort_key = SortKey::with_capacity(2); let mut sort_key = SortKey::with_capacity(2);
sort_key.push( sort_key.push(
@ -322,7 +343,7 @@ mod test {
); );
let (_, compact_plan) = ReorgPlanner::new() let (_, compact_plan) = ReorgPlanner::new()
.compact_plan(chunks, sort_key) .compact_plan(schema, chunks, sort_key)
.expect("created compact plan"); .expect("created compact plan");
let executor = Executor::new(1); let executor = Executor::new(1);
@ -363,7 +384,7 @@ mod test {
test_helpers::maybe_start_logging(); test_helpers::maybe_start_logging();
// validate that the plumbing is all hooked up. The logic of // validate that the plumbing is all hooked up. The logic of
// the operator is tested in its own module. // the operator is tested in its own module.
let chunks = get_test_chunks().await; let (schema, chunks) = get_test_chunks().await;
let mut sort_key = SortKey::with_capacity(1); let mut sort_key = SortKey::with_capacity(1);
sort_key.push( sort_key.push(
@ -376,7 +397,7 @@ mod test {
// split on 1000 should have timestamps 1000, 5000, and 7000 // split on 1000 should have timestamps 1000, 5000, and 7000
let (_, split_plan) = ReorgPlanner::new() let (_, split_plan) = ReorgPlanner::new()
.split_plan(chunks, sort_key, 1000) .split_plan(schema, chunks, sort_key, 1000)
.expect("created compact plan"); .expect("created compact plan");
let executor = Executor::new(1); let executor = Executor::new(1);

View File

@ -55,6 +55,9 @@ pub trait QueryDatabase: Debug + Send + Sync {
/// Return the partition keys for data in this DB /// Return the partition keys for data in this DB
fn partition_keys(&self) -> Result<Vec<String>, Self::Error>; fn partition_keys(&self) -> Result<Vec<String>, Self::Error>;
/// Schema for a specific table if the table exists.
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>>;
/// Returns a set of chunks within the partition with data that may match /// Returns a set of chunks within the partition with data that may match
/// the provided predicate. If possible, chunks which have no rows that can /// the provided predicate. If possible, chunks which have no rows that can
/// possibly match the predicate may be omitted. /// possibly match the predicate may be omitted.
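
This new trait method is what lets the planner obtain a ready-made `Arc<Schema>` from the catalog instead of merging chunk schemas on demand. A toy, self-contained sketch of such an implementation (stand-in types, not the real `query` crate trait):

```rust
use std::{collections::HashMap, sync::Arc};

// Stand-in for internal_types::schema::Schema.
#[derive(Debug)]
struct Schema {
    columns: Vec<String>,
}

trait QueryDatabase {
    /// Schema for a specific table if the table exists.
    fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>>;
}

struct InMemoryCatalog {
    tables: HashMap<String, Arc<Schema>>,
}

impl QueryDatabase for InMemoryCatalog {
    fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
        // Handing out a clone of the Arc is cheap; no per-query schema merge.
        self.tables.get(table_name).map(Arc::clone)
    }
}

fn main() {
    let mut tables = HashMap::new();
    tables.insert(
        "h2o".to_string(),
        Arc::new(Schema {
            columns: vec!["state".into(), "temp".into(), "time".into()],
        }),
    );
    let catalog = InMemoryCatalog { tables };

    assert!(catalog.table_schema("h2o").is_some());
    assert!(catalog.table_schema("dropped").is_none());
}
```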

View File

@ -40,26 +40,12 @@ use self::{
#[derive(Debug, Snafu)] #[derive(Debug, Snafu)]
pub enum Error { pub enum Error {
#[snafu(display("Chunk schema not compatible for table '{}': {}", table_name, source))]
ChunkSchemaNotCompatible {
table_name: String,
source: internal_types::schema::merge::Error,
},
#[snafu(display( #[snafu(display(
"Internal error: no chunk pruner provided to builder for {}", "Internal error: no chunk pruner provided to builder for {}",
table_name, table_name,
))] ))]
InternalNoChunkPruner { table_name: String }, InternalNoChunkPruner { table_name: String },
#[snafu(display("Internal error: No rows found in table '{}'", table_name))]
InternalNoRowsInTable { table_name: String },
#[snafu(display("Internal error: Cannot verify the push-down predicate '{}'", source,))]
InternalPushdownPredicate {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Internal error: Cannot create projection select expr '{}'", source,))] #[snafu(display("Internal error: Cannot create projection select expr '{}'", source,))]
InternalSelectExpr { InternalSelectExpr {
source: datafusion::error::DataFusionError, source: datafusion::error::DataFusionError,
@ -106,35 +92,25 @@ pub trait ChunkPruner<C: QueryChunk>: Sync + Send + std::fmt::Debug {
#[derive(Debug)] #[derive(Debug)]
pub struct ProviderBuilder<C: QueryChunk + 'static> { pub struct ProviderBuilder<C: QueryChunk + 'static> {
table_name: Arc<str>, table_name: Arc<str>,
schema_merger: SchemaMerger, schema: Arc<Schema>,
chunk_pruner: Option<Arc<dyn ChunkPruner<C>>>, chunk_pruner: Option<Arc<dyn ChunkPruner<C>>>,
chunks: Vec<Arc<C>>, chunks: Vec<Arc<C>>,
} }
impl<C: QueryChunk> ProviderBuilder<C> { impl<C: QueryChunk> ProviderBuilder<C> {
pub fn new(table_name: impl AsRef<str>) -> Self { pub fn new(table_name: impl AsRef<str>, schema: Arc<Schema>) -> Self {
Self { Self {
table_name: Arc::from(table_name.as_ref()), table_name: Arc::from(table_name.as_ref()),
schema_merger: SchemaMerger::new(), schema,
chunk_pruner: None, chunk_pruner: None,
chunks: Vec::new(), chunks: Vec::new(),
} }
} }
/// Add a new chunk to this provider /// Add a new chunk to this provider
pub fn add_chunk(mut self, chunk: Arc<C>) -> Result<Self> { pub fn add_chunk(mut self, chunk: Arc<C>) -> Self {
let chunk_table_schema = chunk.schema();
self.schema_merger = self
.schema_merger
.merge(&chunk_table_schema.as_ref())
.context(ChunkSchemaNotCompatible {
table_name: self.table_name.as_ref(),
})?;
self.chunks.push(chunk); self.chunks.push(chunk);
self
Ok(self)
} }
/// Specify a `ChunkPruner` for the provider that will apply /// Specify a `ChunkPruner` for the provider that will apply
@ -161,16 +137,6 @@ impl<C: QueryChunk> ProviderBuilder<C> {
/// Create the Provider /// Create the Provider
pub fn build(self) -> Result<ChunkTableProvider<C>> { pub fn build(self) -> Result<ChunkTableProvider<C>> {
let iox_schema = self.schema_merger.build();
// if the table was reported to exist, it should not be empty
if self.chunks.is_empty() {
return InternalNoRowsInTable {
table_name: self.table_name.as_ref(),
}
.fail();
}
let chunk_pruner = match self.chunk_pruner { let chunk_pruner = match self.chunk_pruner {
Some(chunk_pruner) => chunk_pruner, Some(chunk_pruner) => chunk_pruner,
None => { None => {
@ -182,7 +148,7 @@ impl<C: QueryChunk> ProviderBuilder<C> {
}; };
Ok(ChunkTableProvider { Ok(ChunkTableProvider {
iox_schema, iox_schema: self.schema,
chunk_pruner, chunk_pruner,
table_name: self.table_name, table_name: self.table_name,
chunks: self.chunks, chunks: self.chunks,
@ -198,7 +164,7 @@ impl<C: QueryChunk> ProviderBuilder<C> {
pub struct ChunkTableProvider<C: QueryChunk + 'static> { pub struct ChunkTableProvider<C: QueryChunk + 'static> {
table_name: Arc<str>, table_name: Arc<str>,
/// The IOx schema (wrapper around Arrow Schemaref) for this table /// The IOx schema (wrapper around Arrow Schemaref) for this table
iox_schema: Schema, iox_schema: Arc<Schema>,
/// Something that can prune chunks /// Something that can prune chunks
chunk_pruner: Arc<dyn ChunkPruner<C>>, chunk_pruner: Arc<dyn ChunkPruner<C>>,
// The chunks // The chunks
@ -207,8 +173,8 @@ pub struct ChunkTableProvider<C: QueryChunk + 'static> {
impl<C: QueryChunk + 'static> ChunkTableProvider<C> { impl<C: QueryChunk + 'static> ChunkTableProvider<C> {
/// Return the IOx schema view for the data provided by this provider /// Return the IOx schema view for the data provided by this provider
pub fn iox_schema(&self) -> Schema { pub fn iox_schema(&self) -> Arc<Schema> {
self.iox_schema.clone() Arc::clone(&self.iox_schema)
} }
/// Return the Arrow schema view for the data provided by this provider /// Return the Arrow schema view for the data provided by this provider
@ -255,8 +221,8 @@ impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
// Figure out the schema of the requested output // Figure out the schema of the requested output
let scan_schema = match projection { let scan_schema = match projection {
Some(indicies) => self.iox_schema.select_by_indices(indicies), Some(indicies) => Arc::new(self.iox_schema.select_by_indices(indicies)),
None => self.iox_schema.clone(), None => Arc::clone(&self.iox_schema),
}; };
// This debug shows the self.arrow_schema() includes all columns in all chunks // This debug shows the self.arrow_schema() includes all columns in all chunks
@ -266,7 +232,7 @@ impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
let mut deduplicate = Deduplicater::new(); let mut deduplicate = Deduplicater::new();
let plan = deduplicate.build_scan_plan( let plan = deduplicate.build_scan_plan(
Arc::clone(&self.table_name), Arc::clone(&self.table_name),
Arc::new(scan_schema), scan_schema,
chunks, chunks,
predicate, predicate,
)?; )?;
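
With the schema supplied at construction time, `add_chunk` no longer merges schemas and cannot fail, and a table whose chunks were all dropped builds into a valid, empty provider instead of hitting `InternalNoRowsInTable`. A generic sketch of that builder shape with hypothetical toy types (not the real IOx API):

```rust
#[derive(Debug, Clone)]
struct Schema {
    columns: Vec<String>,
}

#[derive(Debug)]
struct Chunk {
    id: u32,
}

#[derive(Debug)]
struct Provider {
    table_name: String,
    schema: Schema,
    chunks: Vec<Chunk>,
}

struct ProviderBuilder {
    table_name: String,
    schema: Schema,
    chunks: Vec<Chunk>,
}

impl ProviderBuilder {
    fn new(table_name: impl Into<String>, schema: Schema) -> Self {
        Self { table_name: table_name.into(), schema, chunks: Vec::new() }
    }

    /// Infallible: the schema was fixed at construction time, so there is
    /// nothing left to validate per chunk.
    fn add_chunk(mut self, chunk: Chunk) -> Self {
        self.chunks.push(chunk);
        self
    }

    fn build(self) -> Provider {
        Provider { table_name: self.table_name, schema: self.schema, chunks: self.chunks }
    }
}

fn main() {
    let schema = Schema { columns: vec!["state".into(), "temp".into(), "time".into()] };

    // Zero chunks is now valid: the table still exists, it just has no data.
    let empty = ProviderBuilder::new("h2o", schema.clone()).build();
    assert!(empty.chunks.is_empty());

    let provider = ProviderBuilder::new("h2o", schema).add_chunk(Chunk { id: 0 }).build();
    println!("{:?}", provider);
}
```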

View File

@ -116,6 +116,23 @@ impl QueryDatabase for TestDatabase {
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>, Self::Error> { fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>, Self::Error> {
unimplemented!("summaries not implemented TestDatabase") unimplemented!("summaries not implemented TestDatabase")
} }
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
let mut merger = SchemaMerger::new();
let mut found_one = false;
let partitions = self.partitions.lock();
for partition in partitions.values() {
for chunk in partition.values() {
if chunk.table_name() == table_name {
merger = merger.merge(&chunk.schema()).expect("consistent schemas");
found_one = true;
}
}
}
found_one.then(|| Arc::new(merger.build()))
}
} }
#[derive(Debug, Default)] #[derive(Debug, Default)]
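
`TestDatabase` derives a table's schema by merging the schemas of all chunks belonging to that table and returning `None` when no chunk matched. A self-contained sketch of the same shape, with a toy column map standing in for `internal_types::schema::merge::SchemaMerger`:

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
enum ColumnType {
    Tag,
    Field,
    Time,
}

struct Chunk {
    table_name: String,
    columns: BTreeMap<String, ColumnType>,
}

/// Merge the column sets of all chunks for `table_name`; `None` means the
/// table has no chunks at all (toy stand-in for SchemaMerger).
fn table_schema(chunks: &[Chunk], table_name: &str) -> Option<BTreeMap<String, ColumnType>> {
    let mut merged = BTreeMap::new();
    let mut found_one = false;
    for chunk in chunks.iter().filter(|c| c.table_name == table_name) {
        for (name, ty) in &chunk.columns {
            // A real merger would reject conflicting types; keep the toy simple.
            merged.entry(name.clone()).or_insert_with(|| ty.clone());
        }
        found_one = true;
    }
    found_one.then(|| merged)
}

fn main() {
    let chunks = vec![
        Chunk {
            table_name: "h2o".into(),
            columns: BTreeMap::from([("state".into(), ColumnType::Tag)]),
        },
        Chunk {
            table_name: "h2o".into(),
            columns: BTreeMap::from([
                ("temp".into(), ColumnType::Field),
                ("time".into(), ColumnType::Time),
            ]),
        },
    ];
    assert_eq!(table_schema(&chunks, "h2o").unwrap().len(), 3);
    assert!(table_schema(&chunks, "air").is_none());
}
```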

View File

@ -0,0 +1,25 @@
-- Test Setup: OneMeasurementAllChunksDropped
-- SQL: SELECT * from information_schema.tables;
+---------------+--------------------+---------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+---------------+------------+
| public | iox | h2o | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | chunk_columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | information_schema | tables | VIEW |
| public | information_schema | columns | VIEW |
+---------------+--------------------+---------------+------------+
-- SQL: SHOW TABLES;
+---------------+--------------------+---------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+---------------+------------+
| public | iox | h2o | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | chunk_columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | information_schema | tables | VIEW |
| public | information_schema | columns | VIEW |
+---------------+--------------------+---------------+------------+

View File

@ -0,0 +1,8 @@
-- Test that a table stays visible after all of its chunks have been dropped
-- IOX_SETUP: OneMeasurementAllChunksDropped
-- list information schema
SELECT * from information_schema.tables;
-- same but shorter
SHOW TABLES;

View File

@ -4,6 +4,20 @@
use std::path::Path; use std::path::Path;
use crate::runner::Runner; use crate::runner::Runner;
#[tokio::test]
// Tests from "all_chunks_dropped.sql",
async fn test_cases_all_chunks_dropped_sql() {
let input_path = Path::new("cases").join("in").join("all_chunks_dropped.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test] #[tokio::test]
// Tests from "duplicates.sql", // Tests from "duplicates.sql",
async fn test_cases_duplicates_sql() { async fn test_cases_duplicates_sql() {

View File

@ -294,6 +294,8 @@ impl<W: Write> Runner<W> {
} }
/// Return output path for input path. /// Return output path for input path.
///
/// This converts `some/prefix/in/foo.sql` (or other file extensions) to `some/prefix/out/foo.out`.
fn make_output_path(input: &Path) -> Result<PathBuf> { fn make_output_path(input: &Path) -> Result<PathBuf> {
let stem = input.file_stem().context(NoFileStem { path: input })?; let stem = input.file_stem().context(NoFileStem { path: input })?;
@ -306,6 +308,10 @@ fn make_output_path(input: &Path) -> Result<PathBuf> {
out.push("out"); out.push("out");
// set file name and ext // set file name and ext
// The PathBuf API is somewhat confusing: `set_file_name` will replace the last component (which at this point is
// the "out"). However we wanna create a file out of the stem and the extension. So as a somewhat hackish
// workaround first push a placeholder that is then replaced.
out.push("placeholder");
out.set_file_name(stem); out.set_file_name(stem);
out.set_extension("out"); out.set_extension("out");
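
The pushed "placeholder" works around `PathBuf::set_file_name` replacing the last path component rather than appending. A small runnable illustration of why the workaround is needed:

```rust
use std::path::PathBuf;

fn main() {
    // Without the placeholder, `set_file_name` replaces the trailing "out"
    // directory component instead of adding a file name inside it.
    let mut direct = PathBuf::from("cases/out");
    direct.set_file_name("foo");
    assert_eq!(direct, PathBuf::from("cases/foo"));

    // Pushing a placeholder first keeps "out" as a directory; the placeholder
    // is then replaced by the real stem and extension.
    let mut with_placeholder = PathBuf::from("cases/out");
    with_placeholder.push("placeholder");
    with_placeholder.set_file_name("foo");
    with_placeholder.set_extension("out");
    assert_eq!(with_placeholder, PathBuf::from("cases/out/foo.out"));
}
```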
@ -417,8 +423,11 @@ SELECT * from disk;
let in_dir = dir.path().join("in"); let in_dir = dir.path().join("in");
std::fs::create_dir(&in_dir).expect("create in-dir"); std::fs::create_dir(&in_dir).expect("create in-dir");
let out_dir = dir.path().join("out");
std::fs::create_dir(&out_dir).expect("create out-dir");
let mut file = in_dir; let mut file = in_dir;
file.set_file_name("foo.sql"); file.push("foo.sql");
std::fs::write(&file, contents).expect("writing data to temp file"); std::fs::write(&file, contents).expect("writing data to temp file");
(dir, file) (dir, file)

View File

@ -49,6 +49,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
register_setup!(TwoMeasurements), register_setup!(TwoMeasurements),
register_setup!(TwoMeasurementsPredicatePushDown), register_setup!(TwoMeasurementsPredicatePushDown),
register_setup!(OneMeasurementThreeChunksWithDuplicates), register_setup!(OneMeasurementThreeChunksWithDuplicates),
register_setup!(OneMeasurementAllChunksDropped),
] ]
.into_iter() .into_iter()
.map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>)) .map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>))
@ -676,6 +677,34 @@ pub(crate) async fn make_one_chunk_rub_scenario(
vec![scenario] vec![scenario]
} }
/// This creates two chunks but then drops them all. This should keep the tables.
#[derive(Debug)]
pub struct OneMeasurementAllChunksDropped {}
#[async_trait]
impl DbSetup for OneMeasurementAllChunksDropped {
async fn make(&self) -> Vec<DbScenario> {
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
let table_name = "h2o";
let lp_lines = vec!["h2o,state=MA temp=70.4 50"];
write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
db.move_chunk_to_read_buffer(table_name, partition_key, 0)
.await
.unwrap();
db.drop_chunk(table_name, partition_key, 0).unwrap();
vec![DbScenario {
scenario_name: "one measurement but all chunks are dropped".into(),
db,
}]
}
}
/// This function loads one chunk of lp data into different scenarios that simulates /// This function loads one chunk of lp data into different scenarios that simulates
/// the data life cycle. /// the data life cycle.
/// ///

View File

@ -26,6 +26,7 @@ use data_types::{
use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider}; use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
use entry::{Entry, SequencedEntry}; use entry::{Entry, SequencedEntry};
use futures::{stream::BoxStream, StreamExt}; use futures::{stream::BoxStream, StreamExt};
use internal_types::schema::Schema;
use metrics::KeyValue; use metrics::KeyValue;
use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk}; use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use object_store::{path::parsed::DirsAndFileName, ObjectStore}; use object_store::{path::parsed::DirsAndFileName, ObjectStore};
@ -850,6 +851,10 @@ impl QueryDatabase for Db {
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> { fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> {
self.catalog_access.chunk_summaries() self.catalog_access.chunk_summaries()
} }
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
self.catalog_access.table_schema(table_name)
}
} }
/// Convenience implementation of `CatalogProvider` so the rest of the /// Convenience implementation of `CatalogProvider` so the rest of the
@ -980,6 +985,7 @@ mod tests {
convert::TryFrom, convert::TryFrom,
iter::Iterator, iter::Iterator,
num::{NonZeroU64, NonZeroUsize}, num::{NonZeroU64, NonZeroUsize},
ops::Deref,
str, str,
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@ -2831,16 +2837,9 @@ mod tests {
assert_eq!(paths_actual, paths_expected); assert_eq!(paths_actual, paths_expected);
// ==================== do: remember table schema ==================== // ==================== do: remember table schema ====================
let mut table_schemas: HashMap<String, Schema> = Default::default(); let mut table_schemas: HashMap<String, Arc<Schema>> = Default::default();
for (table_name, _partition_key, _chunk_id) in &chunks { for (table_name, _partition_key, _chunk_id) in &chunks {
// TODO: use official `db.table_schema` interface later let schema = db.table_schema(table_name).unwrap();
let schema = db
.catalog
.table(table_name)
.unwrap()
.schema()
.read()
.clone();
table_schemas.insert(table_name.clone(), schema); table_schemas.insert(table_name.clone(), schema);
} }
@ -2868,16 +2867,9 @@ mod tests {
} }
)); ));
} }
for (table_name, schema) in table_schemas { for (table_name, schema) in &table_schemas {
// TODO: use official `db.table_schema` interface later let schema2 = db.table_schema(table_name).unwrap();
let schema2 = db assert_eq!(schema2.deref(), schema.deref());
.catalog
.table(table_name)
.unwrap()
.schema()
.read()
.clone();
assert_eq!(schema2, schema);
} }
// ==================== check: DB still writable ==================== // ==================== check: DB still writable ====================
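
The updated assertions compare the schemas behind the `Arc`s (via `Deref`) rather than the pointers, since `db.table_schema` may return a different allocation with equal contents. A tiny illustration:

```rust
use std::{ops::Deref, sync::Arc};

fn main() {
    let a = Arc::new(vec![1, 2, 3]);
    let b = Arc::new(vec![1, 2, 3]);

    // Different allocations...
    assert!(!Arc::ptr_eq(&a, &b));
    // ...but equal contents: compare through Deref, as the test above does
    // with `schema2.deref() == schema.deref()`.
    assert_eq!(a.deref(), b.deref());
}
```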

View File

@ -11,16 +11,17 @@ use super::{
}; };
use async_trait::async_trait; use async_trait::async_trait;
use data_types::{chunk_metadata::ChunkSummary, error::ErrorLogger}; use data_types::chunk_metadata::ChunkSummary;
use datafusion::{ use datafusion::{
catalog::{catalog::CatalogProvider, schema::SchemaProvider}, catalog::{catalog::CatalogProvider, schema::SchemaProvider},
datasource::TableProvider, datasource::TableProvider,
}; };
use internal_types::schema::Schema;
use metrics::{Counter, KeyValue, MetricRegistry}; use metrics::{Counter, KeyValue, MetricRegistry};
use observability_deps::tracing::debug; use observability_deps::tracing::debug;
use query::{ use query::{
predicate::{Predicate, PredicateBuilder}, predicate::{Predicate, PredicateBuilder},
provider::{self, ChunkPruner, ProviderBuilder}, provider::{ChunkPruner, ProviderBuilder},
QueryChunk, QueryChunkMeta, DEFAULT_SCHEMA, QueryChunk, QueryChunkMeta, DEFAULT_SCHEMA,
}; };
use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA}; use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
@ -195,6 +196,13 @@ impl QueryDatabase for QueryCatalogAccess {
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> { fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> {
Ok(self.catalog.chunk_summaries()) Ok(self.catalog.chunk_summaries())
} }
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
self.catalog
.table(table_name)
.ok()
.map(|table| Arc::clone(&table.schema().read()))
}
} }
// Datafusion catalog provider interface // Datafusion catalog provider interface
@ -249,27 +257,23 @@ impl SchemaProvider for DbSchemaProvider {
/// Create a table provider for the named table /// Create a table provider for the named table
fn table(&self, table_name: &str) -> Option<Arc<dyn TableProvider>> { fn table(&self, table_name: &str) -> Option<Arc<dyn TableProvider>> {
let mut builder = ProviderBuilder::new(table_name); let schema = {
let table = self.catalog.table(table_name).ok()?;
Arc::clone(&table.schema().read())
};
let mut builder = ProviderBuilder::new(table_name, schema);
builder = builder =
builder.add_pruner(Arc::clone(&self.chunk_access) as Arc<dyn ChunkPruner<DbChunk>>); builder.add_pruner(Arc::clone(&self.chunk_access) as Arc<dyn ChunkPruner<DbChunk>>);
let predicate = PredicateBuilder::new().table(table_name).build(); let predicate = PredicateBuilder::new().table(table_name).build();
for chunk in self.chunk_access.candidate_chunks(&predicate) { for chunk in self.chunk_access.candidate_chunks(&predicate) {
// This is unfortunate - a table with incompatible chunks ceases to builder = builder.add_chunk(chunk);
// be visible to the query engine
//
// It is also potentially ill-formed as continuing to use the builder
// after it has errored may not yield entirely sensible results
builder = builder
.add_chunk(chunk)
.log_if_error("Adding chunks to table")
.ok()?;
} }
match builder.build() { match builder.build() {
Ok(provider) => Some(Arc::new(provider)), Ok(provider) => Some(Arc::new(provider)),
Err(provider::Error::InternalNoRowsInTable { .. }) => None,
Err(e) => panic!("unexpected error: {:?}", e), Err(e) => panic!("unexpected error: {:?}", e),
} }
} }

View File

@ -199,7 +199,7 @@ impl Catalog {
&self, &self,
table_name: impl AsRef<str>, table_name: impl AsRef<str>,
partition_key: impl AsRef<str>, partition_key: impl AsRef<str>,
) -> (Arc<RwLock<Partition>>, Arc<RwLock<Schema>>) { ) -> (Arc<RwLock<Partition>>, Arc<RwLock<Arc<Schema>>>) {
let mut tables = self.tables.write(); let mut tables = self.tables.write();
let (_, table) = tables let (_, table) = tables
.raw_entry_mut() .raw_entry_mut()

View File

@ -271,7 +271,7 @@ impl CatalogChunk {
pub(super) fn new_rub_chunk( pub(super) fn new_rub_chunk(
addr: ChunkAddr, addr: ChunkAddr,
chunk: read_buffer::RBChunk, chunk: read_buffer::RBChunk,
schema: Schema, schema: Arc<Schema>,
metrics: ChunkMetrics, metrics: ChunkMetrics,
) -> Self { ) -> Self {
// TODO: Move RUB to single table (#1295) // TODO: Move RUB to single table (#1295)
@ -282,7 +282,7 @@ impl CatalogChunk {
let stage = ChunkStage::Frozen { let stage = ChunkStage::Frozen {
meta: Arc::new(ChunkMetadata { meta: Arc::new(ChunkMetadata {
table_summary: Arc::new(summary), table_summary: Arc::new(summary),
schema: Arc::new(schema), schema,
}), }),
representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)), representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)),
}; };
@ -603,7 +603,7 @@ impl CatalogChunk {
/// Set the chunk in the Moved state, setting the underlying storage handle to db, and /// Set the chunk in the Moved state, setting the underlying storage handle to db, and
/// discarding the underlying mutable buffer storage. /// discarding the underlying mutable buffer storage.
pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Schema) -> Result<()> { pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Arc<Schema>) -> Result<()> {
match &mut self.stage { match &mut self.stage {
ChunkStage::Frozen { ChunkStage::Frozen {
meta, meta,
@ -613,7 +613,7 @@ impl CatalogChunk {
// after moved, the chunk is sorted and its schema needs to get updated // after moved, the chunk is sorted and its schema needs to get updated
*meta = Arc::new(ChunkMetadata { *meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary), table_summary: Arc::clone(&meta.table_summary),
schema: Arc::new(schema), schema,
}); });
match &representation { match &representation {

View File

@ -166,7 +166,7 @@ impl Partition {
pub fn create_rub_chunk( pub fn create_rub_chunk(
&mut self, &mut self,
chunk: read_buffer::RBChunk, chunk: read_buffer::RBChunk,
schema: Schema, schema: Arc<Schema>,
) -> Arc<RwLock<CatalogChunk>> { ) -> Arc<RwLock<CatalogChunk>> {
let chunk_id = self.next_chunk_id; let chunk_id = self.next_chunk_id;
assert_ne!(self.next_chunk_id, u32::MAX, "Chunk ID Overflow"); assert_ne!(self.next_chunk_id, u32::MAX, "Chunk ID Overflow");

View File

@ -26,7 +26,11 @@ pub struct Table {
metrics: TableMetrics, metrics: TableMetrics,
/// Table-wide schema. /// Table-wide schema.
schema: Arc<RwLock<Schema>>, ///
/// Notes on the type:
/// - the outer `Arc<RwLock<...>>` is there so that we can reference the locked schema w/o a lifetime to the table
/// - the inner `Arc<Schema>` is a schema that we don't need to copy when moving it around the query stack
schema: Arc<RwLock<Arc<Schema>>>,
} }
impl Table { impl Table {
@ -40,7 +44,7 @@ impl Table {
let mut builder = SchemaBuilder::new(); let mut builder = SchemaBuilder::new();
builder.measurement(table_name.as_ref()); builder.measurement(table_name.as_ref());
let schema = builder.build().expect("cannot build empty schema"); let schema = builder.build().expect("cannot build empty schema");
let schema = Arc::new(metrics.new_table_lock(schema)); let schema = Arc::new(metrics.new_table_lock(Arc::new(schema)));
Self { Self {
db_name, db_name,
@ -93,7 +97,7 @@ impl Table {
self.partitions.values().map(|x| x.read().summary()) self.partitions.values().map(|x| x.read().summary())
} }
pub fn schema(&self) -> Arc<RwLock<Schema>> { pub fn schema(&self) -> Arc<RwLock<Arc<Schema>>> {
Arc::clone(&self.schema) Arc::clone(&self.schema)
} }
} }
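
The note above explains the nesting: the outer `Arc<RwLock<...>>` is the shared, mutable slot owned by the table, while the inner `Arc<Schema>` is an immutable snapshot that can be handed to the query stack without holding the lock. A small sketch using std's `RwLock` (IOx uses its own tracked locks, so this is only illustrative):

```rust
use std::sync::{Arc, RwLock};

#[derive(Debug)]
struct Schema {
    columns: Vec<String>,
}

fn main() {
    // Outer Arc<RwLock<..>>: shared, mutable slot owned by the table.
    // Inner Arc<Schema>: immutable snapshot that is cheap to clone out.
    let slot: Arc<RwLock<Arc<Schema>>> = Arc::new(RwLock::new(Arc::new(Schema {
        columns: vec!["tag1".into(), "time".into()],
    })));

    // Take a snapshot: clone the inner Arc; the read guard is released immediately.
    let snapshot: Arc<Schema> = Arc::clone(&slot.read().unwrap());

    // A writer can later swap in a new schema without invalidating the snapshot.
    *slot.write().unwrap() = Arc::new(Schema {
        columns: vec!["tag1".into(), "tag2".into(), "time".into()],
    });

    assert_eq!(snapshot.columns.len(), 2);
    assert_eq!(slot.read().unwrap().columns.len(), 3);
}
```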
@ -104,12 +108,12 @@ impl Table {
enum TableSchemaUpsertHandleInner<'a> { enum TableSchemaUpsertHandleInner<'a> {
/// Schema will not be changed. /// Schema will not be changed.
NoChange { NoChange {
table_schema_read: RwLockReadGuard<'a, Schema>, table_schema_read: RwLockReadGuard<'a, Arc<Schema>>,
}, },
/// Schema might change (if write to mutable buffer is successfull). /// Schema might change (if write to mutable buffer is successfull).
MightChange { MightChange {
table_schema_write: RwLockWriteGuard<'a, Schema>, table_schema_write: RwLockWriteGuard<'a, Arc<Schema>>,
merged_schema: Schema, merged_schema: Schema,
}, },
} }
@ -122,7 +126,7 @@ pub struct TableSchemaUpsertHandle<'a> {
impl<'a> TableSchemaUpsertHandle<'a> { impl<'a> TableSchemaUpsertHandle<'a> {
pub(crate) fn new( pub(crate) fn new(
table_schema: &'a RwLock<Schema>, table_schema: &'a RwLock<Arc<Schema>>,
new_schema: &Schema, new_schema: &Schema,
) -> Result<Self, SchemaMergerError> { ) -> Result<Self, SchemaMergerError> {
// Be optimistic and only get a read lock. It is rather rare that the schema will change when new data arrives // Be optimistic and only get a read lock. It is rather rare that the schema will change when new data arrives
@ -134,7 +138,7 @@ impl<'a> TableSchemaUpsertHandle<'a> {
let merged_schema = Self::try_merge(&table_schema_read, new_schema)?; let merged_schema = Self::try_merge(&table_schema_read, new_schema)?;
// Now check if this would actually change the schema: // Now check if this would actually change the schema:
if &merged_schema == table_schema_read.deref() { if &merged_schema == table_schema_read.deref().deref() {
// Optimism payed off and we get away we the read lock. // Optimism payed off and we get away we the read lock.
Ok(Self { Ok(Self {
inner: TableSchemaUpsertHandleInner::NoChange { table_schema_read }, inner: TableSchemaUpsertHandleInner::NoChange { table_schema_read },
@ -181,7 +185,7 @@ impl<'a> TableSchemaUpsertHandle<'a> {
merged_schema, merged_schema,
} => { } => {
// Commit new schema and drop write guard; // Commit new schema and drop write guard;
*table_schema_write = merged_schema; *table_schema_write = Arc::new(merged_schema);
drop(table_schema_write); drop(table_schema_write);
} }
} }
@ -204,7 +208,7 @@ mod tests {
.influx_column("tag2", InfluxColumnType::Tag) .influx_column("tag2", InfluxColumnType::Tag)
.build() .build()
.unwrap(); .unwrap();
let table_schema = lock_tracker.new_lock(table_schema_orig.clone()); let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig.clone()));
// writing with the same schema must not trigger a change // writing with the same schema must not trigger a change
let schema1 = SchemaBuilder::new() let schema1 = SchemaBuilder::new()
@ -218,9 +222,9 @@ mod tests {
handle.inner, handle.inner,
TableSchemaUpsertHandleInner::NoChange { .. } TableSchemaUpsertHandleInner::NoChange { .. }
)); ));
assert_eq!(table_schema.read().deref(), &table_schema_orig); assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
handle.commit(); handle.commit();
assert_eq!(table_schema.read().deref(), &table_schema_orig); assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
// writing with different column order must not trigger a change // writing with different column order must not trigger a change
let schema2 = SchemaBuilder::new() let schema2 = SchemaBuilder::new()
@ -234,9 +238,9 @@ mod tests {
handle.inner, handle.inner,
TableSchemaUpsertHandleInner::NoChange { .. } TableSchemaUpsertHandleInner::NoChange { .. }
)); ));
assert_eq!(table_schema.read().deref(), &table_schema_orig); assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
handle.commit(); handle.commit();
assert_eq!(table_schema.read().deref(), &table_schema_orig); assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
// writing with a column subset must not trigger a change // writing with a column subset must not trigger a change
let schema3 = SchemaBuilder::new() let schema3 = SchemaBuilder::new()
@ -249,9 +253,9 @@ mod tests {
handle.inner, handle.inner,
TableSchemaUpsertHandleInner::NoChange { .. } TableSchemaUpsertHandleInner::NoChange { .. }
)); ));
assert_eq!(table_schema.read().deref(), &table_schema_orig); assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
handle.commit(); handle.commit();
assert_eq!(table_schema.read().deref(), &table_schema_orig); assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
} }
#[test] #[test]
@ -263,7 +267,7 @@ mod tests {
.influx_column("tag2", InfluxColumnType::Tag) .influx_column("tag2", InfluxColumnType::Tag)
.build() .build()
.unwrap(); .unwrap();
let table_schema = lock_tracker.new_lock(table_schema_orig); let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig));
let new_schema = SchemaBuilder::new() let new_schema = SchemaBuilder::new()
.measurement("m1") .measurement("m1")
@ -288,7 +292,7 @@ mod tests {
.influx_column("tag3", InfluxColumnType::Tag) .influx_column("tag3", InfluxColumnType::Tag)
.build() .build()
.unwrap(); .unwrap();
assert_eq!(table_schema.read().deref(), &table_schema_expected); assert_eq!(table_schema.read().deref().deref(), &table_schema_expected);
} }
#[test] #[test]
@ -300,7 +304,7 @@ mod tests {
.influx_column("tag2", InfluxColumnType::Tag) .influx_column("tag2", InfluxColumnType::Tag)
.build() .build()
.unwrap(); .unwrap();
let table_schema = lock_tracker.new_lock(table_schema_orig.clone()); let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig.clone()));
let schema1 = SchemaBuilder::new() let schema1 = SchemaBuilder::new()
.measurement("m1") .measurement("m1")
@ -311,6 +315,6 @@ mod tests {
assert!(TableSchemaUpsertHandle::new(&table_schema, &schema1).is_err()); assert!(TableSchemaUpsertHandle::new(&table_schema, &schema1).is_err());
// schema did not change // schema did not change
assert_eq!(table_schema.read().deref(), &table_schema_orig); assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
} }
} }

View File

@ -4,6 +4,7 @@ use std::future::Future;
use std::sync::Arc; use std::sync::Arc;
use data_types::job::Job; use data_types::job::Job;
use internal_types::schema::merge::SchemaMerger;
use lifecycle::LifecycleWriteGuard; use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::info; use observability_deps::tracing::info;
use query::exec::ExecutorType; use query::exec::ExecutorType;
@ -58,6 +59,17 @@ pub(crate) fn compact_chunks(
}) })
.collect::<Result<Vec<_>>>()?; .collect::<Result<Vec<_>>>()?;
// build schema
// Note: we only use the merged schema from the to-be-compacted chunks - not the table-wide schema, since we don't
// need to bother with other columns (e.g. ones that only exist in other partitions).
let mut merger = SchemaMerger::new();
for db_chunk in &query_chunks {
merger = merger
.merge(&db_chunk.schema())
.expect("schemas compatible");
}
let schema = Arc::new(merger.build());
// drop partition lock // drop partition lock
let partition = partition.unwrap().partition; let partition = partition.unwrap().partition;
@ -79,7 +91,7 @@ pub(crate) fn compact_chunks(
// Cannot move query_chunks as the sort key borrows the column names // Cannot move query_chunks as the sort key borrows the column names
let (schema, plan) = let (schema, plan) =
ReorgPlanner::new().compact_plan(query_chunks.iter().map(Arc::clone), key)?; ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?;
let physical_plan = ctx.prepare_plan(&plan)?; let physical_plan = ctx.prepare_plan(&plan)?;
let stream = ctx.execute(physical_plan).await?; let stream = ctx.execute(physical_plan).await?;

View File

@ -37,7 +37,11 @@ pub fn move_chunk_to_read_buffer(
let table_summary = guard.table_summary(); let table_summary = guard.table_summary();
// snapshot the data // snapshot the data
let query_chunks = vec![DbChunk::snapshot(&*guard)]; // Note: we can just use the chunk-specific schema here since there is only a single chunk and this is a somewhat
// local operation that should only need to deal with the columns that are really present.
let db_chunk = DbChunk::snapshot(&*guard);
let schema = db_chunk.schema();
let query_chunks = vec![db_chunk];
// Drop locks // Drop locks
let chunk = guard.unwrap().chunk; let chunk = guard.unwrap().chunk;
@ -52,7 +56,7 @@ pub fn move_chunk_to_read_buffer(
// Cannot move query_chunks as the sort key borrows the column names // Cannot move query_chunks as the sort key borrows the column names
let (schema, plan) = let (schema, plan) =
ReorgPlanner::new().compact_plan(query_chunks.iter().map(Arc::clone), key)?; ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?;
let physical_plan = ctx.prepare_plan(&plan)?; let physical_plan = ctx.prepare_plan(&plan)?;
let stream = ctx.execute(physical_plan).await?; let stream = ctx.execute(physical_plan).await?;