Merge pull request #1929 from influxdata/crepererum/fix_query_schema_unwrap
refactor: pass schema arcs from catalog to query engine (instead of creating them on-demand)
commit 34dcd991d3
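The gist of the refactor, as a rough standalone sketch rather than the real code (simplified, partly hypothetical types; the actual changes follow in the diff below): the catalog keeps a single Arc<Schema> per table, the new QueryDatabase::table_schema method hands it out, and the planner/provider builders receive that Arc instead of re-merging chunk schemas on demand.

use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
struct Schema; // stand-in for internal_types::schema::Schema

// Trait method added by this PR: look the table schema up in the catalog.
trait QueryDatabase {
    fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>>;
}

// The provider builder now takes the schema up front instead of merging
// chunk schemas itself, so adding chunks can no longer fail on a merge.
struct ProviderBuilder {
    table_name: Arc<str>,
    schema: Arc<Schema>,
}

impl ProviderBuilder {
    fn new(table_name: impl AsRef<str>, schema: Arc<Schema>) -> Self {
        Self {
            table_name: Arc::from(table_name.as_ref()),
            schema,
        }
    }
}

fn plan_for_table(db: &dyn QueryDatabase, table_name: &str) -> Option<ProviderBuilder> {
    // Moving or cloning the Arc is a pointer copy; the schema data itself is
    // never duplicated per query.
    let schema = db.table_schema(table_name)?;
    Some(ProviderBuilder::new(table_name, schema))
}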
|
@ -16,7 +16,7 @@ use internal_types::{
|
|||
selection::Selection,
|
||||
};
|
||||
use observability_deps::tracing::{debug, trace};
|
||||
use snafu::{ensure, ResultExt, Snafu};
|
||||
use snafu::{ensure, OptionExt, ResultExt, Snafu};
|
||||
|
||||
use crate::{
|
||||
exec::{field::FieldColumns, make_schema_pivot},
|
||||
|
@ -141,6 +141,9 @@ pub enum Error {
|
|||
|
||||
#[snafu(display("Internal error: aggregate {:?} is not a selector", agg))]
|
||||
InternalAggregateNotSelector { agg: Aggregate },
|
||||
|
||||
#[snafu(display("Table was removed while planning query: {}", table_name))]
|
||||
TableRemoved { table_name: String },
|
||||
}
|
||||
|
||||
pub type Result<T, E = Error> = std::result::Result<T, E>;
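The new TableRemoved variant pairs with the OptionExt import added above: at the call sites further down, a schema lookup that returns None is converted into this error via .context(...). A minimal sketch of that mechanism, with a hypothetical lookup function standing in for the database (not part of the diff):

use snafu::{OptionExt, Snafu};

#[derive(Debug, Snafu)]
enum Error {
    #[snafu(display("Table was removed while planning query: {}", table_name))]
    TableRemoved { table_name: String },
}

type Result<T, E = Error> = std::result::Result<T, E>;

// Hypothetical stand-in for QueryDatabase::table_schema.
fn table_schema(table_name: &str) -> Option<&'static str> {
    (table_name == "h2o").then(|| "schema")
}

fn plan(table_name: &str) -> Result<&'static str> {
    // OptionExt::context turns the None case into Err(Error::TableRemoved { .. }).
    table_schema(table_name).context(TableRemoved { table_name })
}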
|
||||
|
@ -301,7 +304,10 @@ impl InfluxRpcPlanner {
|
|||
// were already known to have data (based on the contents of known_columns)
|
||||
|
||||
for (table_name, chunks) in need_full_plans.into_iter() {
|
||||
let plan = self.tag_keys_plan(&table_name, &predicate, chunks)?;
|
||||
let schema = database.table_schema(&table_name).context(TableRemoved {
|
||||
table_name: &table_name,
|
||||
})?;
|
||||
let plan = self.tag_keys_plan(&table_name, schema, &predicate, chunks)?;
|
||||
|
||||
if let Some(plan) = plan {
|
||||
builder = builder.append(plan)
|
||||
|
@ -422,7 +428,10 @@ impl InfluxRpcPlanner {
|
|||
// time in `known_columns`, and some tables in chunks that we
|
||||
// need to run a plan to find what values pass the predicate.
|
||||
for (table_name, chunks) in need_full_plans.into_iter() {
|
||||
let scan_and_filter = self.scan_and_filter(&table_name, &predicate, chunks)?;
|
||||
let schema = database.table_schema(&table_name).context(TableRemoved {
|
||||
table_name: &table_name,
|
||||
})?;
|
||||
let scan_and_filter = self.scan_and_filter(&table_name, schema, &predicate, chunks)?;
|
||||
|
||||
// if we have any data to scan, make a plan!
|
||||
if let Some(TableScanAndFilter {
|
||||
|
@ -483,7 +492,10 @@ impl InfluxRpcPlanner {
|
|||
|
||||
let mut field_list_plan = FieldListPlan::new();
|
||||
for (table_name, chunks) in table_chunks {
|
||||
if let Some(plan) = self.field_columns_plan(&table_name, &predicate, chunks)? {
|
||||
let schema = database.table_schema(&table_name).context(TableRemoved {
|
||||
table_name: &table_name,
|
||||
})?;
|
||||
if let Some(plan) = self.field_columns_plan(&table_name, schema, &predicate, chunks)? {
|
||||
field_list_plan = field_list_plan.append(plan);
|
||||
}
|
||||
}
|
||||
|
@ -524,8 +536,12 @@ impl InfluxRpcPlanner {
|
|||
let mut ss_plans = Vec::with_capacity(table_chunks.len());
|
||||
for (table_name, chunks) in table_chunks {
|
||||
let prefix_columns: Option<&[&str]> = None;
|
||||
let schema = database.table_schema(&table_name).context(TableRemoved {
|
||||
table_name: &table_name,
|
||||
})?;
|
||||
|
||||
let ss_plan = self.read_filter_plan(table_name, prefix_columns, &predicate, chunks)?;
|
||||
let ss_plan =
|
||||
self.read_filter_plan(table_name, schema, prefix_columns, &predicate, chunks)?;
|
||||
// If we have to do real work, add it to the list of plans
|
||||
if let Some(ss_plan) = ss_plan {
|
||||
ss_plans.push(ss_plan);
|
||||
|
@ -559,11 +575,25 @@ impl InfluxRpcPlanner {
|
|||
// now, build up plans for each table
|
||||
let mut ss_plans = Vec::with_capacity(table_chunks.len());
|
||||
for (table_name, chunks) in table_chunks {
|
||||
let schema = database.table_schema(&table_name).context(TableRemoved {
|
||||
table_name: &table_name,
|
||||
})?;
|
||||
let ss_plan = match agg {
|
||||
Aggregate::None => {
|
||||
self.read_filter_plan(table_name, Some(group_columns), &predicate, chunks)?
|
||||
}
|
||||
_ => self.read_group_plan(table_name, &predicate, agg, group_columns, chunks)?,
|
||||
Aggregate::None => self.read_filter_plan(
|
||||
table_name,
|
||||
Arc::clone(&schema),
|
||||
Some(group_columns),
|
||||
&predicate,
|
||||
chunks,
|
||||
)?,
|
||||
_ => self.read_group_plan(
|
||||
table_name,
|
||||
schema,
|
||||
&predicate,
|
||||
agg,
|
||||
group_columns,
|
||||
chunks,
|
||||
)?,
|
||||
};
|
||||
|
||||
// If we have to do real work, add it to the list of plans
|
||||
|
@ -604,8 +634,12 @@ impl InfluxRpcPlanner {
|
|||
// now, build up plans for each table
|
||||
let mut ss_plans = Vec::with_capacity(table_chunks.len());
|
||||
for (table_name, chunks) in table_chunks {
|
||||
let ss_plan = self
|
||||
.read_window_aggregate_plan(table_name, &predicate, agg, &every, &offset, chunks)?;
|
||||
let schema = database.table_schema(&table_name).context(TableRemoved {
|
||||
table_name: &table_name,
|
||||
})?;
|
||||
let ss_plan = self.read_window_aggregate_plan(
|
||||
table_name, schema, &predicate, agg, &every, &offset, chunks,
|
||||
)?;
|
||||
// If we have to do real work, add it to the list of plans
|
||||
if let Some(ss_plan) = ss_plan {
|
||||
ss_plans.push(ss_plan);
|
||||
|
@ -665,13 +699,14 @@ impl InfluxRpcPlanner {
|
|||
fn tag_keys_plan<C>(
|
||||
&self,
|
||||
table_name: &str,
|
||||
schema: Arc<Schema>,
|
||||
predicate: &Predicate,
|
||||
chunks: Vec<Arc<C>>,
|
||||
) -> Result<Option<StringSetPlan>>
|
||||
where
|
||||
C: QueryChunk + 'static,
|
||||
{
|
||||
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
|
||||
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
|
@ -727,13 +762,14 @@ impl InfluxRpcPlanner {
|
|||
fn field_columns_plan<C>(
|
||||
&self,
|
||||
table_name: &str,
|
||||
schema: Arc<Schema>,
|
||||
predicate: &Predicate,
|
||||
chunks: Vec<Arc<C>>,
|
||||
) -> Result<Option<LogicalPlan>>
|
||||
where
|
||||
C: QueryChunk + 'static,
|
||||
{
|
||||
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
schema,
|
||||
|
@ -777,6 +813,7 @@ impl InfluxRpcPlanner {
|
|||
fn read_filter_plan<C>(
|
||||
&self,
|
||||
table_name: impl AsRef<str>,
|
||||
schema: Arc<Schema>,
|
||||
prefix_columns: Option<&[impl AsRef<str>]>,
|
||||
predicate: &Predicate,
|
||||
chunks: Vec<Arc<C>>,
|
||||
|
@ -785,7 +822,7 @@ impl InfluxRpcPlanner {
|
|||
C: QueryChunk + 'static,
|
||||
{
|
||||
let table_name = table_name.as_ref();
|
||||
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
|
||||
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
|
@ -897,6 +934,7 @@ impl InfluxRpcPlanner {
|
|||
fn read_group_plan<C>(
|
||||
&self,
|
||||
table_name: impl Into<String>,
|
||||
schema: Arc<Schema>,
|
||||
predicate: &Predicate,
|
||||
agg: Aggregate,
|
||||
group_columns: &[impl AsRef<str>],
|
||||
|
@ -906,7 +944,7 @@ impl InfluxRpcPlanner {
|
|||
C: QueryChunk + 'static,
|
||||
{
|
||||
let table_name = table_name.into();
|
||||
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?;
|
||||
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
|
@ -984,9 +1022,11 @@ impl InfluxRpcPlanner {
|
|||
/// GroupBy(gby: tag columns, window_function; agg: aggregate(field))
|
||||
/// Filter(predicate)
|
||||
/// Scan
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
fn read_window_aggregate_plan<C>(
|
||||
&self,
|
||||
table_name: impl Into<String>,
|
||||
schema: Arc<Schema>,
|
||||
predicate: &Predicate,
|
||||
agg: Aggregate,
|
||||
every: &WindowDuration,
|
||||
|
@ -997,7 +1037,7 @@ impl InfluxRpcPlanner {
|
|||
C: QueryChunk + 'static,
|
||||
{
|
||||
let table_name = table_name.into();
|
||||
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?;
|
||||
let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?;
|
||||
|
||||
let TableScanAndFilter {
|
||||
plan_builder,
|
||||
|
@ -1074,6 +1114,7 @@ impl InfluxRpcPlanner {
|
|||
fn scan_and_filter<C>(
|
||||
&self,
|
||||
table_name: &str,
|
||||
schema: Arc<Schema>,
|
||||
predicate: &Predicate,
|
||||
chunks: Vec<Arc<C>>,
|
||||
) -> Result<Option<TableScanAndFilter>>
|
||||
|
@ -1085,7 +1126,7 @@ impl InfluxRpcPlanner {
|
|||
let projection = None;
|
||||
|
||||
// Prepare the scan of the table
|
||||
let mut builder = ProviderBuilder::new(table_name);
|
||||
let mut builder = ProviderBuilder::new(table_name, schema);
|
||||
|
||||
// Since the entire predicate is used in the call to
|
||||
// `database.chunks()` there will not be any additional
|
||||
|
@ -1106,9 +1147,7 @@ impl InfluxRpcPlanner {
|
|||
chunk.id(),
|
||||
);
|
||||
|
||||
builder = builder
|
||||
.add_chunk(chunk)
|
||||
.context(CreatingProvider { table_name })?;
|
||||
builder = builder.add_chunk(chunk);
|
||||
}
|
||||
|
||||
let provider = builder.build().context(CreatingProvider { table_name })?;
|
||||
|
@ -1217,7 +1256,7 @@ struct TableScanAndFilter {
|
|||
/// Represents plan that scans a table and applies optional filtering
|
||||
plan_builder: LogicalPlanBuilder,
|
||||
/// The IOx schema of the result
|
||||
schema: Schema,
|
||||
schema: Arc<Schema>,
|
||||
}
|
||||
|
||||
/// Reorders tag_columns so that its prefix matches exactly
|
||||
|
|
|
@ -63,9 +63,10 @@ impl ReorgPlanner {
|
|||
/// (Scan chunks) <-- any needed deduplication happens here
|
||||
pub fn compact_plan<C, I>(
|
||||
&self,
|
||||
schema: Arc<Schema>,
|
||||
chunks: I,
|
||||
output_sort: SortKey<'_>,
|
||||
) -> Result<(Schema, LogicalPlan)>
|
||||
) -> Result<(Arc<Schema>, LogicalPlan)>
|
||||
where
|
||||
C: QueryChunk + 'static,
|
||||
I: IntoIterator<Item = Arc<C>>,
|
||||
|
@ -73,13 +74,21 @@ impl ReorgPlanner {
|
|||
let ScanPlan {
|
||||
plan_builder,
|
||||
provider,
|
||||
} = self.scan_and_sort_plan(chunks, output_sort.clone())?;
|
||||
} = self.scan_and_sort_plan(schema, chunks, output_sort.clone())?;
|
||||
|
||||
let mut schema = provider.iox_schema();
|
||||
|
||||
// Set the sort_key of the schema to the compacted chunk's sort key
|
||||
// Try to do this only if the sort key changes so we avoid unnecessary schema copies.
|
||||
trace!(input_schema=?schema, "Setting sort key on schema");
|
||||
schema.set_sort_key(&output_sort);
|
||||
if schema
|
||||
.sort_key()
|
||||
.map_or(true, |existing_key| existing_key != output_sort)
|
||||
{
|
||||
let mut schema_cloned = schema.as_ref().clone();
|
||||
schema_cloned.set_sort_key(&output_sort);
|
||||
schema = Arc::new(schema_cloned);
|
||||
}
|
||||
trace!(output_schema=?schema, "Setting sort key on schema");
|
||||
|
||||
let plan = plan_builder.build().context(BuildingPlan)?;
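The sort-key handling above is a small copy-on-write pattern on the shared Arc: the schema is cloned (and re-wrapped in a new Arc) only when the sort key actually differs, otherwise the existing Arc is reused. A minimal illustration with a simplified stand-in type, not the real Schema API:

use std::sync::Arc;

#[derive(Clone, Debug)]
struct Schema {
    sort_key: Option<String>, // simplified stand-in for the real sort key
}

fn with_sort_key(mut schema: Arc<Schema>, output_sort: &str) -> Arc<Schema> {
    // Only pay for a deep copy when the key really changes; otherwise hand the
    // original Arc straight back.
    if schema
        .sort_key
        .as_deref()
        .map_or(true, |existing| existing != output_sort)
    {
        let mut cloned = schema.as_ref().clone();
        cloned.sort_key = Some(output_sort.to_string());
        schema = Arc::new(cloned);
    }
    schema
}

(Arc::make_mut would be an alternative, but the explicit comparison also skips the update entirely when nothing changed, which is the point of the check in compact_plan.)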
|
||||
|
@ -136,10 +145,11 @@ impl ReorgPlanner {
|
|||
/// ```
|
||||
pub fn split_plan<C, I>(
|
||||
&self,
|
||||
schema: Arc<Schema>,
|
||||
chunks: I,
|
||||
output_sort: SortKey<'_>,
|
||||
split_time: i64,
|
||||
) -> Result<(Schema, LogicalPlan)>
|
||||
) -> Result<(Arc<Schema>, LogicalPlan)>
|
||||
where
|
||||
C: QueryChunk + 'static,
|
||||
I: IntoIterator<Item = Arc<C>>,
|
||||
|
@ -147,7 +157,7 @@ impl ReorgPlanner {
|
|||
let ScanPlan {
|
||||
plan_builder,
|
||||
provider,
|
||||
} = self.scan_and_sort_plan(chunks, output_sort)?;
|
||||
} = self.scan_and_sort_plan(schema, chunks, output_sort)?;
|
||||
|
||||
// TODO: Set sort key on schema
|
||||
let schema = provider.iox_schema();
|
||||
|
@ -176,7 +186,12 @@ impl ReorgPlanner {
|
|||
///
|
||||
/// (Sort on output_sort)
|
||||
/// (Scan chunks) <-- any needed deduplication happens here
|
||||
fn scan_and_sort_plan<C, I>(&self, chunks: I, output_sort: SortKey<'_>) -> Result<ScanPlan<C>>
|
||||
fn scan_and_sort_plan<C, I>(
|
||||
&self,
|
||||
schema: Arc<Schema>,
|
||||
chunks: I,
|
||||
output_sort: SortKey<'_>,
|
||||
) -> Result<ScanPlan<C>>
|
||||
where
|
||||
C: QueryChunk + 'static,
|
||||
I: IntoIterator<Item = Arc<C>>,
|
||||
|
@ -189,7 +204,7 @@ impl ReorgPlanner {
|
|||
let table_name = &table_name;
|
||||
|
||||
// Prepare the plan for the table
|
||||
let mut builder = ProviderBuilder::new(table_name);
|
||||
let mut builder = ProviderBuilder::new(table_name, schema);
|
||||
|
||||
// There are no predicates in these plans, so no need to prune them
|
||||
builder = builder.add_no_op_pruner();
|
||||
|
@ -203,9 +218,7 @@ impl ReorgPlanner {
|
|||
chunk.id(),
|
||||
);
|
||||
|
||||
builder = builder
|
||||
.add_chunk(chunk)
|
||||
.context(CreatingProvider { table_name })?;
|
||||
builder = builder.add_chunk(chunk);
|
||||
}
|
||||
|
||||
let provider = builder.build().context(CreatingProvider { table_name })?;
|
||||
|
@ -244,16 +257,17 @@ struct ScanPlan<C: QueryChunk + 'static> {
|
|||
#[cfg(test)]
|
||||
mod test {
|
||||
use arrow_util::assert_batches_eq;
|
||||
use internal_types::schema::sort::SortOptions;
|
||||
use internal_types::schema::{merge::SchemaMerger, sort::SortOptions};
|
||||
|
||||
use crate::{
|
||||
exec::{Executor, ExecutorType},
|
||||
test::{raw_data, TestChunk},
|
||||
QueryChunkMeta,
|
||||
};
|
||||
|
||||
use super::*;
|
||||
|
||||
async fn get_test_chunks() -> Vec<Arc<TestChunk>> {
|
||||
async fn get_test_chunks() -> (Arc<Schema>, Vec<Arc<TestChunk>>) {
|
||||
// Chunk 1 with 5 rows of data on 2 tags
|
||||
let chunk1 = Arc::new(
|
||||
TestChunk::new(1)
|
||||
|
@ -298,12 +312,19 @@ mod test {
|
|||
];
|
||||
assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk2)]).await);
|
||||
|
||||
vec![chunk1, chunk2]
|
||||
let schema = SchemaMerger::new()
|
||||
.merge(&chunk1.schema())
|
||||
.unwrap()
|
||||
.merge(&chunk2.schema())
|
||||
.unwrap()
|
||||
.build();
|
||||
|
||||
(Arc::new(schema), vec![chunk1, chunk2])
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_compact_plan() {
|
||||
let chunks = get_test_chunks().await;
|
||||
let (schema, chunks) = get_test_chunks().await;
|
||||
|
||||
let mut sort_key = SortKey::with_capacity(2);
|
||||
sort_key.push(
|
||||
|
@ -322,7 +343,7 @@ mod test {
|
|||
);
|
||||
|
||||
let (_, compact_plan) = ReorgPlanner::new()
|
||||
.compact_plan(chunks, sort_key)
|
||||
.compact_plan(schema, chunks, sort_key)
|
||||
.expect("created compact plan");
|
||||
|
||||
let executor = Executor::new(1);
|
||||
|
@ -363,7 +384,7 @@ mod test {
|
|||
test_helpers::maybe_start_logging();
|
||||
// validate that the plumbing is all hooked up. The logic of
|
||||
// the operator is tested in its own module.
|
||||
let chunks = get_test_chunks().await;
|
||||
let (schema, chunks) = get_test_chunks().await;
|
||||
|
||||
let mut sort_key = SortKey::with_capacity(1);
|
||||
sort_key.push(
|
||||
|
@ -376,7 +397,7 @@ mod test {
|
|||
|
||||
// split on 1000 should have timestamps 1000, 5000, and 7000
|
||||
let (_, split_plan) = ReorgPlanner::new()
|
||||
.split_plan(chunks, sort_key, 1000)
|
||||
.split_plan(schema, chunks, sort_key, 1000)
|
||||
.expect("created compact plan");
|
||||
|
||||
let executor = Executor::new(1);
|
||||
|
|
|
@ -55,6 +55,9 @@ pub trait QueryDatabase: Debug + Send + Sync {
|
|||
/// Return the partition keys for data in this DB
|
||||
fn partition_keys(&self) -> Result<Vec<String>, Self::Error>;
|
||||
|
||||
/// Schema for a specific table if the table exists.
|
||||
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>>;
|
||||
|
||||
/// Returns a set of chunks within the partition with data that may match
|
||||
/// the provided predicate. If possible, chunks which have no rows that can
|
||||
/// possibly match the predicate may be omitted.
|
||||
|
|
|
@ -40,26 +40,12 @@ use self::{
|
|||
|
||||
#[derive(Debug, Snafu)]
|
||||
pub enum Error {
|
||||
#[snafu(display("Chunk schema not compatible for table '{}': {}", table_name, source))]
|
||||
ChunkSchemaNotCompatible {
|
||||
table_name: String,
|
||||
source: internal_types::schema::merge::Error,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Internal error: no chunk pruner provided to builder for {}",
|
||||
table_name,
|
||||
))]
|
||||
InternalNoChunkPruner { table_name: String },
|
||||
|
||||
#[snafu(display("Internal error: No rows found in table '{}'", table_name))]
|
||||
InternalNoRowsInTable { table_name: String },
|
||||
|
||||
#[snafu(display("Internal error: Cannot verify the push-down predicate '{}'", source,))]
|
||||
InternalPushdownPredicate {
|
||||
source: datafusion::error::DataFusionError,
|
||||
},
|
||||
|
||||
#[snafu(display("Internal error: Cannot create projection select expr '{}'", source,))]
|
||||
InternalSelectExpr {
|
||||
source: datafusion::error::DataFusionError,
|
||||
|
@ -106,35 +92,25 @@ pub trait ChunkPruner<C: QueryChunk>: Sync + Send + std::fmt::Debug {
|
|||
#[derive(Debug)]
|
||||
pub struct ProviderBuilder<C: QueryChunk + 'static> {
|
||||
table_name: Arc<str>,
|
||||
schema_merger: SchemaMerger,
|
||||
schema: Arc<Schema>,
|
||||
chunk_pruner: Option<Arc<dyn ChunkPruner<C>>>,
|
||||
chunks: Vec<Arc<C>>,
|
||||
}
|
||||
|
||||
impl<C: QueryChunk> ProviderBuilder<C> {
|
||||
pub fn new(table_name: impl AsRef<str>) -> Self {
|
||||
pub fn new(table_name: impl AsRef<str>, schema: Arc<Schema>) -> Self {
|
||||
Self {
|
||||
table_name: Arc::from(table_name.as_ref()),
|
||||
schema_merger: SchemaMerger::new(),
|
||||
schema,
|
||||
chunk_pruner: None,
|
||||
chunks: Vec::new(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Add a new chunk to this provider
|
||||
pub fn add_chunk(mut self, chunk: Arc<C>) -> Result<Self> {
|
||||
let chunk_table_schema = chunk.schema();
|
||||
|
||||
self.schema_merger = self
|
||||
.schema_merger
|
||||
.merge(&chunk_table_schema.as_ref())
|
||||
.context(ChunkSchemaNotCompatible {
|
||||
table_name: self.table_name.as_ref(),
|
||||
})?;
|
||||
|
||||
pub fn add_chunk(mut self, chunk: Arc<C>) -> Self {
|
||||
self.chunks.push(chunk);
|
||||
|
||||
Ok(self)
|
||||
self
|
||||
}
|
||||
|
||||
/// Specify a `ChunkPruner` for the provider that will apply
|
||||
|
@ -161,16 +137,6 @@ impl<C: QueryChunk> ProviderBuilder<C> {
|
|||
|
||||
/// Create the Provider
|
||||
pub fn build(self) -> Result<ChunkTableProvider<C>> {
|
||||
let iox_schema = self.schema_merger.build();
|
||||
|
||||
// if the table was reported to exist, it should not be empty
|
||||
if self.chunks.is_empty() {
|
||||
return InternalNoRowsInTable {
|
||||
table_name: self.table_name.as_ref(),
|
||||
}
|
||||
.fail();
|
||||
}
|
||||
|
||||
let chunk_pruner = match self.chunk_pruner {
|
||||
Some(chunk_pruner) => chunk_pruner,
|
||||
None => {
|
||||
|
@ -182,7 +148,7 @@ impl<C: QueryChunk> ProviderBuilder<C> {
|
|||
};
|
||||
|
||||
Ok(ChunkTableProvider {
|
||||
iox_schema,
|
||||
iox_schema: self.schema,
|
||||
chunk_pruner,
|
||||
table_name: self.table_name,
|
||||
chunks: self.chunks,
|
||||
|
@ -198,7 +164,7 @@ impl<C: QueryChunk> ProviderBuilder<C> {
|
|||
pub struct ChunkTableProvider<C: QueryChunk + 'static> {
|
||||
table_name: Arc<str>,
|
||||
/// The IOx schema (wrapper around Arrow Schemaref) for this table
|
||||
iox_schema: Schema,
|
||||
iox_schema: Arc<Schema>,
|
||||
/// Something that can prune chunks
|
||||
chunk_pruner: Arc<dyn ChunkPruner<C>>,
|
||||
// The chunks
|
||||
|
@ -207,8 +173,8 @@ pub struct ChunkTableProvider<C: QueryChunk + 'static> {
|
|||
|
||||
impl<C: QueryChunk + 'static> ChunkTableProvider<C> {
|
||||
/// Return the IOx schema view for the data provided by this provider
|
||||
pub fn iox_schema(&self) -> Schema {
|
||||
self.iox_schema.clone()
|
||||
pub fn iox_schema(&self) -> Arc<Schema> {
|
||||
Arc::clone(&self.iox_schema)
|
||||
}
|
||||
|
||||
/// Return the Arrow schema view for the data provided by this provider
|
||||
|
@ -255,8 +221,8 @@ impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
|
|||
|
||||
// Figure out the schema of the requested output
|
||||
let scan_schema = match projection {
|
||||
Some(indicies) => self.iox_schema.select_by_indices(indicies),
|
||||
None => self.iox_schema.clone(),
|
||||
Some(indicies) => Arc::new(self.iox_schema.select_by_indices(indicies)),
|
||||
None => Arc::clone(&self.iox_schema),
|
||||
};
|
||||
|
||||
// This debug shows the self.arrow_schema() includes all columns in all chunks
|
||||
|
@ -266,7 +232,7 @@ impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
|
|||
let mut deduplicate = Deduplicater::new();
|
||||
let plan = deduplicate.build_scan_plan(
|
||||
Arc::clone(&self.table_name),
|
||||
Arc::new(scan_schema),
|
||||
scan_schema,
|
||||
chunks,
|
||||
predicate,
|
||||
)?;
|
||||
|
|
|
@ -116,6 +116,23 @@ impl QueryDatabase for TestDatabase {
|
|||
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>, Self::Error> {
|
||||
unimplemented!("summaries not implemented TestDatabase")
|
||||
}
|
||||
|
||||
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
|
||||
let mut merger = SchemaMerger::new();
|
||||
let mut found_one = false;
|
||||
|
||||
let partitions = self.partitions.lock();
|
||||
for partition in partitions.values() {
|
||||
for chunk in partition.values() {
|
||||
if chunk.table_name() == table_name {
|
||||
merger = merger.merge(&chunk.schema()).expect("consistent schemas");
|
||||
found_one = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
found_one.then(|| Arc::new(merger.build()))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
-- Test Setup: OneMeasurementAllChunksDropped
|
||||
-- SQL: SELECT * from information_schema.tables;
|
||||
+---------------+--------------------+---------------+------------+
|
||||
| table_catalog | table_schema | table_name | table_type |
|
||||
+---------------+--------------------+---------------+------------+
|
||||
| public | iox | h2o | BASE TABLE |
|
||||
| public | system | chunks | BASE TABLE |
|
||||
| public | system | columns | BASE TABLE |
|
||||
| public | system | chunk_columns | BASE TABLE |
|
||||
| public | system | operations | BASE TABLE |
|
||||
| public | information_schema | tables | VIEW |
|
||||
| public | information_schema | columns | VIEW |
|
||||
+---------------+--------------------+---------------+------------+
|
||||
-- SQL: SHOW TABLES;
|
||||
+---------------+--------------------+---------------+------------+
|
||||
| table_catalog | table_schema | table_name | table_type |
|
||||
+---------------+--------------------+---------------+------------+
|
||||
| public | iox | h2o | BASE TABLE |
|
||||
| public | system | chunks | BASE TABLE |
|
||||
| public | system | columns | BASE TABLE |
|
||||
| public | system | chunk_columns | BASE TABLE |
|
||||
| public | system | operations | BASE TABLE |
|
||||
| public | information_schema | tables | VIEW |
|
||||
| public | information_schema | columns | VIEW |
|
||||
+---------------+--------------------+---------------+------------+
|
|
@ -0,0 +1,8 @@
|
|||
-- Test for predicate push down explains
|
||||
-- IOX_SETUP: OneMeasurementAllChunksDropped
|
||||
|
||||
-- list information schema
|
||||
SELECT * from information_schema.tables;
|
||||
|
||||
-- same but shorter
|
||||
SHOW TABLES;
|
|
@ -4,6 +4,20 @@
|
|||
use std::path::Path;
|
||||
use crate::runner::Runner;
|
||||
|
||||
#[tokio::test]
|
||||
// Tests from "all_chunks_dropped.sql",
|
||||
async fn test_cases_all_chunks_dropped_sql() {
|
||||
let input_path = Path::new("cases").join("in").join("all_chunks_dropped.sql");
|
||||
let mut runner = Runner::new();
|
||||
runner
|
||||
.run(input_path)
|
||||
.await
|
||||
.expect("test failed");
|
||||
runner
|
||||
.flush()
|
||||
.expect("flush worked");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
// Tests from "duplicates.sql",
|
||||
async fn test_cases_duplicates_sql() {
|
||||
|
|
|
@ -294,6 +294,8 @@ impl<W: Write> Runner<W> {
|
|||
}
|
||||
|
||||
/// Return output path for input path.
|
||||
///
|
||||
/// This converts `some/prefix/in/foo.sql` (or other file extensions) to `some/prefix/out/foo.out`.
|
||||
fn make_output_path(input: &Path) -> Result<PathBuf> {
|
||||
let stem = input.file_stem().context(NoFileStem { path: input })?;
|
||||
|
||||
|
@ -306,6 +308,10 @@ fn make_output_path(input: &Path) -> Result<PathBuf> {
|
|||
out.push("out");
|
||||
|
||||
// set file name and ext
|
||||
// The PathBuf API is somewhat confusing: `set_file_name` will replace the last component (which at this point is
|
||||
// the "out"). However we wanna create a file out of the stem and the extension. So as a somewhat hackish
|
||||
// workaround first push a placeholder that is then replaced.
|
||||
out.push("placeholder");
|
||||
out.set_file_name(stem);
|
||||
out.set_extension("out");
|
||||
|
||||
|
@ -417,8 +423,11 @@ SELECT * from disk;
|
|||
let in_dir = dir.path().join("in");
|
||||
std::fs::create_dir(&in_dir).expect("create in-dir");
|
||||
|
||||
let out_dir = dir.path().join("out");
|
||||
std::fs::create_dir(&out_dir).expect("create out-dir");
|
||||
|
||||
let mut file = in_dir;
|
||||
file.set_file_name("foo.sql");
|
||||
file.push("foo.sql");
|
||||
|
||||
std::fs::write(&file, contents).expect("writing data to temp file");
|
||||
(dir, file)
|
||||
|
|
|
@ -49,6 +49,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
|
|||
register_setup!(TwoMeasurements),
|
||||
register_setup!(TwoMeasurementsPredicatePushDown),
|
||||
register_setup!(OneMeasurementThreeChunksWithDuplicates),
|
||||
register_setup!(OneMeasurementAllChunksDropped),
|
||||
]
|
||||
.into_iter()
|
||||
.map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>))
|
||||
|
@ -676,6 +677,34 @@ pub(crate) async fn make_one_chunk_rub_scenario(
|
|||
vec![scenario]
|
||||
}
|
||||
|
||||
/// This creates two chunks but then drops them all. This should keep the tables.
|
||||
#[derive(Debug)]
|
||||
pub struct OneMeasurementAllChunksDropped {}
|
||||
#[async_trait]
|
||||
impl DbSetup for OneMeasurementAllChunksDropped {
|
||||
async fn make(&self) -> Vec<DbScenario> {
|
||||
let db = make_db().await.db;
|
||||
|
||||
let partition_key = "1970-01-01T00";
|
||||
let table_name = "h2o";
|
||||
|
||||
let lp_lines = vec!["h2o,state=MA temp=70.4 50"];
|
||||
write_lp(&db, &lp_lines.join("\n")).await;
|
||||
db.rollover_partition(table_name, partition_key)
|
||||
.await
|
||||
.unwrap();
|
||||
db.move_chunk_to_read_buffer(table_name, partition_key, 0)
|
||||
.await
|
||||
.unwrap();
|
||||
db.drop_chunk(table_name, partition_key, 0).unwrap();
|
||||
|
||||
vec![DbScenario {
|
||||
scenario_name: "one measurement but all chunks are dropped".into(),
|
||||
db,
|
||||
}]
|
||||
}
|
||||
}
|
||||
|
||||
/// This function loads one chunk of lp data into different scenarios that simulates
|
||||
/// the data life cycle.
|
||||
///
|
||||
|
|
|
@ -26,6 +26,7 @@ use data_types::{
|
|||
use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
|
||||
use entry::{Entry, SequencedEntry};
|
||||
use futures::{stream::BoxStream, StreamExt};
|
||||
use internal_types::schema::Schema;
|
||||
use metrics::KeyValue;
|
||||
use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
|
||||
use object_store::{path::parsed::DirsAndFileName, ObjectStore};
|
||||
|
@ -850,6 +851,10 @@ impl QueryDatabase for Db {
|
|||
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> {
|
||||
self.catalog_access.chunk_summaries()
|
||||
}
|
||||
|
||||
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
|
||||
self.catalog_access.table_schema(table_name)
|
||||
}
|
||||
}
|
||||
|
||||
/// Convenience implementation of `CatalogProvider` so the rest of the
|
||||
|
@ -980,6 +985,7 @@ mod tests {
|
|||
convert::TryFrom,
|
||||
iter::Iterator,
|
||||
num::{NonZeroU64, NonZeroUsize},
|
||||
ops::Deref,
|
||||
str,
|
||||
time::{Duration, Instant},
|
||||
};
|
||||
|
@ -2831,16 +2837,9 @@ mod tests {
|
|||
assert_eq!(paths_actual, paths_expected);
|
||||
|
||||
// ==================== do: remember table schema ====================
|
||||
let mut table_schemas: HashMap<String, Schema> = Default::default();
|
||||
let mut table_schemas: HashMap<String, Arc<Schema>> = Default::default();
|
||||
for (table_name, _partition_key, _chunk_id) in &chunks {
|
||||
// TODO: use official `db.table_schema` interface later
|
||||
let schema = db
|
||||
.catalog
|
||||
.table(table_name)
|
||||
.unwrap()
|
||||
.schema()
|
||||
.read()
|
||||
.clone();
|
||||
let schema = db.table_schema(table_name).unwrap();
|
||||
table_schemas.insert(table_name.clone(), schema);
|
||||
}
|
||||
|
||||
|
@ -2868,16 +2867,9 @@ mod tests {
|
|||
}
|
||||
));
|
||||
}
|
||||
for (table_name, schema) in table_schemas {
|
||||
// TODO: use official `db.table_schema` interface later
|
||||
let schema2 = db
|
||||
.catalog
|
||||
.table(table_name)
|
||||
.unwrap()
|
||||
.schema()
|
||||
.read()
|
||||
.clone();
|
||||
assert_eq!(schema2, schema);
|
||||
for (table_name, schema) in &table_schemas {
|
||||
let schema2 = db.table_schema(table_name).unwrap();
|
||||
assert_eq!(schema2.deref(), schema.deref());
|
||||
}
|
||||
|
||||
// ==================== check: DB still writable ====================
|
||||
|
|
|
@ -11,16 +11,17 @@ use super::{
|
|||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use data_types::{chunk_metadata::ChunkSummary, error::ErrorLogger};
|
||||
use data_types::chunk_metadata::ChunkSummary;
|
||||
use datafusion::{
|
||||
catalog::{catalog::CatalogProvider, schema::SchemaProvider},
|
||||
datasource::TableProvider,
|
||||
};
|
||||
use internal_types::schema::Schema;
|
||||
use metrics::{Counter, KeyValue, MetricRegistry};
|
||||
use observability_deps::tracing::debug;
|
||||
use query::{
|
||||
predicate::{Predicate, PredicateBuilder},
|
||||
provider::{self, ChunkPruner, ProviderBuilder},
|
||||
provider::{ChunkPruner, ProviderBuilder},
|
||||
QueryChunk, QueryChunkMeta, DEFAULT_SCHEMA,
|
||||
};
|
||||
use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
|
||||
|
@ -195,6 +196,13 @@ impl QueryDatabase for QueryCatalogAccess {
|
|||
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> {
|
||||
Ok(self.catalog.chunk_summaries())
|
||||
}
|
||||
|
||||
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
|
||||
self.catalog
|
||||
.table(table_name)
|
||||
.ok()
|
||||
.map(|table| Arc::clone(&table.schema().read()))
|
||||
}
|
||||
}
|
||||
|
||||
// Datafusion catalog provider interface
|
||||
|
@ -249,27 +257,23 @@ impl SchemaProvider for DbSchemaProvider {
|
|||
|
||||
/// Create a table provider for the named table
|
||||
fn table(&self, table_name: &str) -> Option<Arc<dyn TableProvider>> {
|
||||
let mut builder = ProviderBuilder::new(table_name);
|
||||
let schema = {
|
||||
let table = self.catalog.table(table_name).ok()?;
|
||||
Arc::clone(&table.schema().read())
|
||||
};
|
||||
|
||||
let mut builder = ProviderBuilder::new(table_name, schema);
|
||||
builder =
|
||||
builder.add_pruner(Arc::clone(&self.chunk_access) as Arc<dyn ChunkPruner<DbChunk>>);
|
||||
|
||||
let predicate = PredicateBuilder::new().table(table_name).build();
|
||||
|
||||
for chunk in self.chunk_access.candidate_chunks(&predicate) {
|
||||
// This is unfortunate - a table with incompatible chunks ceases to
|
||||
// be visible to the query engine
|
||||
//
|
||||
// It is also potentially ill-formed as continuing to use the builder
|
||||
// after it has errored may not yield entirely sensible results
|
||||
builder = builder
|
||||
.add_chunk(chunk)
|
||||
.log_if_error("Adding chunks to table")
|
||||
.ok()?;
|
||||
builder = builder.add_chunk(chunk);
|
||||
}
|
||||
|
||||
match builder.build() {
|
||||
Ok(provider) => Some(Arc::new(provider)),
|
||||
Err(provider::Error::InternalNoRowsInTable { .. }) => None,
|
||||
Err(e) => panic!("unexpected error: {:?}", e),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -199,7 +199,7 @@ impl Catalog {
|
|||
&self,
|
||||
table_name: impl AsRef<str>,
|
||||
partition_key: impl AsRef<str>,
|
||||
) -> (Arc<RwLock<Partition>>, Arc<RwLock<Schema>>) {
|
||||
) -> (Arc<RwLock<Partition>>, Arc<RwLock<Arc<Schema>>>) {
|
||||
let mut tables = self.tables.write();
|
||||
let (_, table) = tables
|
||||
.raw_entry_mut()
|
||||
|
|
|
@ -271,7 +271,7 @@ impl CatalogChunk {
|
|||
pub(super) fn new_rub_chunk(
|
||||
addr: ChunkAddr,
|
||||
chunk: read_buffer::RBChunk,
|
||||
schema: Schema,
|
||||
schema: Arc<Schema>,
|
||||
metrics: ChunkMetrics,
|
||||
) -> Self {
|
||||
// TODO: Move RUB to single table (#1295)
|
||||
|
@ -282,7 +282,7 @@ impl CatalogChunk {
|
|||
let stage = ChunkStage::Frozen {
|
||||
meta: Arc::new(ChunkMetadata {
|
||||
table_summary: Arc::new(summary),
|
||||
schema: Arc::new(schema),
|
||||
schema,
|
||||
}),
|
||||
representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)),
|
||||
};
|
||||
|
@ -603,7 +603,7 @@ impl CatalogChunk {
|
|||
|
||||
/// Set the chunk in the Moved state, setting the underlying storage handle to db, and
|
||||
/// discarding the underlying mutable buffer storage.
|
||||
pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Schema) -> Result<()> {
|
||||
pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Arc<Schema>) -> Result<()> {
|
||||
match &mut self.stage {
|
||||
ChunkStage::Frozen {
|
||||
meta,
|
||||
|
@ -613,7 +613,7 @@ impl CatalogChunk {
|
|||
// after moved, the chunk is sorted and its schema needs to get updated
|
||||
*meta = Arc::new(ChunkMetadata {
|
||||
table_summary: Arc::clone(&meta.table_summary),
|
||||
schema: Arc::new(schema),
|
||||
schema,
|
||||
});
|
||||
|
||||
match &representation {
|
||||
|
|
|
@ -166,7 +166,7 @@ impl Partition {
|
|||
pub fn create_rub_chunk(
|
||||
&mut self,
|
||||
chunk: read_buffer::RBChunk,
|
||||
schema: Schema,
|
||||
schema: Arc<Schema>,
|
||||
) -> Arc<RwLock<CatalogChunk>> {
|
||||
let chunk_id = self.next_chunk_id;
|
||||
assert_ne!(self.next_chunk_id, u32::MAX, "Chunk ID Overflow");
|
||||
|
|
|
@ -26,7 +26,11 @@ pub struct Table {
|
|||
metrics: TableMetrics,
|
||||
|
||||
/// Table-wide schema.
|
||||
schema: Arc<RwLock<Schema>>,
|
||||
///
|
||||
/// Notes on the type:
|
||||
/// - the outer `Arc<RwLock<...>>` is so that we can reference the locked schema w/o a lifetime to the table
|
||||
/// - the inner `Arc<Schema>` is a schema that we don't need to copy when moving it around the query stack
|
||||
schema: Arc<RwLock<Arc<Schema>>>,
|
||||
}
|
||||
|
||||
impl Table {
|
||||
|
@ -40,7 +44,7 @@ impl Table {
|
|||
let mut builder = SchemaBuilder::new();
|
||||
builder.measurement(table_name.as_ref());
|
||||
let schema = builder.build().expect("cannot build empty schema");
|
||||
let schema = Arc::new(metrics.new_table_lock(schema));
|
||||
let schema = Arc::new(metrics.new_table_lock(Arc::new(schema)));
|
||||
|
||||
Self {
|
||||
db_name,
|
||||
|
@ -93,7 +97,7 @@ impl Table {
|
|||
self.partitions.values().map(|x| x.read().summary())
|
||||
}
|
||||
|
||||
pub fn schema(&self) -> Arc<RwLock<Schema>> {
|
||||
pub fn schema(&self) -> Arc<RwLock<Arc<Schema>>> {
|
||||
Arc::clone(&self.schema)
|
||||
}
|
||||
}
|
||||
|
@ -104,12 +108,12 @@ impl Table {
|
|||
enum TableSchemaUpsertHandleInner<'a> {
|
||||
/// Schema will not be changed.
|
||||
NoChange {
|
||||
table_schema_read: RwLockReadGuard<'a, Schema>,
|
||||
table_schema_read: RwLockReadGuard<'a, Arc<Schema>>,
|
||||
},
|
||||
|
||||
/// Schema might change (if write to mutable buffer is successful).
|
||||
MightChange {
|
||||
table_schema_write: RwLockWriteGuard<'a, Schema>,
|
||||
table_schema_write: RwLockWriteGuard<'a, Arc<Schema>>,
|
||||
merged_schema: Schema,
|
||||
},
|
||||
}
|
||||
|
@ -122,7 +126,7 @@ pub struct TableSchemaUpsertHandle<'a> {
|
|||
|
||||
impl<'a> TableSchemaUpsertHandle<'a> {
|
||||
pub(crate) fn new(
|
||||
table_schema: &'a RwLock<Schema>,
|
||||
table_schema: &'a RwLock<Arc<Schema>>,
|
||||
new_schema: &Schema,
|
||||
) -> Result<Self, SchemaMergerError> {
|
||||
// Be optimistic and only get a read lock. It is rather rare that the schema will change when new data arrives
|
||||
|
@ -134,7 +138,7 @@ impl<'a> TableSchemaUpsertHandle<'a> {
|
|||
let merged_schema = Self::try_merge(&table_schema_read, new_schema)?;
|
||||
|
||||
// Now check if this would actually change the schema:
|
||||
if &merged_schema == table_schema_read.deref() {
|
||||
if &merged_schema == table_schema_read.deref().deref() {
|
||||
// Optimism paid off and we get away with the read lock.
|
||||
Ok(Self {
|
||||
inner: TableSchemaUpsertHandleInner::NoChange { table_schema_read },
|
||||
|
@ -181,7 +185,7 @@ impl<'a> TableSchemaUpsertHandle<'a> {
|
|||
merged_schema,
|
||||
} => {
|
||||
// Commit new schema and drop write guard;
|
||||
*table_schema_write = merged_schema;
|
||||
*table_schema_write = Arc::new(merged_schema);
|
||||
drop(table_schema_write);
|
||||
}
|
||||
}
|
||||
|
@ -204,7 +208,7 @@ mod tests {
|
|||
.influx_column("tag2", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_schema = lock_tracker.new_lock(table_schema_orig.clone());
|
||||
let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig.clone()));
|
||||
|
||||
// writing with the same schema must not trigger a change
|
||||
let schema1 = SchemaBuilder::new()
|
||||
|
@ -218,9 +222,9 @@ mod tests {
|
|||
handle.inner,
|
||||
TableSchemaUpsertHandleInner::NoChange { .. }
|
||||
));
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
|
||||
handle.commit();
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
|
||||
|
||||
// writing with different column order must not trigger a change
|
||||
let schema2 = SchemaBuilder::new()
|
||||
|
@ -234,9 +238,9 @@ mod tests {
|
|||
handle.inner,
|
||||
TableSchemaUpsertHandleInner::NoChange { .. }
|
||||
));
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
|
||||
handle.commit();
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
|
||||
|
||||
// writing with a column subset must not trigger a change
|
||||
let schema3 = SchemaBuilder::new()
|
||||
|
@ -249,9 +253,9 @@ mod tests {
|
|||
handle.inner,
|
||||
TableSchemaUpsertHandleInner::NoChange { .. }
|
||||
));
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
|
||||
handle.commit();
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -263,7 +267,7 @@ mod tests {
|
|||
.influx_column("tag2", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_schema = lock_tracker.new_lock(table_schema_orig);
|
||||
let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig));
|
||||
|
||||
let new_schema = SchemaBuilder::new()
|
||||
.measurement("m1")
|
||||
|
@ -288,7 +292,7 @@ mod tests {
|
|||
.influx_column("tag3", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_expected);
|
||||
assert_eq!(table_schema.read().deref().deref(), &table_schema_expected);
|
||||
}
|
||||
|
||||
#[test]
|
||||
|
@ -300,7 +304,7 @@ mod tests {
|
|||
.influx_column("tag2", InfluxColumnType::Tag)
|
||||
.build()
|
||||
.unwrap();
|
||||
let table_schema = lock_tracker.new_lock(table_schema_orig.clone());
|
||||
let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig.clone()));
|
||||
|
||||
let schema1 = SchemaBuilder::new()
|
||||
.measurement("m1")
|
||||
|
@ -311,6 +315,6 @@ mod tests {
|
|||
assert!(TableSchemaUpsertHandle::new(&table_schema, &schema1).is_err());
|
||||
|
||||
// schema did not change
|
||||
assert_eq!(table_schema.read().deref(), &table_schema_orig);
|
||||
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ use std::future::Future;
|
|||
use std::sync::Arc;
|
||||
|
||||
use data_types::job::Job;
|
||||
use internal_types::schema::merge::SchemaMerger;
|
||||
use lifecycle::LifecycleWriteGuard;
|
||||
use observability_deps::tracing::info;
|
||||
use query::exec::ExecutorType;
|
||||
|
@ -58,6 +59,17 @@ pub(crate) fn compact_chunks(
|
|||
})
|
||||
.collect::<Result<Vec<_>>>()?;
|
||||
|
||||
// build schema
|
||||
// Note: we only use the merged schema from the to-be-compacted chunks - not the table-wide schema, since we don't
|
||||
// need to bother with other columns (e.g. ones that only exist in other partitions).
|
||||
let mut merger = SchemaMerger::new();
|
||||
for db_chunk in &query_chunks {
|
||||
merger = merger
|
||||
.merge(&db_chunk.schema())
|
||||
.expect("schemas compatible");
|
||||
}
|
||||
let schema = Arc::new(merger.build());
|
||||
|
||||
// drop partition lock
|
||||
let partition = partition.unwrap().partition;
|
||||
|
||||
|
@ -79,7 +91,7 @@ pub(crate) fn compact_chunks(
|
|||
|
||||
// Cannot move query_chunks as the sort key borrows the column names
|
||||
let (schema, plan) =
|
||||
ReorgPlanner::new().compact_plan(query_chunks.iter().map(Arc::clone), key)?;
|
||||
ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?;
|
||||
|
||||
let physical_plan = ctx.prepare_plan(&plan)?;
|
||||
let stream = ctx.execute(physical_plan).await?;
|
||||
|
|
|
@ -37,7 +37,11 @@ pub fn move_chunk_to_read_buffer(
|
|||
let table_summary = guard.table_summary();
|
||||
|
||||
// snapshot the data
|
||||
let query_chunks = vec![DbChunk::snapshot(&*guard)];
|
||||
// Note: we can just use the chunk-specific schema here since there is only a single chunk and this is somewhat a
|
||||
// local operation that should only need to deal with the columns that are really present.
|
||||
let db_chunk = DbChunk::snapshot(&*guard);
|
||||
let schema = db_chunk.schema();
|
||||
let query_chunks = vec![db_chunk];
|
||||
|
||||
// Drop locks
|
||||
let chunk = guard.unwrap().chunk;
|
||||
|
@ -52,7 +56,7 @@ pub fn move_chunk_to_read_buffer(
|
|||
|
||||
// Cannot move query_chunks as the sort key borrows the column names
|
||||
let (schema, plan) =
|
||||
ReorgPlanner::new().compact_plan(query_chunks.iter().map(Arc::clone), key)?;
|
||||
ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?;
|
||||
|
||||
let physical_plan = ctx.prepare_plan(&plan)?;
|
||||
let stream = ctx.execute(physical_plan).await?;
|
||||
|
|