diff --git a/querier/src/cache/namespace.rs b/querier/src/cache/namespace.rs index c83efdf43b..9589e4f773 100644 --- a/querier/src/cache/namespace.rs +++ b/querier/src/cache/namespace.rs @@ -13,12 +13,13 @@ use cache_system::{ loader::{metrics::MetricsLoader, FunctionLoader}, resource_consumption::FunctionEstimator, }; -use data_types::{ColumnId, NamespaceSchema}; +use data_types::{ColumnId, NamespaceId, NamespaceSchema, TableId, TableSchema}; use iox_catalog::interface::{get_schema_by_name, Catalog}; use iox_time::TimeProvider; +use schema::Schema; use std::{ collections::{HashMap, HashSet}, - mem::size_of_val, + mem::{size_of, size_of_val}, sync::Arc, time::Duration, }; @@ -94,9 +95,7 @@ impl NamespaceCache { .await .expect("retry forever")?; - Some(Arc::new(CachedNamespace { - schema: Arc::new(schema), - })) + Some(Arc::new((&schema).into())) } }, )); @@ -164,18 +163,19 @@ impl NamespaceCache { /// /// Expire namespace if the cached schema does NOT cover the given set of columns. The set is given as a list of /// pairs of table name and column set. - pub async fn schema( + pub async fn get( &self, name: Arc, should_cover: &[(&str, &HashSet)], span: Option, - ) -> Option> { + ) -> Option> { self.remove_if_handle.remove_if(&name, |cached_namespace| { if let Some(namespace) = cached_namespace.as_ref() { should_cover.iter().any(|(table_name, columns)| { - if let Some(table) = namespace.schema.tables.get(*table_name) { - let covered: HashSet<_> = table.columns.values().map(|c| c.id).collect(); - columns.iter().any(|col| !covered.contains(col)) + if let Some(table) = namespace.tables.get(*table_name) { + columns + .iter() + .any(|col| !table.column_id_map.contains_key(col)) } else { // table unknown => need to update true @@ -187,32 +187,90 @@ impl NamespaceCache { } }); - self.cache - .get(name, ((), span)) - .await - .map(|n| Arc::clone(&n.schema)) + self.cache.get(name, ((), span)).await } } -#[derive(Debug, Clone)] -struct CachedNamespace { - schema: Arc, +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CachedTable { + pub id: TableId, + pub schema: Arc, + pub column_id_map: HashMap>, +} + +impl CachedTable { + /// RAM-bytes EXCLUDING `self`. + fn size(&self) -> usize { + self.schema.estimate_size() + + self.column_id_map.capacity() * size_of::<(ColumnId, Arc)>() + + self + .column_id_map + .iter() + .map(|(_id, name)| name.len()) + .sum::() + } +} + +impl From<&TableSchema> for CachedTable { + fn from(table: &TableSchema) -> Self { + let mut column_id_map: HashMap> = table + .columns + .iter() + .map(|(name, c)| (c.id, Arc::from(name.clone()))) + .collect(); + column_id_map.shrink_to_fit(); + + Self { + id: table.id, + schema: Arc::new( + table + .clone() + .try_into() + .expect("Catalog table schema broken"), + ), + column_id_map, + } + } +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CachedNamespace { + pub id: NamespaceId, + pub tables: HashMap, CachedTable>, } impl CachedNamespace { /// RAM-bytes EXCLUDING `self`. fn size(&self) -> usize { - self.schema.size() - size_of_val(&self.schema) + self.tables.capacity() * size_of::<(Arc, Arc)>() + + self + .tables + .iter() + .map(|(name, table)| name.len() + table.size()) + .sum::() + } +} + +impl From<&NamespaceSchema> for CachedNamespace { + fn from(ns: &NamespaceSchema) -> Self { + let mut tables: HashMap, CachedTable> = ns + .tables + .iter() + .map(|(name, table)| (Arc::from(name.clone()), table.into())) + .collect(); + tables.shrink_to_fit(); + + Self { id: ns.id, tables } } } #[cfg(test)] mod tests { - use std::collections::BTreeMap; - use crate::cache::{ram::test_util::test_ram_pool, test_util::assert_histogram_metric_count}; - use data_types::{ColumnSchema, ColumnType, TableSchema}; + use arrow::datatypes::DataType; + use data_types::ColumnType; use iox_tests::util::TestCatalog; + use schema::SchemaBuilder; use super::*; @@ -230,10 +288,10 @@ mod tests { let col111 = table11.create_column("col1", ColumnType::I64).await; let col112 = table11.create_column("col2", ColumnType::Tag).await; - let col113 = table11.create_column("col3", ColumnType::Time).await; + let col113 = table11.create_column("time", ColumnType::Time).await; let col121 = table12.create_column("col1", ColumnType::F64).await; - let col122 = table12.create_column("col2", ColumnType::Time).await; - let col211 = table21.create_column("col1", ColumnType::Time).await; + let col122 = table12.create_column("time", ColumnType::Time).await; + let col211 = table21.create_column("time", ColumnType::Time).await; let cache = NamespaceCache::new( catalog.catalog(), @@ -245,101 +303,80 @@ mod tests { true, ); - let schema1_a = cache - .schema(Arc::from(String::from("ns1")), &[], None) + let actual_ns_1_a = cache + .get(Arc::from(String::from("ns1")), &[], None) .await .unwrap(); - let expected_schema_1 = NamespaceSchema { + let expected_ns_1 = CachedNamespace { id: ns1.namespace.id, - topic_id: ns1.namespace.topic_id, - query_pool_id: ns1.namespace.query_pool_id, - tables: BTreeMap::from([ + tables: HashMap::from([ ( - String::from("table1"), - TableSchema { + Arc::from("table1"), + CachedTable { id: table11.table.id, - columns: BTreeMap::from([ - ( - String::from("col1"), - ColumnSchema { - id: col111.column.id, - column_type: ColumnType::I64, - }, - ), - ( - String::from("col2"), - ColumnSchema { - id: col112.column.id, - column_type: ColumnType::Tag, - }, - ), - ( - String::from("col3"), - ColumnSchema { - id: col113.column.id, - column_type: ColumnType::Time, - }, - ), + schema: Arc::new( + SchemaBuilder::new() + .field("col1", DataType::Int64) + .tag("col2") + .timestamp() + .build() + .unwrap(), + ), + column_id_map: HashMap::from([ + (col111.column.id, Arc::from(col111.column.name.clone())), + (col112.column.id, Arc::from(col112.column.name.clone())), + (col113.column.id, Arc::from(col113.column.name.clone())), ]), }, ), ( - String::from("table2"), - TableSchema { + Arc::from("table2"), + CachedTable { id: table12.table.id, - columns: BTreeMap::from([ - ( - String::from("col1"), - ColumnSchema { - id: col121.column.id, - column_type: ColumnType::F64, - }, - ), - ( - String::from("col2"), - ColumnSchema { - id: col122.column.id, - column_type: ColumnType::Time, - }, - ), + schema: Arc::new( + SchemaBuilder::new() + .field("col1", DataType::Float64) + .timestamp() + .build() + .unwrap(), + ), + column_id_map: HashMap::from([ + (col121.column.id, Arc::from(col121.column.name.clone())), + (col122.column.id, Arc::from(col122.column.name.clone())), ]), }, ), ]), }; - assert_eq!(schema1_a.as_ref(), &expected_schema_1); + assert_eq!(actual_ns_1_a.as_ref(), &expected_ns_1); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1); - let schema2 = cache - .schema(Arc::from(String::from("ns2")), &[], None) + let actual_ns_2 = cache + .get(Arc::from(String::from("ns2")), &[], None) .await .unwrap(); - let expected_schema_2 = NamespaceSchema { + let expected_ns_2 = CachedNamespace { id: ns2.namespace.id, - topic_id: ns2.namespace.topic_id, - query_pool_id: ns2.namespace.query_pool_id, - tables: BTreeMap::from([( - String::from("table1"), - TableSchema { + tables: HashMap::from([( + Arc::from("table1"), + CachedTable { id: table21.table.id, - columns: BTreeMap::from([( - String::from("col1"), - ColumnSchema { - id: col211.column.id, - column_type: ColumnType::Time, - }, + schema: Arc::new(SchemaBuilder::new().timestamp().build().unwrap()), + column_id_map: HashMap::from([( + col211.column.id, + Arc::from(col211.column.name.clone()), )]), }, )]), }; - assert_eq!(schema2.as_ref(), &expected_schema_2); + assert_eq!(actual_ns_2.as_ref(), &expected_ns_2); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2); - let schema1_b = cache - .schema(Arc::from(String::from("ns1")), &[], None) + let actual_ns_1_b = cache + .get(Arc::from(String::from("ns1")), &[], None) .await .unwrap(); - assert!(Arc::ptr_eq(&schema1_a, &schema1_b)); + assert!(Arc::ptr_eq(&actual_ns_1_a, &actual_ns_1_b)); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2); } @@ -357,15 +394,11 @@ mod tests { true, ); - let none = cache - .schema(Arc::from(String::from("foo")), &[], None) - .await; + let none = cache.get(Arc::from(String::from("foo")), &[], None).await; assert!(none.is_none()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1); - let none = cache - .schema(Arc::from(String::from("foo")), &[], None) - .await; + let none = cache.get(Arc::from(String::from("foo")), &[], None).await; assert!(none.is_none()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1); } @@ -385,14 +418,14 @@ mod tests { ); // ========== namespace unknown ========== - assert!(cache.schema(Arc::from("ns1"), &[], None).await.is_none()); + assert!(cache.get(Arc::from("ns1"), &[], None).await.is_none()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1); - assert!(cache.schema(Arc::from("ns1"), &[], None).await.is_none()); + assert!(cache.get(Arc::from("ns1"), &[], None).await.is_none()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1); assert!(cache - .schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) + .get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) .await .is_none()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2); @@ -401,13 +434,13 @@ mod tests { let ns1 = catalog.create_namespace("ns1").await; assert!(cache - .schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) + .get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) .await .is_some()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 3); assert!(cache - .schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) + .get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) .await .is_some()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 4); @@ -416,13 +449,13 @@ mod tests { let t1 = ns1.create_table("t1").await; assert!(cache - .schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) + .get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) .await .is_some()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5); assert!(cache - .schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) + .get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) .await .is_some()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5); @@ -432,13 +465,13 @@ mod tests { let c2 = t1.create_column("c2", ColumnType::Bool).await; assert!(cache - .schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) + .get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None) .await .is_some()); assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5); assert!(cache - .schema( + .get( Arc::from("ns1"), &[("t1", &HashSet::from([c1.column.id]))], None @@ -448,7 +481,7 @@ mod tests { assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6); assert!(cache - .schema( + .get( Arc::from("ns1"), &[("t1", &HashSet::from([c2.column.id]))], None diff --git a/querier/src/chunk/mod.rs b/querier/src/chunk/mod.rs index 14f916b0fa..c784b51157 100644 --- a/querier/src/chunk/mod.rs +++ b/querier/src/chunk/mod.rs @@ -1,9 +1,10 @@ //! Querier Chunks +use crate::cache::namespace::CachedTable; use crate::cache::CatalogCache; use data_types::{ - ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, NamespaceSchema, ParquetFile, - ParquetFileId, PartitionId, SequenceNumber, ShardId, TableSummary, TimestampMinMax, + ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, ParquetFile, ParquetFileId, PartitionId, + SequenceNumber, ShardId, TableSummary, TimestampMinMax, }; use iox_catalog::interface::Catalog; use parking_lot::RwLock; @@ -387,8 +388,7 @@ impl ChunkAdapter { pub async fn new_chunk( &self, - namespace_schema: Arc, - table_schema: Arc, + cached_table: &CachedTable, table_name: Arc, parquet_file: Arc, span: Option, @@ -396,8 +396,7 @@ impl ChunkAdapter { let span_recorder = SpanRecorder::new(span); let parts = self .chunk_parts( - namespace_schema, - table_schema, + cached_table, table_name, Arc::clone(&parquet_file), span_recorder.child_span("chunk_parts"), @@ -431,29 +430,31 @@ impl ChunkAdapter { async fn chunk_parts( &self, - namespace_schema: Arc, - table_schema: Arc, + cached_table: &CachedTable, table_name: Arc, parquet_file: Arc, span: Option, ) -> Option { let span_recorder = SpanRecorder::new(span); - let table_schema_catalog = namespace_schema.tables.get(table_name.as_ref())?; - let parquet_file_col_ids: HashSet<_> = parquet_file.column_set.iter().collect(); + let parquet_file_cols: HashSet<_> = parquet_file + .column_set + .iter() + .map(|id| { + cached_table + .column_id_map + .get(id) + .expect("catalog has all columns") + .as_ref() + }) + .collect(); // relevant_pk_columns is everything from the primary key for the table, that is actually in this parquet file - let relevant_pk_columns: Vec<_> = table_schema + let relevant_pk_columns: Vec<_> = cached_table + .schema .primary_key() .into_iter() - .filter(|c| { - let col_id = table_schema_catalog - .columns - .get(*c) - .expect("catalog has all columns") - .id; - parquet_file_col_ids.contains(&col_id) - }) + .filter(|c| parquet_file_cols.contains(*c)) .collect(); let partition_sort_key = self .catalog_cache @@ -478,29 +479,24 @@ impl ChunkAdapter { // IMPORTANT: Do NOT use the sort key to list columns because the sort key only contains primary-key columns. // NOTE: The schema that we calculate here may have a different column order than the actual parquet file. This // is OK because the IOx parquet reader can deal with that (see #4921). - let column_names: Vec<_> = table_schema - .as_ref() + let column_names: Vec<_> = cached_table + .schema .iter() .filter_map(|(_, field)| { - let name = field.name().as_str(); - let column_id = match table_schema_catalog.columns.get(name) { - Some(col) => col.id, - None => return None, - }; - if parquet_file_col_ids.contains(&column_id) { - Some(name) + let name = field.name(); + if parquet_file_cols.contains(name.as_str()) { + Some(name.clone()) } else { None } }) - .map(|s| s.to_owned()) .collect(); let schema = self .catalog_cache .projected_schema() .get( parquet_file.table_id, - Arc::clone(&table_schema), + Arc::clone(&cached_table.schema), column_names, span_recorder.child_span("cache GET projected schema"), ) @@ -546,10 +542,12 @@ struct ChunkParts { #[cfg(test)] pub mod tests { + use crate::cache::namespace::CachedNamespace; + use super::*; use arrow::{datatypes::DataType, record_batch::RecordBatch}; use arrow_util::assert_batches_eq; - use data_types::ColumnType; + use data_types::{ColumnType, NamespaceSchema}; use futures::StreamExt; use iox_query::{exec::IOxSessionContext, QueryChunk, QueryChunkMeta}; use iox_tests::util::{TestCatalog, TestNamespace, TestParquetFileBuilder}; @@ -750,15 +748,11 @@ pub mod tests { } async fn chunk(&self, namespace_schema: Arc) -> QuerierChunk { - let table_schema_catalog = namespace_schema.tables.get("table").expect("table exists"); - let table_schema: Schema = table_schema_catalog - .clone() - .try_into() - .expect("Invalid table schema in catalog"); + let cached_namespace: CachedNamespace = namespace_schema.as_ref().into(); + let cached_table = cached_namespace.tables.get("table").expect("table exists"); self.adapter .new_chunk( - Arc::clone(&namespace_schema), - Arc::new(table_schema), + cached_table, Arc::from("table"), Arc::clone(&self.parquet_file), None, diff --git a/querier/src/database.rs b/querier/src/database.rs index e18f8fea4f..8445612b56 100644 --- a/querier/src/database.rs +++ b/querier/src/database.rs @@ -164,10 +164,10 @@ impl QuerierDatabase { pub async fn namespace(&self, name: &str, span: Option) -> Option> { let span_recorder = SpanRecorder::new(span); let name = Arc::from(name.to_owned()); - let schema = self + let ns = self .catalog_cache .namespace() - .schema( + .get( Arc::clone(&name), // we have no specific need for any tables or columns at this point, so nothing to cover &[], @@ -176,7 +176,7 @@ impl QuerierDatabase { .await?; Some(Arc::new(QuerierNamespace::new( Arc::clone(&self.chunk_adapter), - schema, + ns, name, Arc::clone(&self.exec), self.ingester_connection.clone(), diff --git a/querier/src/namespace/mod.rs b/querier/src/namespace/mod.rs index 1d8d451539..1985716139 100644 --- a/querier/src/namespace/mod.rs +++ b/querier/src/namespace/mod.rs @@ -1,17 +1,16 @@ //! Namespace within the whole database. use crate::{ - cache::CatalogCache, + cache::{namespace::CachedNamespace, CatalogCache}, chunk::ChunkAdapter, ingester::IngesterConnection, query_log::QueryLog, table::{PruneMetrics, QuerierTable, QuerierTableArgs}, QuerierChunkLoadSetting, }; -use data_types::{NamespaceId, NamespaceSchema, ParquetFileId, ShardIndex}; +use data_types::{NamespaceId, ParquetFileId, ShardIndex}; use iox_query::exec::Executor; use parquet_file::storage::ParquetStorage; -use schema::Schema; use sharder::JumpHash; use std::{collections::HashMap, sync::Arc}; @@ -56,7 +55,7 @@ impl QuerierNamespace { #[allow(clippy::too_many_arguments)] pub fn new( chunk_adapter: Arc, - schema: Arc, + ns: Arc, name: Arc, exec: Arc, ingester_connection: Option>, @@ -65,20 +64,16 @@ impl QuerierNamespace { max_table_query_bytes: usize, prune_metrics: Arc, ) -> Self { - let tables: HashMap<_, _> = schema + let tables: HashMap<_, _> = ns .tables .iter() - .map(|(table_name, table_schema)| { - let table_name = Arc::from(table_name.clone()); - let id = table_schema.id; - let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema"); - + .map(|(table_name, cached_table)| { let table = Arc::new(QuerierTable::new(QuerierTableArgs { sharder: Arc::clone(&sharder), namespace_name: Arc::clone(&name), - id, - table_name: Arc::clone(&table_name), - schema: Arc::new(schema), + id: cached_table.id, + table_name: Arc::clone(table_name), + schema: Arc::clone(&cached_table.schema), ingester_connection: ingester_connection.clone(), chunk_adapter: Arc::clone(&chunk_adapter), exec: Arc::clone(&exec), @@ -86,11 +81,11 @@ impl QuerierNamespace { prune_metrics: Arc::clone(&prune_metrics), })); - (table_name, table) + (Arc::clone(table_name), table) }) .collect(); - let id = schema.id; + let id = ns.id; Self { id, @@ -109,7 +104,7 @@ impl QuerierNamespace { store: ParquetStorage, metric_registry: Arc, name: Arc, - schema: Arc, + ns: Arc, exec: Arc, ingester_connection: Option>, sharder: Arc>>, @@ -128,7 +123,7 @@ impl QuerierNamespace { Self::new( chunk_adapter, - schema, + ns, name, exec, ingester_connection, @@ -157,7 +152,7 @@ mod tests { use crate::namespace::test_util::querier_namespace; use data_types::ColumnType; use iox_tests::util::TestCatalog; - use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType}; + use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType, Schema}; #[tokio::test] async fn test_sync_tables() { diff --git a/querier/src/namespace/test_util.rs b/querier/src/namespace/test_util.rs index 07f24e947f..74e49c6035 100644 --- a/querier/src/namespace/test_util.rs +++ b/querier/src/namespace/test_util.rs @@ -1,5 +1,7 @@ use super::QuerierNamespace; -use crate::{create_ingester_connection_for_testing, QuerierCatalogCache}; +use crate::{ + cache::namespace::CachedNamespace, create_ingester_connection_for_testing, QuerierCatalogCache, +}; use data_types::{ShardIndex, TableId}; use iox_catalog::interface::get_schema_by_name; use iox_tests::util::TestNamespace; @@ -19,11 +21,10 @@ pub async fn querier_namespace_with_limit( max_table_query_bytes: usize, ) -> QuerierNamespace { let mut repos = ns.catalog.catalog.repositories().await; - let schema = Arc::new( - get_schema_by_name(&ns.namespace.name, repos.as_mut()) - .await - .unwrap(), - ); + let schema = get_schema_by_name(&ns.namespace.name, repos.as_mut()) + .await + .unwrap(); + let cached_ns = Arc::new(CachedNamespace::from(&schema)); let catalog_cache = Arc::new(QuerierCatalogCache::new_testing( ns.catalog.catalog(), @@ -39,7 +40,7 @@ pub async fn querier_namespace_with_limit( ParquetStorage::new(ns.catalog.object_store()), ns.catalog.metric_registry(), ns.namespace.name.clone().into(), - schema, + cached_ns, ns.catalog.exec(), Some(create_ingester_connection_for_testing()), sharder, diff --git a/querier/src/table/mod.rs b/querier/src/table/mod.rs index 58b56d7521..ff180c85d0 100644 --- a/querier/src/table/mod.rs +++ b/querier/src/table/mod.rs @@ -243,45 +243,30 @@ impl QuerierTable { .iter() .flat_map(|cached_file| cached_file.column_set.iter().copied()) .collect(); - let namespace_schema = self + let cached_namespace = self .chunk_adapter .catalog_cache() .namespace() - .schema( + .get( Arc::clone(&self.namespace_name), &[(&self.table_name, &columns)], span_recorder.child_span("cache GET namespace schema"), ) .await; - let namespace_schema = namespace_schema.as_ref(); - let table_schema_catalog = match &namespace_schema { - Some(n) => n.tables.get(self.table_name.as_ref()), - None => None, - }; + let cached_table = cached_namespace + .as_ref() + .and_then(|ns| ns.tables.get(self.table_name.as_ref())); // create parquet files - let parquet_files: Vec<_> = match (namespace_schema, table_schema_catalog) { - (Some(namespace_schema), Some(table_schema_catalog)) => { - let column_id_map = table_schema_catalog.column_id_map(); - for col in &columns { - assert!( - column_id_map.contains_key(col), - "Column {} occurs in parquet file but is not part of the table schema", - col.get() - ); - } - let table_schema: Schema = table_schema_catalog - .clone() - .try_into() - .expect("Invalid table schema in catalog"); - + let parquet_files: Vec<_> = match cached_table { + Some(cached_table) => { let basic_summaries: Vec<_> = parquet_files .files .iter() .map(|p| { Arc::new(create_basic_summary( p.row_count as u64, - &table_schema, + &cached_table.schema, TimestampMinMax { min: p.min_time.get(), max: p.max_time.get(), @@ -290,20 +275,22 @@ impl QuerierTable { }) .map(Some) .collect(); - let table_schema = &Arc::new(table_schema); // Prune on the most basic summary data (timestamps and column names) before trying to fully load the chunks - let keeps = - match prune_summaries(Arc::clone(table_schema), &basic_summaries, predicate) { - Ok(keeps) => keeps, - Err(reason) => { - // Ignore pruning failures here - the chunk pruner should have already logged them. - // Just skip pruning and gather all the metadata. We have another chance to prune them - // once all the metadata is available - debug!(?reason, "Could not prune before metadata fetch"); - vec![true; basic_summaries.len()] - } - }; + let keeps = match prune_summaries( + Arc::clone(&cached_table.schema), + &basic_summaries, + predicate, + ) { + Ok(keeps) => keeps, + Err(reason) => { + // Ignore pruning failures here - the chunk pruner should have already logged them. + // Just skip pruning and gather all the metadata. We have another chance to prune them + // once all the metadata is available + debug!(?reason, "Could not prune before metadata fetch"); + vec![true; basic_summaries.len()] + } + }; let early_pruning_observer = &MetricPruningObserver::new(Arc::clone(&self.prune_metrics)); @@ -320,8 +307,7 @@ impl QuerierTable { let span = span_recorder.child_span("new_chunk"); chunk_adapter .new_chunk( - Arc::clone(namespace_schema), - Arc::clone(table_schema), + cached_table, Arc::clone(self.table_name()), Arc::clone(cached_parquet_file), span, @@ -331,7 +317,7 @@ impl QuerierTable { .collect() .await } - (_, _) => Vec::new(), + _ => Vec::new(), }; let chunks = self diff --git a/query_tests/src/scenarios/util.rs b/query_tests/src/scenarios/util.rs index dd4ea43039..7c83d16bcd 100644 --- a/query_tests/src/scenarios/util.rs +++ b/query_tests/src/scenarios/util.rs @@ -946,7 +946,7 @@ impl MockIngester { ParquetStorage::new(catalog.object_store()), catalog.metric_registry(), ns.namespace.name.clone().into(), - schema, + Arc::new(schema.as_ref().into()), catalog.exec(), Some(ingester_connection), sharder,