//! This module contains the interface to the Catalog / Chunks used by
//! the query engine

use super::{
    catalog::{Catalog, TableNameFilter},
    chunk::DbChunk,
    query_log::QueryLog,
};
use crate::system_tables;
use async_trait::async_trait;
use data_types::{chunk_metadata::ChunkSummary, partition_metadata::PartitionAddr};
use datafusion::{
    catalog::{catalog::CatalogProvider, schema::SchemaProvider},
    datasource::TableProvider,
};
use hashbrown::HashMap;
use job_registry::JobRegistry;
use metric::{Attributes, Metric, U64Counter};
use observability_deps::tracing::debug;
use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
use predicate::predicate::{Predicate, PredicateBuilder};
use query::{
    provider::{ChunkPruner, ProviderBuilder},
    pruning::{prune_chunks, PruningObserver},
    QueryChunk, QueryChunkMeta, QueryDatabase, DEFAULT_SCHEMA,
};
use schema::Schema;
use std::{any::Any, sync::Arc};
use system_tables::{SystemSchemaProvider, SYSTEM_SCHEMA};
use time::TimeProvider;

/// The number of entries to store in the circular query buffer log
const QUERY_LOG_SIZE: usize = 100;

/// Metrics related to chunk access (pruning specifically)
#[derive(Debug)]
struct AccessMetrics {
    /// The database name
    db_name: Arc<str>,

    /// Total number of chunks pruned via statistics
    pruned_chunks: Metric<U64Counter>,

    /// Total number of rows pruned via statistics
    pruned_rows: Metric<U64Counter>,

    /// Per-table metrics, keyed by table name
    tables: Mutex<HashMap<Arc<str>, TableAccessMetrics>>,
}

impl AccessMetrics {
    fn new(registry: &metric::Registry, db_name: Arc<str>) -> Self {
        let pruned_chunks = registry.register_metric(
            "query_access_pruned_chunks",
            "Number of chunks pruned using metadata",
        );

        let pruned_rows = registry.register_metric(
            "query_access_pruned_rows",
            "Number of rows pruned using metadata",
        );

        Self {
            db_name,
            pruned_chunks,
            pruned_rows,
            tables: Default::default(),
        }
    }

    /// Returns the metrics for the given table, creating and caching
    /// the per-table recorders on first access
    fn table_metrics(&self, table: &Arc<str>) -> MappedMutexGuard<'_, TableAccessMetrics> {
        MutexGuard::map(self.tables.lock(), |tables| {
            let (_, metrics) = tables.raw_entry_mut().from_key(table).or_insert_with(|| {
                let attributes = Attributes::from([
                    ("db_name", self.db_name.to_string().into()),
                    ("table_name", table.to_string().into()),
                ]);

                let metrics = TableAccessMetrics {
                    pruned_chunks: self.pruned_chunks.recorder(attributes.clone()),
                    pruned_rows: self.pruned_rows.recorder(attributes),
                };

                (Arc::clone(table), metrics)
            });
            metrics
        })
    }
}

/// Metrics related to chunk access for a specific table
#[derive(Debug)]
struct TableAccessMetrics {
    /// Total number of chunks pruned via statistics
    pruned_chunks: U64Counter,

    /// Total number of rows pruned via statistics
    pruned_rows: U64Counter,
}
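// Taken together, `AccessMetrics` and `TableAccessMetrics` export the two
// counter families registered above, broken out per (db_name, table_name)
// attribute pair. As an illustration (names and values invented), a query
// against table "cpu" in database "db1" whose pruning removed two chunks
// totalling 20,000 rows would surface roughly as:
//
//   query_access_pruned_chunks{db_name="db1",table_name="cpu"} = 2
//   query_access_pruned_rows{db_name="db1",table_name="cpu"}   = 20000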
/// `QueryCatalogAccess` implements traits that allow the query engine
/// (and DataFusion) to access the contents of the IOx catalog.
#[derive(Debug)]
pub(crate) struct QueryCatalogAccess {
    /// The catalog to access
    catalog: Arc<Catalog>,

    /// Handles finding / pruning chunks based on predicates
    chunk_access: Arc<ChunkAccess>,

    /// Stores queries that have been executed
    query_log: Arc<QueryLog>,

    /// Provides access to system tables
    system_tables: Arc<SystemSchemaProvider>,

    /// Provides access to "normal" user tables
    user_tables: Arc<DbSchemaProvider>,
}

impl QueryCatalogAccess {
    pub fn new(
        db_name: impl Into<String>,
        catalog: Arc<Catalog>,
        jobs: Arc<JobRegistry>,
        time_provider: Arc<dyn TimeProvider>,
        metric_registry: &metric::Registry,
    ) -> Self {
        let db_name: Arc<str> = Arc::from(db_name.into());
        let access_metrics = AccessMetrics::new(metric_registry, Arc::clone(&db_name));
        let chunk_access = Arc::new(ChunkAccess::new(Arc::clone(&catalog), access_metrics));

        let query_log = Arc::new(QueryLog::new(QUERY_LOG_SIZE, time_provider));
        let system_tables = Arc::new(SystemSchemaProvider::new(
            db_name.as_ref(),
            Arc::clone(&catalog),
            jobs,
            Arc::clone(&query_log),
        ));
        let user_tables = Arc::new(DbSchemaProvider::new(
            Arc::clone(&catalog),
            Arc::clone(&chunk_access),
        ));

        Self {
            catalog,
            chunk_access,
            query_log,
            system_tables,
            user_tables,
        }
    }
}

/// Encapsulates everything needed to find candidate chunks for
/// queries (including pruning based on metadata)
#[derive(Debug)]
struct ChunkAccess {
    /// The catalog to access
    catalog: Arc<Catalog>,

    /// Metrics about query processing
    access_metrics: AccessMetrics,
}

impl ChunkAccess {
    fn new(catalog: Arc<Catalog>, access_metrics: AccessMetrics) -> Self {
        Self {
            catalog,
            access_metrics,
        }
    }

    /// Returns all chunks that may have data that passes the
    /// specified predicate. The chunks are pruned as aggressively as
    /// possible based on metadata.
    fn candidate_chunks(&self, predicate: &Predicate) -> Vec<Arc<DbChunk>> {
        let partition_key = predicate.partition_key.as_deref();
        let table_names: TableNameFilter<'_> = predicate.table_names.as_ref().into();

        // Apply initial partition key / table name pruning
        let chunks = self
            .catalog
            .filtered_chunks(table_names, partition_key, DbChunk::snapshot);

        self.prune_chunks(chunks, predicate)
    }
}

impl ChunkPruner<DbChunk> for ChunkAccess {
    fn prune_chunks(&self, chunks: Vec<Arc<DbChunk>>, predicate: &Predicate) -> Vec<Arc<DbChunk>> {
        // TODO: call "apply_predicate" here too for additional
        // metadata-based pruning
        debug!(num_chunks = chunks.len(), %predicate, "Attempting to prune chunks");
        prune_chunks(self, chunks, predicate)
    }
}

impl PruningObserver for ChunkAccess {
    type Observed = DbChunk;

    fn was_pruned(&self, chunk: &Self::Observed) {
        let metrics = self.access_metrics.table_metrics(chunk.table_name());
        metrics.pruned_chunks.inc(1);
        metrics.pruned_rows.inc(chunk.summary().total_count())
    }

    fn could_not_prune_chunk(&self, chunk: &Self::Observed, reason: &str) {
        debug!(
            chunk_id = %chunk.id().get(),
            reason,
            "could not prune chunk from query",
        )
    }
}
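// Pruning is two-staged: `Catalog::filtered_chunks` first narrows cheaply by
// table name and partition key, then `prune_chunks` consults per-chunk
// statistics, reporting each decision back to the `PruningObserver` above so
// the access metrics stay current. A minimal sketch of driving this by hand
// (assumes a populated `chunk_access`; the table name "cpu" is illustrative):
//
// ```ignore
// let predicate = PredicateBuilder::new().table("cpu").build();
// let chunks = chunk_access.candidate_chunks(&predicate);
// // Only chunks whose metadata could not rule out matching rows remain
// ```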
#[async_trait]
impl QueryDatabase for QueryCatalogAccess {
    type Chunk = DbChunk;

    /// Return a covering set of chunks for a particular partition
    fn chunks(&self, predicate: &Predicate) -> Vec<Arc<Self::Chunk>> {
        self.chunk_access.candidate_chunks(predicate)
    }

    fn partition_addrs(&self) -> Vec<PartitionAddr> {
        self.catalog.partition_addrs()
    }

    fn chunk_summaries(&self) -> Vec<ChunkSummary> {
        self.catalog.chunk_summaries()
    }

    fn table_schema(&self, table_name: &str) -> Option<Arc<Schema>> {
        self.catalog
            .table(table_name)
            .ok()
            .map(|table| Arc::clone(&table.schema().read()))
    }

    fn record_query(&self, query_type: impl Into<String>, query_text: impl Into<String>) {
        self.query_log.push(query_type, query_text)
    }
}

// DataFusion catalog provider interface
impl CatalogProvider for QueryCatalogAccess {
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }

    fn schema_names(&self) -> Vec<String> {
        vec![
            DEFAULT_SCHEMA.to_string(),
            system_tables::SYSTEM_SCHEMA.to_string(),
        ]
    }

    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        match name {
            DEFAULT_SCHEMA => Some(Arc::clone(&self.user_tables) as Arc<dyn SchemaProvider>),
            SYSTEM_SCHEMA => Some(Arc::clone(&self.system_tables) as Arc<dyn SchemaProvider>),
            _ => None,
        }
    }
}

/// Implements the DataFusion schema provider API
#[derive(Debug)]
struct DbSchemaProvider {
    /// The catalog to access
    catalog: Arc<Catalog>,

    /// Handles finding / pruning chunks based on predicates
    chunk_access: Arc<ChunkAccess>,
}

impl DbSchemaProvider {
    fn new(catalog: Arc<Catalog>, chunk_access: Arc<ChunkAccess>) -> Self {
        Self {
            catalog,
            chunk_access,
        }
    }
}

impl SchemaProvider for DbSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }

    fn table_names(&self) -> Vec<String> {
        self.catalog.table_names()
    }

    /// Create a table provider for the named table
    fn table(&self, table_name: &str) -> Option<Arc<dyn TableProvider>> {
        let schema = {
            let table = self.catalog.table(table_name).ok()?;
            Arc::clone(&table.schema().read())
        };

        let mut builder = ProviderBuilder::new(table_name, schema);
        builder =
            builder.add_pruner(Arc::clone(&self.chunk_access) as Arc<dyn ChunkPruner<DbChunk>>);

        let predicate = PredicateBuilder::new().table(table_name).build();

        for chunk in self.chunk_access.candidate_chunks(&predicate) {
            builder = builder.add_chunk(chunk);
        }

        match builder.build() {
            Ok(provider) => Some(Arc::new(provider)),
            Err(e) => panic!("unexpected error: {:?}", e),
        }
    }

    fn table_exist(&self, name: &str) -> bool {
        self.catalog.table(name).is_ok()
    }
}
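// A minimal, runnable sketch of the per-table metric bookkeeping above. It
// exercises only `AccessMetrics`, since constructing a full
// `QueryCatalogAccess` needs a populated `Catalog`, a `JobRegistry`, and a
// `TimeProvider`. It assumes `metric::Registry::new()` and
// `U64Counter::fetch()` are available from the `metric` crate; the database
// and table names are illustrative.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn per_table_metrics_are_created_lazily_and_reused() {
        let registry = metric::Registry::new();
        let metrics = AccessMetrics::new(&registry, Arc::from("db1"));
        let table: Arc<str> = Arc::from("cpu");

        // The first lookup registers recorders attributed to ("db1", "cpu")
        metrics.table_metrics(&table).pruned_chunks.inc(1);
        // Subsequent lookups reuse the cached recorders rather than
        // registering new ones
        metrics.table_metrics(&table).pruned_chunks.inc(1);

        assert_eq!(metrics.table_metrics(&table).pruned_chunks.fetch(), 2);
        assert_eq!(metrics.tables.lock().len(), 1);
    }
}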