refactor: improve namespace caching in querier (#5492)

1. Cache converted schema instead of catalog schema. This saves a bunch
   of memcopies during conversion.
2. Simplify creation of new chunks, we now only need a `CachedTable`
   instead of a namespace and a table schema.

In an artificial benchmark, this removed around 10ms from the query
(although that was prior to #5467 which moved schema conversion one
level up). Still I think it is the cleaner cache design.

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Marco Neumann 2022-08-30 11:42:21 +00:00 committed by GitHub
parent 430536f05f
commit fecbbd9fa1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 220 additions and 211 deletions

View File

@ -13,12 +13,13 @@ use cache_system::{
loader::{metrics::MetricsLoader, FunctionLoader},
resource_consumption::FunctionEstimator,
};
use data_types::{ColumnId, NamespaceSchema};
use data_types::{ColumnId, NamespaceId, NamespaceSchema, TableId, TableSchema};
use iox_catalog::interface::{get_schema_by_name, Catalog};
use iox_time::TimeProvider;
use schema::Schema;
use std::{
collections::{HashMap, HashSet},
mem::size_of_val,
mem::{size_of, size_of_val},
sync::Arc,
time::Duration,
};
@ -94,9 +95,7 @@ impl NamespaceCache {
.await
.expect("retry forever")?;
Some(Arc::new(CachedNamespace {
schema: Arc::new(schema),
}))
Some(Arc::new((&schema).into()))
}
},
));
@ -164,18 +163,19 @@ impl NamespaceCache {
///
/// Expire namespace if the cached schema does NOT cover the given set of columns. The set is given as a list of
/// pairs of table name and column set.
pub async fn schema(
pub async fn get(
&self,
name: Arc<str>,
should_cover: &[(&str, &HashSet<ColumnId>)],
span: Option<Span>,
) -> Option<Arc<NamespaceSchema>> {
) -> Option<Arc<CachedNamespace>> {
self.remove_if_handle.remove_if(&name, |cached_namespace| {
if let Some(namespace) = cached_namespace.as_ref() {
should_cover.iter().any(|(table_name, columns)| {
if let Some(table) = namespace.schema.tables.get(*table_name) {
let covered: HashSet<_> = table.columns.values().map(|c| c.id).collect();
columns.iter().any(|col| !covered.contains(col))
if let Some(table) = namespace.tables.get(*table_name) {
columns
.iter()
.any(|col| !table.column_id_map.contains_key(col))
} else {
// table unknown => need to update
true
@ -187,32 +187,90 @@ impl NamespaceCache {
}
});
self.cache
.get(name, ((), span))
.await
.map(|n| Arc::clone(&n.schema))
self.cache.get(name, ((), span)).await
}
}
#[derive(Debug, Clone)]
struct CachedNamespace {
schema: Arc<NamespaceSchema>,
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachedTable {
pub id: TableId,
pub schema: Arc<Schema>,
pub column_id_map: HashMap<ColumnId, Arc<str>>,
}
impl CachedTable {
    /// Estimated memory usage in bytes, NOT including `self`.
    fn size(&self) -> usize {
        // Heap bytes of all column-name strings.
        let name_bytes: usize = self.column_id_map.values().map(|name| name.len()).sum();

        // The map's backing storage is sized by its capacity, not its length.
        let map_bytes = self.column_id_map.capacity() * size_of::<(ColumnId, Arc<str>)>();

        self.schema.estimate_size() + map_bytes + name_bytes
    }
}
impl From<&TableSchema> for CachedTable {
fn from(table: &TableSchema) -> Self {
let mut column_id_map: HashMap<ColumnId, Arc<str>> = table
.columns
.iter()
.map(|(name, c)| (c.id, Arc::from(name.clone())))
.collect();
column_id_map.shrink_to_fit();
Self {
id: table.id,
schema: Arc::new(
table
.clone()
.try_into()
.expect("Catalog table schema broken"),
),
column_id_map,
}
}
}
/// Cached, pre-converted information about a namespace: its catalog ID plus
/// all of its tables keyed by table name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachedNamespace {
    // Catalog ID of this namespace.
    pub id: NamespaceId,
    // Per-table cached data (converted schema + column ID map), keyed by name.
    pub tables: HashMap<Arc<str>, CachedTable>,
}
impl CachedNamespace {
/// RAM-bytes EXCLUDING `self`.
fn size(&self) -> usize {
self.schema.size() - size_of_val(&self.schema)
self.tables.capacity() * size_of::<(Arc<str>, Arc<Schema>)>()
+ self
.tables
.iter()
.map(|(name, table)| name.len() + table.size())
.sum::<usize>()
}
}
impl From<&NamespaceSchema> for CachedNamespace {
    fn from(ns: &NamespaceSchema) -> Self {
        // Convert every table eagerly; `CachedTable::from` does the
        // schema conversion once per table.
        let mut tables: HashMap<Arc<str>, CachedTable> =
            HashMap::with_capacity(ns.tables.len());
        for (name, table) in &ns.tables {
            tables.insert(Arc::from(name.clone()), table.into());
        }
        // Keep the RAM estimate tight.
        tables.shrink_to_fit();

        Self { id: ns.id, tables }
    }
}
#[cfg(test)]
mod tests {
use std::collections::BTreeMap;
use crate::cache::{ram::test_util::test_ram_pool, test_util::assert_histogram_metric_count};
use data_types::{ColumnSchema, ColumnType, TableSchema};
use arrow::datatypes::DataType;
use data_types::ColumnType;
use iox_tests::util::TestCatalog;
use schema::SchemaBuilder;
use super::*;
@ -230,10 +288,10 @@ mod tests {
let col111 = table11.create_column("col1", ColumnType::I64).await;
let col112 = table11.create_column("col2", ColumnType::Tag).await;
let col113 = table11.create_column("col3", ColumnType::Time).await;
let col113 = table11.create_column("time", ColumnType::Time).await;
let col121 = table12.create_column("col1", ColumnType::F64).await;
let col122 = table12.create_column("col2", ColumnType::Time).await;
let col211 = table21.create_column("col1", ColumnType::Time).await;
let col122 = table12.create_column("time", ColumnType::Time).await;
let col211 = table21.create_column("time", ColumnType::Time).await;
let cache = NamespaceCache::new(
catalog.catalog(),
@ -245,101 +303,80 @@ mod tests {
true,
);
let schema1_a = cache
.schema(Arc::from(String::from("ns1")), &[], None)
let actual_ns_1_a = cache
.get(Arc::from(String::from("ns1")), &[], None)
.await
.unwrap();
let expected_schema_1 = NamespaceSchema {
let expected_ns_1 = CachedNamespace {
id: ns1.namespace.id,
topic_id: ns1.namespace.topic_id,
query_pool_id: ns1.namespace.query_pool_id,
tables: BTreeMap::from([
tables: HashMap::from([
(
String::from("table1"),
TableSchema {
Arc::from("table1"),
CachedTable {
id: table11.table.id,
columns: BTreeMap::from([
(
String::from("col1"),
ColumnSchema {
id: col111.column.id,
column_type: ColumnType::I64,
},
),
(
String::from("col2"),
ColumnSchema {
id: col112.column.id,
column_type: ColumnType::Tag,
},
),
(
String::from("col3"),
ColumnSchema {
id: col113.column.id,
column_type: ColumnType::Time,
},
),
schema: Arc::new(
SchemaBuilder::new()
.field("col1", DataType::Int64)
.tag("col2")
.timestamp()
.build()
.unwrap(),
),
column_id_map: HashMap::from([
(col111.column.id, Arc::from(col111.column.name.clone())),
(col112.column.id, Arc::from(col112.column.name.clone())),
(col113.column.id, Arc::from(col113.column.name.clone())),
]),
},
),
(
String::from("table2"),
TableSchema {
Arc::from("table2"),
CachedTable {
id: table12.table.id,
columns: BTreeMap::from([
(
String::from("col1"),
ColumnSchema {
id: col121.column.id,
column_type: ColumnType::F64,
},
),
(
String::from("col2"),
ColumnSchema {
id: col122.column.id,
column_type: ColumnType::Time,
},
),
schema: Arc::new(
SchemaBuilder::new()
.field("col1", DataType::Float64)
.timestamp()
.build()
.unwrap(),
),
column_id_map: HashMap::from([
(col121.column.id, Arc::from(col121.column.name.clone())),
(col122.column.id, Arc::from(col122.column.name.clone())),
]),
},
),
]),
};
assert_eq!(schema1_a.as_ref(), &expected_schema_1);
assert_eq!(actual_ns_1_a.as_ref(), &expected_ns_1);
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
let schema2 = cache
.schema(Arc::from(String::from("ns2")), &[], None)
let actual_ns_2 = cache
.get(Arc::from(String::from("ns2")), &[], None)
.await
.unwrap();
let expected_schema_2 = NamespaceSchema {
let expected_ns_2 = CachedNamespace {
id: ns2.namespace.id,
topic_id: ns2.namespace.topic_id,
query_pool_id: ns2.namespace.query_pool_id,
tables: BTreeMap::from([(
String::from("table1"),
TableSchema {
tables: HashMap::from([(
Arc::from("table1"),
CachedTable {
id: table21.table.id,
columns: BTreeMap::from([(
String::from("col1"),
ColumnSchema {
id: col211.column.id,
column_type: ColumnType::Time,
},
schema: Arc::new(SchemaBuilder::new().timestamp().build().unwrap()),
column_id_map: HashMap::from([(
col211.column.id,
Arc::from(col211.column.name.clone()),
)]),
},
)]),
};
assert_eq!(schema2.as_ref(), &expected_schema_2);
assert_eq!(actual_ns_2.as_ref(), &expected_ns_2);
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
let schema1_b = cache
.schema(Arc::from(String::from("ns1")), &[], None)
let actual_ns_1_b = cache
.get(Arc::from(String::from("ns1")), &[], None)
.await
.unwrap();
assert!(Arc::ptr_eq(&schema1_a, &schema1_b));
assert!(Arc::ptr_eq(&actual_ns_1_a, &actual_ns_1_b));
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
}
@ -357,15 +394,11 @@ mod tests {
true,
);
let none = cache
.schema(Arc::from(String::from("foo")), &[], None)
.await;
let none = cache.get(Arc::from(String::from("foo")), &[], None).await;
assert!(none.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
let none = cache
.schema(Arc::from(String::from("foo")), &[], None)
.await;
let none = cache.get(Arc::from(String::from("foo")), &[], None).await;
assert!(none.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
}
@ -385,14 +418,14 @@ mod tests {
);
// ========== namespace unknown ==========
assert!(cache.schema(Arc::from("ns1"), &[], None).await.is_none());
assert!(cache.get(Arc::from("ns1"), &[], None).await.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
assert!(cache.schema(Arc::from("ns1"), &[], None).await.is_none());
assert!(cache.get(Arc::from("ns1"), &[], None).await.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 1);
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_none());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 2);
@ -401,13 +434,13 @@ mod tests {
let ns1 = catalog.create_namespace("ns1").await;
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 3);
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 4);
@ -416,13 +449,13 @@ mod tests {
let t1 = ns1.create_table("t1").await;
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
@ -432,13 +465,13 @@ mod tests {
let c2 = t1.create_column("c2", ColumnType::Bool).await;
assert!(cache
.schema(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.get(Arc::from("ns1"), &[("t1", &HashSet::from([]))], None)
.await
.is_some());
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 5);
assert!(cache
.schema(
.get(
Arc::from("ns1"),
&[("t1", &HashSet::from([c1.column.id]))],
None
@ -448,7 +481,7 @@ mod tests {
assert_histogram_metric_count(&catalog.metric_registry, "namespace_get_by_name", 6);
assert!(cache
.schema(
.get(
Arc::from("ns1"),
&[("t1", &HashSet::from([c2.column.id]))],
None

View File

@ -1,9 +1,10 @@
//! Querier Chunks
use crate::cache::namespace::CachedTable;
use crate::cache::CatalogCache;
use data_types::{
ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, NamespaceSchema, ParquetFile,
ParquetFileId, PartitionId, SequenceNumber, ShardId, TableSummary, TimestampMinMax,
ChunkId, ChunkOrder, CompactionLevel, DeletePredicate, ParquetFile, ParquetFileId, PartitionId,
SequenceNumber, ShardId, TableSummary, TimestampMinMax,
};
use iox_catalog::interface::Catalog;
use parking_lot::RwLock;
@ -387,8 +388,7 @@ impl ChunkAdapter {
pub async fn new_chunk(
&self,
namespace_schema: Arc<NamespaceSchema>,
table_schema: Arc<Schema>,
cached_table: &CachedTable,
table_name: Arc<str>,
parquet_file: Arc<ParquetFile>,
span: Option<Span>,
@ -396,8 +396,7 @@ impl ChunkAdapter {
let span_recorder = SpanRecorder::new(span);
let parts = self
.chunk_parts(
namespace_schema,
table_schema,
cached_table,
table_name,
Arc::clone(&parquet_file),
span_recorder.child_span("chunk_parts"),
@ -431,29 +430,31 @@ impl ChunkAdapter {
async fn chunk_parts(
&self,
namespace_schema: Arc<NamespaceSchema>,
table_schema: Arc<Schema>,
cached_table: &CachedTable,
table_name: Arc<str>,
parquet_file: Arc<ParquetFile>,
span: Option<Span>,
) -> Option<ChunkParts> {
let span_recorder = SpanRecorder::new(span);
let table_schema_catalog = namespace_schema.tables.get(table_name.as_ref())?;
let parquet_file_col_ids: HashSet<_> = parquet_file.column_set.iter().collect();
let parquet_file_cols: HashSet<_> = parquet_file
.column_set
.iter()
.map(|id| {
cached_table
.column_id_map
.get(id)
.expect("catalog has all columns")
.as_ref()
})
.collect();
// relevant_pk_columns is everything from the primary key for the table, that is actually in this parquet file
let relevant_pk_columns: Vec<_> = table_schema
let relevant_pk_columns: Vec<_> = cached_table
.schema
.primary_key()
.into_iter()
.filter(|c| {
let col_id = table_schema_catalog
.columns
.get(*c)
.expect("catalog has all columns")
.id;
parquet_file_col_ids.contains(&col_id)
})
.filter(|c| parquet_file_cols.contains(*c))
.collect();
let partition_sort_key = self
.catalog_cache
@ -478,29 +479,24 @@ impl ChunkAdapter {
// IMPORTANT: Do NOT use the sort key to list columns because the sort key only contains primary-key columns.
// NOTE: The schema that we calculate here may have a different column order than the actual parquet file. This
// is OK because the IOx parquet reader can deal with that (see #4921).
let column_names: Vec<_> = table_schema
.as_ref()
let column_names: Vec<_> = cached_table
.schema
.iter()
.filter_map(|(_, field)| {
let name = field.name().as_str();
let column_id = match table_schema_catalog.columns.get(name) {
Some(col) => col.id,
None => return None,
};
if parquet_file_col_ids.contains(&column_id) {
Some(name)
let name = field.name();
if parquet_file_cols.contains(name.as_str()) {
Some(name.clone())
} else {
None
}
})
.map(|s| s.to_owned())
.collect();
let schema = self
.catalog_cache
.projected_schema()
.get(
parquet_file.table_id,
Arc::clone(&table_schema),
Arc::clone(&cached_table.schema),
column_names,
span_recorder.child_span("cache GET projected schema"),
)
@ -546,10 +542,12 @@ struct ChunkParts {
#[cfg(test)]
pub mod tests {
use crate::cache::namespace::CachedNamespace;
use super::*;
use arrow::{datatypes::DataType, record_batch::RecordBatch};
use arrow_util::assert_batches_eq;
use data_types::ColumnType;
use data_types::{ColumnType, NamespaceSchema};
use futures::StreamExt;
use iox_query::{exec::IOxSessionContext, QueryChunk, QueryChunkMeta};
use iox_tests::util::{TestCatalog, TestNamespace, TestParquetFileBuilder};
@ -750,15 +748,11 @@ pub mod tests {
}
async fn chunk(&self, namespace_schema: Arc<NamespaceSchema>) -> QuerierChunk {
let table_schema_catalog = namespace_schema.tables.get("table").expect("table exists");
let table_schema: Schema = table_schema_catalog
.clone()
.try_into()
.expect("Invalid table schema in catalog");
let cached_namespace: CachedNamespace = namespace_schema.as_ref().into();
let cached_table = cached_namespace.tables.get("table").expect("table exists");
self.adapter
.new_chunk(
Arc::clone(&namespace_schema),
Arc::new(table_schema),
cached_table,
Arc::from("table"),
Arc::clone(&self.parquet_file),
None,

View File

@ -164,10 +164,10 @@ impl QuerierDatabase {
pub async fn namespace(&self, name: &str, span: Option<Span>) -> Option<Arc<QuerierNamespace>> {
let span_recorder = SpanRecorder::new(span);
let name = Arc::from(name.to_owned());
let schema = self
let ns = self
.catalog_cache
.namespace()
.schema(
.get(
Arc::clone(&name),
// we have no specific need for any tables or columns at this point, so nothing to cover
&[],
@ -176,7 +176,7 @@ impl QuerierDatabase {
.await?;
Some(Arc::new(QuerierNamespace::new(
Arc::clone(&self.chunk_adapter),
schema,
ns,
name,
Arc::clone(&self.exec),
self.ingester_connection.clone(),

View File

@ -1,17 +1,16 @@
//! Namespace within the whole database.
use crate::{
cache::CatalogCache,
cache::{namespace::CachedNamespace, CatalogCache},
chunk::ChunkAdapter,
ingester::IngesterConnection,
query_log::QueryLog,
table::{PruneMetrics, QuerierTable, QuerierTableArgs},
QuerierChunkLoadSetting,
};
use data_types::{NamespaceId, NamespaceSchema, ParquetFileId, ShardIndex};
use data_types::{NamespaceId, ParquetFileId, ShardIndex};
use iox_query::exec::Executor;
use parquet_file::storage::ParquetStorage;
use schema::Schema;
use sharder::JumpHash;
use std::{collections::HashMap, sync::Arc};
@ -56,7 +55,7 @@ impl QuerierNamespace {
#[allow(clippy::too_many_arguments)]
pub fn new(
chunk_adapter: Arc<ChunkAdapter>,
schema: Arc<NamespaceSchema>,
ns: Arc<CachedNamespace>,
name: Arc<str>,
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
@ -65,20 +64,16 @@ impl QuerierNamespace {
max_table_query_bytes: usize,
prune_metrics: Arc<PruneMetrics>,
) -> Self {
let tables: HashMap<_, _> = schema
let tables: HashMap<_, _> = ns
.tables
.iter()
.map(|(table_name, table_schema)| {
let table_name = Arc::from(table_name.clone());
let id = table_schema.id;
let schema = Schema::try_from(table_schema.clone()).expect("cannot build schema");
.map(|(table_name, cached_table)| {
let table = Arc::new(QuerierTable::new(QuerierTableArgs {
sharder: Arc::clone(&sharder),
namespace_name: Arc::clone(&name),
id,
table_name: Arc::clone(&table_name),
schema: Arc::new(schema),
id: cached_table.id,
table_name: Arc::clone(table_name),
schema: Arc::clone(&cached_table.schema),
ingester_connection: ingester_connection.clone(),
chunk_adapter: Arc::clone(&chunk_adapter),
exec: Arc::clone(&exec),
@ -86,11 +81,11 @@ impl QuerierNamespace {
prune_metrics: Arc::clone(&prune_metrics),
}));
(table_name, table)
(Arc::clone(table_name), table)
})
.collect();
let id = schema.id;
let id = ns.id;
Self {
id,
@ -109,7 +104,7 @@ impl QuerierNamespace {
store: ParquetStorage,
metric_registry: Arc<metric::Registry>,
name: Arc<str>,
schema: Arc<NamespaceSchema>,
ns: Arc<CachedNamespace>,
exec: Arc<Executor>,
ingester_connection: Option<Arc<dyn IngesterConnection>>,
sharder: Arc<JumpHash<Arc<ShardIndex>>>,
@ -128,7 +123,7 @@ impl QuerierNamespace {
Self::new(
chunk_adapter,
schema,
ns,
name,
exec,
ingester_connection,
@ -157,7 +152,7 @@ mod tests {
use crate::namespace::test_util::querier_namespace;
use data_types::ColumnType;
use iox_tests::util::TestCatalog;
use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType};
use schema::{builder::SchemaBuilder, InfluxColumnType, InfluxFieldType, Schema};
#[tokio::test]
async fn test_sync_tables() {

View File

@ -1,5 +1,7 @@
use super::QuerierNamespace;
use crate::{create_ingester_connection_for_testing, QuerierCatalogCache};
use crate::{
cache::namespace::CachedNamespace, create_ingester_connection_for_testing, QuerierCatalogCache,
};
use data_types::{ShardIndex, TableId};
use iox_catalog::interface::get_schema_by_name;
use iox_tests::util::TestNamespace;
@ -19,11 +21,10 @@ pub async fn querier_namespace_with_limit(
max_table_query_bytes: usize,
) -> QuerierNamespace {
let mut repos = ns.catalog.catalog.repositories().await;
let schema = Arc::new(
get_schema_by_name(&ns.namespace.name, repos.as_mut())
.await
.unwrap(),
);
let schema = get_schema_by_name(&ns.namespace.name, repos.as_mut())
.await
.unwrap();
let cached_ns = Arc::new(CachedNamespace::from(&schema));
let catalog_cache = Arc::new(QuerierCatalogCache::new_testing(
ns.catalog.catalog(),
@ -39,7 +40,7 @@ pub async fn querier_namespace_with_limit(
ParquetStorage::new(ns.catalog.object_store()),
ns.catalog.metric_registry(),
ns.namespace.name.clone().into(),
schema,
cached_ns,
ns.catalog.exec(),
Some(create_ingester_connection_for_testing()),
sharder,

View File

@ -243,45 +243,30 @@ impl QuerierTable {
.iter()
.flat_map(|cached_file| cached_file.column_set.iter().copied())
.collect();
let namespace_schema = self
let cached_namespace = self
.chunk_adapter
.catalog_cache()
.namespace()
.schema(
.get(
Arc::clone(&self.namespace_name),
&[(&self.table_name, &columns)],
span_recorder.child_span("cache GET namespace schema"),
)
.await;
let namespace_schema = namespace_schema.as_ref();
let table_schema_catalog = match &namespace_schema {
Some(n) => n.tables.get(self.table_name.as_ref()),
None => None,
};
let cached_table = cached_namespace
.as_ref()
.and_then(|ns| ns.tables.get(self.table_name.as_ref()));
// create parquet files
let parquet_files: Vec<_> = match (namespace_schema, table_schema_catalog) {
(Some(namespace_schema), Some(table_schema_catalog)) => {
let column_id_map = table_schema_catalog.column_id_map();
for col in &columns {
assert!(
column_id_map.contains_key(col),
"Column {} occurs in parquet file but is not part of the table schema",
col.get()
);
}
let table_schema: Schema = table_schema_catalog
.clone()
.try_into()
.expect("Invalid table schema in catalog");
let parquet_files: Vec<_> = match cached_table {
Some(cached_table) => {
let basic_summaries: Vec<_> = parquet_files
.files
.iter()
.map(|p| {
Arc::new(create_basic_summary(
p.row_count as u64,
&table_schema,
&cached_table.schema,
TimestampMinMax {
min: p.min_time.get(),
max: p.max_time.get(),
@ -290,20 +275,22 @@ impl QuerierTable {
})
.map(Some)
.collect();
let table_schema = &Arc::new(table_schema);
// Prune on the most basic summary data (timestamps and column names) before trying to fully load the chunks
let keeps =
match prune_summaries(Arc::clone(table_schema), &basic_summaries, predicate) {
Ok(keeps) => keeps,
Err(reason) => {
// Ignore pruning failures here - the chunk pruner should have already logged them.
// Just skip pruning and gather all the metadata. We have another chance to prune them
// once all the metadata is available
debug!(?reason, "Could not prune before metadata fetch");
vec![true; basic_summaries.len()]
}
};
let keeps = match prune_summaries(
Arc::clone(&cached_table.schema),
&basic_summaries,
predicate,
) {
Ok(keeps) => keeps,
Err(reason) => {
// Ignore pruning failures here - the chunk pruner should have already logged them.
// Just skip pruning and gather all the metadata. We have another chance to prune them
// once all the metadata is available
debug!(?reason, "Could not prune before metadata fetch");
vec![true; basic_summaries.len()]
}
};
let early_pruning_observer =
&MetricPruningObserver::new(Arc::clone(&self.prune_metrics));
@ -320,8 +307,7 @@ impl QuerierTable {
let span = span_recorder.child_span("new_chunk");
chunk_adapter
.new_chunk(
Arc::clone(namespace_schema),
Arc::clone(table_schema),
cached_table,
Arc::clone(self.table_name()),
Arc::clone(cached_parquet_file),
span,
@ -331,7 +317,7 @@ impl QuerierTable {
.collect()
.await
}
(_, _) => Vec::new(),
_ => Vec::new(),
};
let chunks = self

View File

@ -946,7 +946,7 @@ impl MockIngester {
ParquetStorage::new(catalog.object_store()),
catalog.metric_registry(),
ns.namespace.name.clone().into(),
schema,
Arc::new(schema.as_ref().into()),
catalog.exec(),
Some(ingester_connection),
sharder,