Merge pull request #1929 from influxdata/crepererum/fix_query_schema_unwrap

refactor: pass schema arcs from catalog to query engine (instead of creating them on-demand)
kodiakhq[bot] 2021-07-09 07:53:16 +00:00 committed by GitHub
commit 34dcd991d3
18 changed files with 292 additions and 145 deletions
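
In short: `QueryDatabase` gains a `table_schema()` accessor, the planners receive that `Arc<Schema>` from the catalog, and `ProviderBuilder::new` takes the schema up front instead of merging per-chunk schemas on demand. A condensed sketch of the new call pattern, pieced together from the planner changes below (not a verbatim excerpt; the surrounding function and chunk gathering are elided):

    // The table may be removed while a query is being planned, hence the new
    // TableRemoved error variant.
    let schema = database.table_schema(&table_name).context(TableRemoved {
        table_name: &table_name,
    })?;

    // The builder no longer merges per-chunk schemas, so add_chunk() is infallible.
    let mut builder = ProviderBuilder::new(&table_name, Arc::clone(&schema));
    for chunk in chunks {
        builder = builder.add_chunk(chunk);
    }
    let provider = builder.build().context(CreatingProvider {
        table_name: &table_name,
    })?;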


@ -16,7 +16,7 @@ use internal_types::{
selection::Selection,
};
use observability_deps::tracing::{debug, trace};
use snafu::{ensure, ResultExt, Snafu};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use crate::{
exec::{field::FieldColumns, make_schema_pivot},
@ -141,6 +141,9 @@ pub enum Error {
#[snafu(display("Internal error: aggregate {:?} is not a selector", agg))]
InternalAggregateNotSelector { agg: Aggregate },
#[snafu(display("Table was removed while planning query: {}", table_name))]
TableRemoved { table_name: String },
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -301,7 +304,10 @@ impl InfluxRpcPlanner {
// were already known to have data (based on the contents of known_columns)
for (table_name, chunks) in need_full_plans.into_iter() {
let plan = self.tag_keys_plan(&table_name, &predicate, chunks)?;
let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
let plan = self.tag_keys_plan(&table_name, schema, &predicate, chunks)?;
if let Some(plan) = plan {
builder = builder.append(plan)
@ -422,7 +428,10 @@ impl InfluxRpcPlanner {
// time in `known_columns`, and some tables in chunks that we
// need to run a plan to find what values pass the predicate.
for (table_name, chunks) in need_full_plans.into_iter() {
let scan_and_filter = self.scan_and_filter(&table_name, &predicate, chunks)?;
let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
let scan_and_filter = self.scan_and_filter(&table_name, schema, &predicate, chunks)?;
// if we have any data to scan, make a plan!
if let Some(TableScanAndFilter {
@ -483,7 +492,10 @@ impl InfluxRpcPlanner {
let mut field_list_plan = FieldListPlan::new();
for (table_name, chunks) in table_chunks {
if let Some(plan) = self.field_columns_plan(&table_name, &predicate, chunks)? {
let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
if let Some(plan) = self.field_columns_plan(&table_name, schema, &predicate, chunks)? {
field_list_plan = field_list_plan.append(plan);
}
}
@ -524,8 +536,12 @@ impl InfluxRpcPlanner {
let mut ss_plans = Vec::with_capacity(table_chunks.len());
for (table_name, chunks) in table_chunks {
let prefix_columns: Option<&[&str]> = None;
let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
let ss_plan = self.read_filter_plan(table_name, prefix_columns, &predicate, chunks)?;
let ss_plan =
self.read_filter_plan(table_name, schema, prefix_columns, &predicate, chunks)?;
// If we have to do real work, add it to the list of plans
if let Some(ss_plan) = ss_plan {
ss_plans.push(ss_plan);
@ -559,11 +575,25 @@ impl InfluxRpcPlanner {
// now, build up plans for each table
let mut ss_plans = Vec::with_capacity(table_chunks.len());
for (table_name, chunks) in table_chunks {
let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
let ss_plan = match agg {
Aggregate::None => {
self.read_filter_plan(table_name, Some(group_columns), &predicate, chunks)?
}
_ => self.read_group_plan(table_name, &predicate, agg, group_columns, chunks)?,
Aggregate::None => self.read_filter_plan(
table_name,
Arc::clone(&schema),
Some(group_columns),
&predicate,
chunks,
)?,
_ => self.read_group_plan(
table_name,
schema,
&predicate,
agg,
group_columns,
chunks,
)?,
};
// If we have to do real work, add it to the list of plans
@ -604,8 +634,12 @@ impl InfluxRpcPlanner {
// now, build up plans for each table
let mut ss_plans = Vec::with_capacity(table_chunks.len());
for (table_name, chunks) in table_chunks {
let ss_plan = self
.read_window_aggregate_plan(table_name, &predicate, agg, &every, &offset, chunks)?;
let schema = database.table_schema(&table_name).context(TableRemoved {
table_name: &table_name,
})?;
let ss_plan = self.read_window_aggregate_plan(
table_name, schema, &predicate, agg, &every, &offset, chunks,
)?;
// If we have to do real work, add it to the list of plans
if let Some(ss_plan) = ss_plan {
ss_plans.push(ss_plan);
@ -665,13 +699,14 @@ impl InfluxRpcPlanner {
fn tag_keys_plan<C>(
&self,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
) -> Result<Option<StringSetPlan>>
where
C: QueryChunk + 'static,
{
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
let TableScanAndFilter {
plan_builder,
@ -727,13 +762,14 @@ impl InfluxRpcPlanner {
fn field_columns_plan<C>(
&self,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
) -> Result<Option<LogicalPlan>>
where
C: QueryChunk + 'static,
{
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
let TableScanAndFilter {
plan_builder,
schema,
@ -777,6 +813,7 @@ impl InfluxRpcPlanner {
fn read_filter_plan<C>(
&self,
table_name: impl AsRef<str>,
schema: Arc<Schema>,
prefix_columns: Option<&[impl AsRef<str>]>,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
@ -785,7 +822,7 @@ impl InfluxRpcPlanner {
C: QueryChunk + 'static,
{
let table_name = table_name.as_ref();
let scan_and_filter = self.scan_and_filter(table_name, predicate, chunks)?;
let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
let TableScanAndFilter {
plan_builder,
@ -897,6 +934,7 @@ impl InfluxRpcPlanner {
fn read_group_plan<C>(
&self,
table_name: impl Into<String>,
schema: Arc<Schema>,
predicate: &Predicate,
agg: Aggregate,
group_columns: &[impl AsRef<str>],
@ -906,7 +944,7 @@ impl InfluxRpcPlanner {
C: QueryChunk + 'static,
{
let table_name = table_name.into();
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?;
let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?;
let TableScanAndFilter {
plan_builder,
@ -984,9 +1022,11 @@ impl InfluxRpcPlanner {
/// GroupBy(gby: tag columns, window_function; agg: aggregate(field))
/// Filter(predicate)
/// Scan
#[allow(clippy::too_many_arguments)]
fn read_window_aggregate_plan<C>(
&self,
table_name: impl Into<String>,
schema: Arc<Schema>,
predicate: &Predicate,
agg: Aggregate,
every: &WindowDuration,
@ -997,7 +1037,7 @@ impl InfluxRpcPlanner {
C: QueryChunk + 'static,
{
let table_name = table_name.into();
let scan_and_filter = self.scan_and_filter(&table_name, predicate, chunks)?;
let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?;
let TableScanAndFilter {
plan_builder,
@ -1074,6 +1114,7 @@ impl InfluxRpcPlanner {
fn scan_and_filter<C>(
&self,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,
chunks: Vec<Arc<C>>,
) -> Result<Option<TableScanAndFilter>>
@ -1085,7 +1126,7 @@ impl InfluxRpcPlanner {
let projection = None;
// Prepare the scan of the table
let mut builder = ProviderBuilder::new(table_name);
let mut builder = ProviderBuilder::new(table_name, schema);
// Since the entire predicate is used in the call to
// `database.chunks()` there will not be any additional
@ -1106,9 +1147,7 @@ impl InfluxRpcPlanner {
chunk.id(),
);
builder = builder
.add_chunk(chunk)
.context(CreatingProvider { table_name })?;
builder = builder.add_chunk(chunk);
}
let provider = builder.build().context(CreatingProvider { table_name })?;
@ -1217,7 +1256,7 @@ struct TableScanAndFilter {
/// Represents plan that scans a table and applies optional filtering
plan_builder: LogicalPlanBuilder,
/// The IOx schema of the result
schema: Schema,
schema: Arc<Schema>,
}
/// Reorders tag_columns so that its prefix matches exactly


@ -63,9 +63,10 @@ impl ReorgPlanner {
/// (Scan chunks) <-- any needed deduplication happens here
pub fn compact_plan<C, I>(
&self,
schema: Arc<Schema>,
chunks: I,
output_sort: SortKey<'_>,
) -> Result<(Schema, LogicalPlan)>
) -> Result<(Arc<Schema>, LogicalPlan)>
where
C: QueryChunk + 'static,
I: IntoIterator<Item = Arc<C>>,
@ -73,13 +74,21 @@ impl ReorgPlanner {
let ScanPlan {
plan_builder,
provider,
} = self.scan_and_sort_plan(chunks, output_sort.clone())?;
} = self.scan_and_sort_plan(schema, chunks, output_sort.clone())?;
let mut schema = provider.iox_schema();
// Set the sort_key of the schema to the compacted chunk's sort key
// Try to do this only if the sort key changes so we avoid unnecessary schema copies.
trace!(input_schema=?schema, "Setting sort key on schema");
schema.set_sort_key(&output_sort);
if schema
.sort_key()
.map_or(true, |existing_key| existing_key != output_sort)
{
let mut schema_cloned = schema.as_ref().clone();
schema_cloned.set_sort_key(&output_sort);
schema = Arc::new(schema_cloned);
}
trace!(output_schema=?schema, "Setting sort key on schema");
let plan = plan_builder.build().context(BuildingPlan)?;
@ -136,10 +145,11 @@ impl ReorgPlanner {
/// ```
pub fn split_plan<C, I>(
&self,
schema: Arc<Schema>,
chunks: I,
output_sort: SortKey<'_>,
split_time: i64,
) -> Result<(Schema, LogicalPlan)>
) -> Result<(Arc<Schema>, LogicalPlan)>
where
C: QueryChunk + 'static,
I: IntoIterator<Item = Arc<C>>,
@ -147,7 +157,7 @@ impl ReorgPlanner {
let ScanPlan {
plan_builder,
provider,
} = self.scan_and_sort_plan(chunks, output_sort)?;
} = self.scan_and_sort_plan(schema, chunks, output_sort)?;
// TODO: Set sort key on schema
let schema = provider.iox_schema();
@ -176,7 +186,12 @@ impl ReorgPlanner {
///
/// (Sort on output_sort)
/// (Scan chunks) <-- any needed deduplication happens here
fn scan_and_sort_plan<C, I>(&self, chunks: I, output_sort: SortKey<'_>) -> Result<ScanPlan<C>>
fn scan_and_sort_plan<C, I>(
&self,
schema: Arc<Schema>,
chunks: I,
output_sort: SortKey<'_>,
) -> Result<ScanPlan<C>>
where
C: QueryChunk + 'static,
I: IntoIterator<Item = Arc<C>>,
@ -189,7 +204,7 @@ impl ReorgPlanner {
let table_name = &table_name;
// Prepare the plan for the table
let mut builder = ProviderBuilder::new(table_name);
let mut builder = ProviderBuilder::new(table_name, schema);
// There are no predicates in these plans, so no need to prune them
builder = builder.add_no_op_pruner();
@ -203,9 +218,7 @@ impl ReorgPlanner {
chunk.id(),
);
builder = builder
.add_chunk(chunk)
.context(CreatingProvider { table_name })?;
builder = builder.add_chunk(chunk);
}
let provider = builder.build().context(CreatingProvider { table_name })?;
@ -244,16 +257,17 @@ struct ScanPlan<C: QueryChunk + 'static> {
#[cfg(test)]
mod test {
use arrow_util::assert_batches_eq;
use internal_types::schema::sort::SortOptions;
use internal_types::schema::{merge::SchemaMerger, sort::SortOptions};
use crate::{
exec::{Executor, ExecutorType},
test::{raw_data, TestChunk},
QueryChunkMeta,
};
use super::*;
async fn get_test_chunks() -> Vec<Arc<TestChunk>> {
async fn get_test_chunks() -> (Arc<Schema>, Vec<Arc<TestChunk>>) {
// Chunk 1 with 5 rows of data on 2 tags
let chunk1 = Arc::new(
TestChunk::new(1)
@ -298,12 +312,19 @@ mod test {
];
assert_batches_eq!(&expected, &raw_data(&[Arc::clone(&chunk2)]).await);
vec![chunk1, chunk2]
let schema = SchemaMerger::new()
.merge(&chunk1.schema())
.unwrap()
.merge(&chunk2.schema())
.unwrap()
.build();
(Arc::new(schema), vec![chunk1, chunk2])
}
#[tokio::test]
async fn test_compact_plan() {
let chunks = get_test_chunks().await;
let (schema, chunks) = get_test_chunks().await;
let mut sort_key = SortKey::with_capacity(2);
sort_key.push(
@ -322,7 +343,7 @@ mod test {
);
let (_, compact_plan) = ReorgPlanner::new()
.compact_plan(chunks, sort_key)
.compact_plan(schema, chunks, sort_key)
.expect("created compact plan");
let executor = Executor::new(1);
@ -363,7 +384,7 @@ mod test {
test_helpers::maybe_start_logging();
// validate that the plumbing is all hooked up. The logic of
// the operator is tested in its own module.
let chunks = get_test_chunks().await;
let (schema, chunks) = get_test_chunks().await;
let mut sort_key = SortKey::with_capacity(1);
sort_key.push(
@ -376,7 +397,7 @@ mod test {
// split on 1000 should have timestamps 1000, 5000, and 7000
let (_, split_plan) = ReorgPlanner::new()
.split_plan(chunks, sort_key, 1000)
.split_plan(schema, chunks, sort_key, 1000)
.expect("created compact plan");
let executor = Executor::new(1);


@ -55,6 +55,9 @@ pub trait QueryDatabase: Debug + Send + Sync {
/// Return the partition keys for data in this DB
fn partition_keys(&self) -> Result<Vec<String>, Self::Error>;
/// Schema for a specific table if the table exists.
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>>;
/// Returns a set of chunks within the partition with data that may match
/// the provided predicate. If possible, chunks which have no rows that can
/// possibly match the predicate may be omitted.


@ -40,26 +40,12 @@ use self::{
#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Chunk schema not compatible for table '{}': {}", table_name, source))]
ChunkSchemaNotCompatible {
table_name: String,
source: internal_types::schema::merge::Error,
},
#[snafu(display(
"Internal error: no chunk pruner provided to builder for {}",
table_name,
))]
InternalNoChunkPruner { table_name: String },
#[snafu(display("Internal error: No rows found in table '{}'", table_name))]
InternalNoRowsInTable { table_name: String },
#[snafu(display("Internal error: Cannot verify the push-down predicate '{}'", source,))]
InternalPushdownPredicate {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Internal error: Cannot create projection select expr '{}'", source,))]
InternalSelectExpr {
source: datafusion::error::DataFusionError,
@ -106,35 +92,25 @@ pub trait ChunkPruner<C: QueryChunk>: Sync + Send + std::fmt::Debug {
#[derive(Debug)]
pub struct ProviderBuilder<C: QueryChunk + 'static> {
table_name: Arc<str>,
schema_merger: SchemaMerger,
schema: Arc<Schema>,
chunk_pruner: Option<Arc<dyn ChunkPruner<C>>>,
chunks: Vec<Arc<C>>,
}
impl<C: QueryChunk> ProviderBuilder<C> {
pub fn new(table_name: impl AsRef<str>) -> Self {
pub fn new(table_name: impl AsRef<str>, schema: Arc<Schema>) -> Self {
Self {
table_name: Arc::from(table_name.as_ref()),
schema_merger: SchemaMerger::new(),
schema,
chunk_pruner: None,
chunks: Vec::new(),
}
}
/// Add a new chunk to this provider
pub fn add_chunk(mut self, chunk: Arc<C>) -> Result<Self> {
let chunk_table_schema = chunk.schema();
self.schema_merger = self
.schema_merger
.merge(&chunk_table_schema.as_ref())
.context(ChunkSchemaNotCompatible {
table_name: self.table_name.as_ref(),
})?;
pub fn add_chunk(mut self, chunk: Arc<C>) -> Self {
self.chunks.push(chunk);
Ok(self)
self
}
/// Specify a `ChunkPruner` for the provider that will apply
@ -161,16 +137,6 @@ impl<C: QueryChunk> ProviderBuilder<C> {
/// Create the Provider
pub fn build(self) -> Result<ChunkTableProvider<C>> {
let iox_schema = self.schema_merger.build();
// if the table was reported to exist, it should not be empty
if self.chunks.is_empty() {
return InternalNoRowsInTable {
table_name: self.table_name.as_ref(),
}
.fail();
}
let chunk_pruner = match self.chunk_pruner {
Some(chunk_pruner) => chunk_pruner,
None => {
@ -182,7 +148,7 @@ impl<C: QueryChunk> ProviderBuilder<C> {
};
Ok(ChunkTableProvider {
iox_schema,
iox_schema: self.schema,
chunk_pruner,
table_name: self.table_name,
chunks: self.chunks,
@ -198,7 +164,7 @@ impl<C: QueryChunk> ProviderBuilder<C> {
pub struct ChunkTableProvider<C: QueryChunk + 'static> {
table_name: Arc<str>,
/// The IOx schema (wrapper around Arrow Schemaref) for this table
iox_schema: Schema,
iox_schema: Arc<Schema>,
/// Something that can prune chunks
chunk_pruner: Arc<dyn ChunkPruner<C>>,
// The chunks
@ -207,8 +173,8 @@ pub struct ChunkTableProvider<C: QueryChunk + 'static> {
impl<C: QueryChunk + 'static> ChunkTableProvider<C> {
/// Return the IOx schema view for the data provided by this provider
pub fn iox_schema(&self) -> Schema {
self.iox_schema.clone()
pub fn iox_schema(&self) -> Arc<Schema> {
Arc::clone(&self.iox_schema)
}
/// Return the Arrow schema view for the data provided by this provider
@ -255,8 +221,8 @@ impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
// Figure out the schema of the requested output
let scan_schema = match projection {
Some(indicies) => self.iox_schema.select_by_indices(indicies),
None => self.iox_schema.clone(),
Some(indicies) => Arc::new(self.iox_schema.select_by_indices(indicies)),
None => Arc::clone(&self.iox_schema),
};
// This debug shows the self.arrow_schema() includes all columns in all chunks
@ -266,7 +232,7 @@ impl<C: QueryChunk + 'static> TableProvider for ChunkTableProvider<C> {
let mut deduplicate = Deduplicater::new();
let plan = deduplicate.build_scan_plan(
Arc::clone(&self.table_name),
Arc::new(scan_schema),
scan_schema,
chunks,
predicate,
)?;


@ -116,6 +116,23 @@ impl QueryDatabase for TestDatabase {
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>, Self::Error> {
unimplemented!("summaries not implemented TestDatabase")
}
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
let mut merger = SchemaMerger::new();
let mut found_one = false;
let partitions = self.partitions.lock();
for partition in partitions.values() {
for chunk in partition.values() {
if chunk.table_name() == table_name {
merger = merger.merge(&chunk.schema()).expect("consistent schemas");
found_one = true;
}
}
}
found_one.then(|| Arc::new(merger.build()))
}
}
#[derive(Debug, Default)]


@ -0,0 +1,25 @@
-- Test Setup: OneMeasurementAllChunksDropped
-- SQL: SELECT * from information_schema.tables;
+---------------+--------------------+---------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+---------------+------------+
| public | iox | h2o | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | chunk_columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | information_schema | tables | VIEW |
| public | information_schema | columns | VIEW |
+---------------+--------------------+---------------+------------+
-- SQL: SHOW TABLES;
+---------------+--------------------+---------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+---------------+------------+
| public | iox | h2o | BASE TABLE |
| public | system | chunks | BASE TABLE |
| public | system | columns | BASE TABLE |
| public | system | chunk_columns | BASE TABLE |
| public | system | operations | BASE TABLE |
| public | information_schema | tables | VIEW |
| public | information_schema | columns | VIEW |
+---------------+--------------------+---------------+------------+


@ -0,0 +1,8 @@
-- Test that tables remain queryable via the information schema after all their chunks are dropped
-- IOX_SETUP: OneMeasurementAllChunksDropped
-- list information schema
SELECT * from information_schema.tables;
-- same but shorter
SHOW TABLES;


@ -4,6 +4,20 @@
use std::path::Path;
use crate::runner::Runner;
#[tokio::test]
// Tests from "all_chunks_dropped.sql",
async fn test_cases_all_chunks_dropped_sql() {
let input_path = Path::new("cases").join("in").join("all_chunks_dropped.sql");
let mut runner = Runner::new();
runner
.run(input_path)
.await
.expect("test failed");
runner
.flush()
.expect("flush worked");
}
#[tokio::test]
// Tests from "duplicates.sql",
async fn test_cases_duplicates_sql() {


@ -294,6 +294,8 @@ impl<W: Write> Runner<W> {
}
/// Return output path for input path.
///
/// This converts `some/prefix/in/foo.sql` (or other file extensions) to `some/prefix/out/foo.out`.
fn make_output_path(input: &Path) -> Result<PathBuf> {
let stem = input.file_stem().context(NoFileStem { path: input })?;
@ -306,6 +308,10 @@ fn make_output_path(input: &Path) -> Result<PathBuf> {
out.push("out");
// set file name and ext
// The PathBuf API is somewhat confusing: `set_file_name` will replace the last component (which at this point is
// the "out"). However we wanna create a file out of the stem and the extension. So as a somewhat hackish
// workaround first push a placeholder that is then replaced.
out.push("placeholder");
out.set_file_name(stem);
out.set_extension("out");
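
For illustration, here is a self-contained version of that path manipulation (a std-only sketch, not the crate's exact function, which returns a Result and reports a missing file stem via the NoFileStem error):

    use std::path::{Path, PathBuf};

    fn make_output_path(input: &Path) -> PathBuf {
        let stem = input.file_stem().expect("input has a file stem"); // "foo"
        let mut out = input.to_path_buf();
        out.pop(); // drop "foo.sql"
        out.pop(); // drop "in"
        out.push("out");
        // `set_file_name` would replace "out" itself, so park a placeholder first
        out.push("placeholder");
        out.set_file_name(stem); // ".../out/foo"
        out.set_extension("out"); // ".../out/foo.out"
        out
    }

    fn main() {
        let out = make_output_path(Path::new("some/prefix/in/foo.sql"));
        assert_eq!(out, PathBuf::from("some/prefix/out/foo.out"));
    }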
@ -417,8 +423,11 @@ SELECT * from disk;
let in_dir = dir.path().join("in");
std::fs::create_dir(&in_dir).expect("create in-dir");
let out_dir = dir.path().join("out");
std::fs::create_dir(&out_dir).expect("create out-dir");
let mut file = in_dir;
file.set_file_name("foo.sql");
file.push("foo.sql");
std::fs::write(&file, contents).expect("writing data to temp file");
(dir, file)


@ -49,6 +49,7 @@ pub fn get_all_setups() -> &'static HashMap<String, Arc<dyn DbSetup>> {
register_setup!(TwoMeasurements),
register_setup!(TwoMeasurementsPredicatePushDown),
register_setup!(OneMeasurementThreeChunksWithDuplicates),
register_setup!(OneMeasurementAllChunksDropped),
]
.into_iter()
.map(|(name, setup)| (name.to_string(), setup as Arc<dyn DbSetup>))
@ -676,6 +677,34 @@ pub(crate) async fn make_one_chunk_rub_scenario(
vec![scenario]
}
/// This creates two chunks but then drops them all. This should keep the tables.
#[derive(Debug)]
pub struct OneMeasurementAllChunksDropped {}
#[async_trait]
impl DbSetup for OneMeasurementAllChunksDropped {
async fn make(&self) -> Vec<DbScenario> {
let db = make_db().await.db;
let partition_key = "1970-01-01T00";
let table_name = "h2o";
let lp_lines = vec!["h2o,state=MA temp=70.4 50"];
write_lp(&db, &lp_lines.join("\n")).await;
db.rollover_partition(table_name, partition_key)
.await
.unwrap();
db.move_chunk_to_read_buffer(table_name, partition_key, 0)
.await
.unwrap();
db.drop_chunk(table_name, partition_key, 0).unwrap();
vec![DbScenario {
scenario_name: "one measurement but all chunks are dropped".into(),
db,
}]
}
}
/// This function loads one chunk of lp data into different scenarios that simulates
/// the data life cycle.
///


@ -26,6 +26,7 @@ use data_types::{
use datafusion::catalog::{catalog::CatalogProvider, schema::SchemaProvider};
use entry::{Entry, SequencedEntry};
use futures::{stream::BoxStream, StreamExt};
use internal_types::schema::Schema;
use metrics::KeyValue;
use mutable_buffer::chunk::{ChunkMetrics as MutableBufferChunkMetrics, MBChunk};
use object_store::{path::parsed::DirsAndFileName, ObjectStore};
@ -850,6 +851,10 @@ impl QueryDatabase for Db {
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> {
self.catalog_access.chunk_summaries()
}
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
self.catalog_access.table_schema(table_name)
}
}
/// Convenience implementation of `CatalogProvider` so the rest of the
@ -980,6 +985,7 @@ mod tests {
convert::TryFrom,
iter::Iterator,
num::{NonZeroU64, NonZeroUsize},
ops::Deref,
str,
time::{Duration, Instant},
};
@ -2831,16 +2837,9 @@ mod tests {
assert_eq!(paths_actual, paths_expected);
// ==================== do: remember table schema ====================
let mut table_schemas: HashMap<String, Schema> = Default::default();
let mut table_schemas: HashMap<String, Arc<Schema>> = Default::default();
for (table_name, _partition_key, _chunk_id) in &chunks {
// TODO: use official `db.table_schema` interface later
let schema = db
.catalog
.table(table_name)
.unwrap()
.schema()
.read()
.clone();
let schema = db.table_schema(table_name).unwrap();
table_schemas.insert(table_name.clone(), schema);
}
@ -2868,16 +2867,9 @@ mod tests {
}
));
}
for (table_name, schema) in table_schemas {
// TODO: use official `db.table_schema` interface later
let schema2 = db
.catalog
.table(table_name)
.unwrap()
.schema()
.read()
.clone();
assert_eq!(schema2, schema);
for (table_name, schema) in &table_schemas {
let schema2 = db.table_schema(table_name).unwrap();
assert_eq!(schema2.deref(), schema.deref());
}
// ==================== check: DB still writable ====================


@ -11,16 +11,17 @@ use super::{
};
use async_trait::async_trait;
use data_types::{chunk_metadata::ChunkSummary, error::ErrorLogger};
use data_types::chunk_metadata::ChunkSummary;
use datafusion::{
catalog::{catalog::CatalogProvider, schema::SchemaProvider},
datasource::TableProvider,
};
use internal_types::schema::Schema;
use metrics::{Counter, KeyValue, MetricRegistry};
use observability_deps::tracing::debug;
use query::{
predicate::{Predicate, PredicateBuilder},
provider::{self, ChunkPruner, ProviderBuilder},
provider::{ChunkPruner, ProviderBuilder},
QueryChunk, QueryChunkMeta, DEFAULT_SCHEMA,
};
use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
@ -195,6 +196,13 @@ impl QueryDatabase for QueryCatalogAccess {
fn chunk_summaries(&self) -> Result<Vec<ChunkSummary>> {
Ok(self.catalog.chunk_summaries())
}
fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
self.catalog
.table(table_name)
.ok()
.map(|table| Arc::clone(&table.schema().read()))
}
}
// Datafusion catalog provider interface
@ -249,27 +257,23 @@ impl SchemaProvider for DbSchemaProvider {
/// Create a table provider for the named table
fn table(&self, table_name: &str) -> Option<Arc<dyn TableProvider>> {
let mut builder = ProviderBuilder::new(table_name);
let schema = {
let table = self.catalog.table(table_name).ok()?;
Arc::clone(&table.schema().read())
};
let mut builder = ProviderBuilder::new(table_name, schema);
builder =
builder.add_pruner(Arc::clone(&self.chunk_access) as Arc<dyn ChunkPruner<DbChunk>>);
let predicate = PredicateBuilder::new().table(table_name).build();
for chunk in self.chunk_access.candidate_chunks(&predicate) {
// This is unfortunate - a table with incompatible chunks ceases to
// be visible to the query engine
//
// It is also potentially ill-formed as continuing to use the builder
// after it has errored may not yield entirely sensible results
builder = builder
.add_chunk(chunk)
.log_if_error("Adding chunks to table")
.ok()?;
builder = builder.add_chunk(chunk);
}
match builder.build() {
Ok(provider) => Some(Arc::new(provider)),
Err(provider::Error::InternalNoRowsInTable { .. }) => None,
Err(e) => panic!("unexpected error: {:?}", e),
}
}


@ -199,7 +199,7 @@ impl Catalog {
&self,
table_name: impl AsRef<str>,
partition_key: impl AsRef<str>,
) -> (Arc<RwLock<Partition>>, Arc<RwLock<Schema>>) {
) -> (Arc<RwLock<Partition>>, Arc<RwLock<Arc<Schema>>>) {
let mut tables = self.tables.write();
let (_, table) = tables
.raw_entry_mut()


@ -271,7 +271,7 @@ impl CatalogChunk {
pub(super) fn new_rub_chunk(
addr: ChunkAddr,
chunk: read_buffer::RBChunk,
schema: Schema,
schema: Arc<Schema>,
metrics: ChunkMetrics,
) -> Self {
// TODO: Move RUB to single table (#1295)
@ -282,7 +282,7 @@ impl CatalogChunk {
let stage = ChunkStage::Frozen {
meta: Arc::new(ChunkMetadata {
table_summary: Arc::new(summary),
schema: Arc::new(schema),
schema,
}),
representation: ChunkStageFrozenRepr::ReadBuffer(Arc::new(chunk)),
};
@ -603,7 +603,7 @@ impl CatalogChunk {
/// Set the chunk in the Moved state, setting the underlying storage handle to db, and
/// discarding the underlying mutable buffer storage.
pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Schema) -> Result<()> {
pub fn set_moved(&mut self, chunk: Arc<RBChunk>, schema: Arc<Schema>) -> Result<()> {
match &mut self.stage {
ChunkStage::Frozen {
meta,
@ -613,7 +613,7 @@ impl CatalogChunk {
// after moved, the chunk is sorted and its schema needs to get updated
*meta = Arc::new(ChunkMetadata {
table_summary: Arc::clone(&meta.table_summary),
schema: Arc::new(schema),
schema,
});
match &representation {


@ -166,7 +166,7 @@ impl Partition {
pub fn create_rub_chunk(
&mut self,
chunk: read_buffer::RBChunk,
schema: Schema,
schema: Arc<Schema>,
) -> Arc<RwLock<CatalogChunk>> {
let chunk_id = self.next_chunk_id;
assert_ne!(self.next_chunk_id, u32::MAX, "Chunk ID Overflow");


@ -26,7 +26,11 @@ pub struct Table {
metrics: TableMetrics,
/// Table-wide schema.
schema: Arc<RwLock<Schema>>,
///
/// Notes on the type:
/// - the outer `Arc<RwLock<...>>` is there so that we can reference the locked schema w/o tying a lifetime to the table
/// - the inner `Arc<Schema>` is the schema itself, which we don't need to copy when moving it around the query stack
schema: Arc<RwLock<Arc<Schema>>>,
}
impl Table {
@ -40,7 +44,7 @@ impl Table {
let mut builder = SchemaBuilder::new();
builder.measurement(table_name.as_ref());
let schema = builder.build().expect("cannot build empty schema");
let schema = Arc::new(metrics.new_table_lock(schema));
let schema = Arc::new(metrics.new_table_lock(Arc::new(schema)));
Self {
db_name,
@ -93,7 +97,7 @@ impl Table {
self.partitions.values().map(|x| x.read().summary())
}
pub fn schema(&self) -> Arc<RwLock<Schema>> {
pub fn schema(&self) -> Arc<RwLock<Arc<Schema>>> {
Arc::clone(&self.schema)
}
}
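
The "Notes on the type" above boil down to this access pattern (a standalone sketch with `std::sync::RwLock` and `String` standing in for the tracked lock and `Schema`): the read guard lives only long enough to bump a refcount, and the schema is never deep-copied on its way into the query engine.

    use std::sync::{Arc, RwLock};

    struct Table {
        schema: Arc<RwLock<Arc<String>>>,
    }

    impl Table {
        // Mirrors the new QueryDatabase::table_schema: clone the inner Arc under a short read lock.
        fn table_schema(&self) -> Arc<String> {
            Arc::clone(&self.schema.read().unwrap())
        }
    }

    fn main() {
        let table = Table {
            schema: Arc::new(RwLock::new(Arc::new("tag1 tag2 time".to_string()))),
        };
        // Handing the schema to a planner is a refcount bump, not a schema copy.
        let for_planner = table.table_schema();
        assert_eq!(for_planner.as_str(), "tag1 tag2 time");
    }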
@ -104,12 +108,12 @@ impl Table {
enum TableSchemaUpsertHandleInner<'a> {
/// Schema will not be changed.
NoChange {
table_schema_read: RwLockReadGuard<'a, Schema>,
table_schema_read: RwLockReadGuard<'a, Arc<Schema>>,
},
/// Schema might change (if write to mutable buffer is successful).
MightChange {
table_schema_write: RwLockWriteGuard<'a, Schema>,
table_schema_write: RwLockWriteGuard<'a, Arc<Schema>>,
merged_schema: Schema,
},
}
@ -122,7 +126,7 @@ pub struct TableSchemaUpsertHandle<'a> {
impl<'a> TableSchemaUpsertHandle<'a> {
pub(crate) fn new(
table_schema: &'a RwLock<Schema>,
table_schema: &'a RwLock<Arc<Schema>>,
new_schema: &Schema,
) -> Result<Self, SchemaMergerError> {
// Be optimistic and only get a read lock. It is rather rare that the schema will change when new data arrives
@ -134,7 +138,7 @@ impl<'a> TableSchemaUpsertHandle<'a> {
let merged_schema = Self::try_merge(&table_schema_read, new_schema)?;
// Now check if this would actually change the schema:
if &merged_schema == table_schema_read.deref() {
if &merged_schema == table_schema_read.deref().deref() {
// Optimism paid off and we get away with the read lock.
Ok(Self {
inner: TableSchemaUpsertHandleInner::NoChange { table_schema_read },
@ -181,7 +185,7 @@ impl<'a> TableSchemaUpsertHandle<'a> {
merged_schema,
} => {
// Commit new schema and drop write guard;
*table_schema_write = merged_schema;
*table_schema_write = Arc::new(merged_schema);
drop(table_schema_write);
}
}
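
The optimistic locking described in the comments above (take a read lock, fall back to a write lock only when the merged schema actually differs), as a standalone sketch. Std locks and a space-separated string stand in for the tracked lock and the schema merge, and unlike the real handle the check and the commit happen in a single call:

    use std::sync::{Arc, RwLock};

    // Upsert a "column" into a shared schema, taking the write lock only when needed.
    fn upsert(schema: &RwLock<Arc<String>>, column: &str) {
        {
            // Optimistic path: a read lock suffices if nothing would change.
            let current = schema.read().unwrap();
            if current.split(' ').any(|c| c == column) {
                return; // optimism paid off, we got away with the read lock
            }
        } // read guard dropped before taking the write lock
        let mut current = schema.write().unwrap();
        // Re-check under the write lock: another writer may have added the column meanwhile.
        if !current.split(' ').any(|c| c == column) {
            let merged = format!("{} {}", current.as_str(), column);
            *current = Arc::new(merged); // commit by swapping in a new Arc
        }
    }

    fn main() {
        let schema = RwLock::new(Arc::new("tag1 tag2 time".to_string()));
        upsert(&schema, "tag2"); // no change: read lock only
        upsert(&schema, "tag3"); // change: write path
        assert_eq!(schema.read().unwrap().as_str(), "tag1 tag2 time tag3");
    }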
@ -204,7 +208,7 @@ mod tests {
.influx_column("tag2", InfluxColumnType::Tag)
.build()
.unwrap();
let table_schema = lock_tracker.new_lock(table_schema_orig.clone());
let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig.clone()));
// writing with the same schema must not trigger a change
let schema1 = SchemaBuilder::new()
@ -218,9 +222,9 @@ mod tests {
handle.inner,
TableSchemaUpsertHandleInner::NoChange { .. }
));
assert_eq!(table_schema.read().deref(), &table_schema_orig);
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
handle.commit();
assert_eq!(table_schema.read().deref(), &table_schema_orig);
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
// writing with different column order must not trigger a change
let schema2 = SchemaBuilder::new()
@ -234,9 +238,9 @@ mod tests {
handle.inner,
TableSchemaUpsertHandleInner::NoChange { .. }
));
assert_eq!(table_schema.read().deref(), &table_schema_orig);
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
handle.commit();
assert_eq!(table_schema.read().deref(), &table_schema_orig);
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
// writing with a column subset must not trigger a change
let schema3 = SchemaBuilder::new()
@ -249,9 +253,9 @@ mod tests {
handle.inner,
TableSchemaUpsertHandleInner::NoChange { .. }
));
assert_eq!(table_schema.read().deref(), &table_schema_orig);
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
handle.commit();
assert_eq!(table_schema.read().deref(), &table_schema_orig);
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
}
#[test]
@ -263,7 +267,7 @@ mod tests {
.influx_column("tag2", InfluxColumnType::Tag)
.build()
.unwrap();
let table_schema = lock_tracker.new_lock(table_schema_orig);
let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig));
let new_schema = SchemaBuilder::new()
.measurement("m1")
@ -288,7 +292,7 @@ mod tests {
.influx_column("tag3", InfluxColumnType::Tag)
.build()
.unwrap();
assert_eq!(table_schema.read().deref(), &table_schema_expected);
assert_eq!(table_schema.read().deref().deref(), &table_schema_expected);
}
#[test]
@ -300,7 +304,7 @@ mod tests {
.influx_column("tag2", InfluxColumnType::Tag)
.build()
.unwrap();
let table_schema = lock_tracker.new_lock(table_schema_orig.clone());
let table_schema = lock_tracker.new_lock(Arc::new(table_schema_orig.clone()));
let schema1 = SchemaBuilder::new()
.measurement("m1")
@ -311,6 +315,6 @@ mod tests {
assert!(TableSchemaUpsertHandle::new(&table_schema, &schema1).is_err());
// schema did not change
assert_eq!(table_schema.read().deref(), &table_schema_orig);
assert_eq!(table_schema.read().deref().deref(), &table_schema_orig);
}
}


@ -4,6 +4,7 @@ use std::future::Future;
use std::sync::Arc;
use data_types::job::Job;
use internal_types::schema::merge::SchemaMerger;
use lifecycle::LifecycleWriteGuard;
use observability_deps::tracing::info;
use query::exec::ExecutorType;
@ -58,6 +59,17 @@ pub(crate) fn compact_chunks(
})
.collect::<Result<Vec<_>>>()?;
// build schema
// Note: we only use the merged schema from the to-be-compacted chunks - not the table-wide schema, since we don't
// need to bother with other columns (e.g. ones that only exist in other partitions).
let mut merger = SchemaMerger::new();
for db_chunk in &query_chunks {
merger = merger
.merge(&db_chunk.schema())
.expect("schemas compatible");
}
let schema = Arc::new(merger.build());
// drop partition lock
let partition = partition.unwrap().partition;
@ -79,7 +91,7 @@ pub(crate) fn compact_chunks(
// Cannot move query_chunks as the sort key borrows the column names
let (schema, plan) =
ReorgPlanner::new().compact_plan(query_chunks.iter().map(Arc::clone), key)?;
ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?;
let physical_plan = ctx.prepare_plan(&plan)?;
let stream = ctx.execute(physical_plan).await?;


@ -37,7 +37,11 @@ pub fn move_chunk_to_read_buffer(
let table_summary = guard.table_summary();
// snapshot the data
let query_chunks = vec![DbChunk::snapshot(&*guard)];
// Note: we can just use the chunk-specific schema here since there is only a single chunk and this is a somewhat
// local operation that only needs to deal with the columns that are actually present.
let db_chunk = DbChunk::snapshot(&*guard);
let schema = db_chunk.schema();
let query_chunks = vec![db_chunk];
// Drop locks
let chunk = guard.unwrap().chunk;
@ -52,7 +56,7 @@ pub fn move_chunk_to_read_buffer(
// Cannot move query_chunks as the sort key borrows the column names
let (schema, plan) =
ReorgPlanner::new().compact_plan(query_chunks.iter().map(Arc::clone), key)?;
ReorgPlanner::new().compact_plan(schema, query_chunks.iter().map(Arc::clone), key)?;
let physical_plan = ctx.prepare_plan(&plan)?;
let stream = ctx.execute(physical_plan).await?;