feat: add SystemSchemaProvider to QueryExecutor (#24990)

A shell for the `system` table provider was added to the `QueryExecutorImpl`.
It does not do anything yet, but it gives us a place to tie the individual
system table providers together.

The `QueryLog` was elevated from the `Database` (i.e., the namespace provider)
to the `QueryExecutorImpl`, so that it lives across queries.
Trevor Hilton 2024-05-17 11:21:01 -04:00 committed by GitHub
parent 2381cc6f1d
commit 1cb3652692
1 changed file with 67 additions and 25 deletions
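
For context on what this is building toward: DataFusion resolves a query like `SELECT * FROM system.queries` by asking the session's default CatalogProvider for the `system` schema and then asking that SchemaProvider for the `queries` table. Below is a minimal, self-contained sketch of that mechanism using stock DataFusion pieces; `MemorySchemaProvider` and `MemTable` are stand-ins for the real system table providers, the single `query_text` column is invented for the demo, and import paths vary slightly across datafusion versions.

use std::sync::Arc;

use datafusion::arrow::array::StringArray;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::schema::{MemorySchemaProvider, SchemaProvider};
use datafusion::catalog::CatalogProvider;
use datafusion::datasource::MemTable;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    // A one-column stand-in for the future `queries` system table.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "query_text",
        DataType::Utf8,
        false,
    )]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(StringArray::from(vec!["SELECT 1"]))],
    )?;
    let queries = MemTable::try_new(Arc::clone(&schema), vec![vec![batch]])?;

    // Register a "system" schema on the default catalog, then query through it.
    let system = Arc::new(MemorySchemaProvider::new());
    system.register_table("queries".to_string(), Arc::new(queries))?;
    let ctx = SessionContext::new();
    ctx.catalog("datafusion")
        .expect("default catalog exists")
        .register_schema("system", system)?;

    ctx.sql("SELECT query_text FROM system.queries")
        .await?
        .show()
        .await?;
    Ok(())
}

Running this prints the one-row `queries` table, which is roughly the interaction the `SystemSchemaProvider` shell in this commit will eventually serve.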


@@ -55,6 +55,7 @@ pub struct QueryExecutorImpl<W> {
     exec: Arc<Executor>,
     datafusion_config: Arc<HashMap<String, String>>,
     query_execution_semaphore: Arc<InstrumentedAsyncSemaphore>,
+    query_log: Arc<QueryLog>,
 }

 impl<W: WriteBuffer> QueryExecutorImpl<W> {
@@ -72,12 +73,19 @@ impl<W: WriteBuffer> QueryExecutorImpl<W> {
         ));
         let query_execution_semaphore =
             Arc::new(semaphore_metrics.new_semaphore(concurrent_query_limit));
+        // TODO Fine tune this number or make configurable
+        const QUERY_LOG_LIMIT: usize = 1_000;
+        let query_log = Arc::new(QueryLog::new(
+            QUERY_LOG_LIMIT,
+            Arc::new(iox_time::SystemProvider::new()),
+        ));
         Self {
             catalog,
             write_buffer,
             exec,
             datafusion_config,
             query_execution_semaphore,
+            query_log,
         }
     }
 }
@@ -282,7 +290,7 @@ impl<W: WriteBuffer> QueryDatabase for QueryExecutorImpl<W> {
         &self,
         name: &str,
         span: Option<Span>,
-        _include_debug_info_tables: bool,
+        include_debug_info_tables: bool,
     ) -> Result<Option<Arc<dyn QueryNamespace>>, DataFusionError> {
         let _span_recorder = SpanRecorder::new(span);
@@ -297,6 +305,8 @@ impl<W: WriteBuffer> QueryDatabase for QueryExecutorImpl<W> {
             Arc::clone(&self.write_buffer) as _,
             Arc::clone(&self.exec),
             Arc::clone(&self.datafusion_config),
+            Arc::clone(&self.query_log),
+            include_debug_info_tables,
         ))))
     }
@@ -312,13 +322,14 @@ impl<W: WriteBuffer> QueryDatabase for QueryExecutorImpl<W> {
     }
 }

-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct Database<B> {
     db_schema: Arc<DatabaseSchema>,
     write_buffer: Arc<B>,
     exec: Arc<Executor>,
     datafusion_config: Arc<HashMap<String, String>>,
     query_log: Arc<QueryLog>,
+    system_schema_provider: Arc<SystemSchemaProvider>,
 }

 impl<B: WriteBuffer> Database<B> {
@@ -327,13 +338,13 @@ impl<B: WriteBuffer> Database<B> {
         write_buffer: Arc<B>,
         exec: Arc<Executor>,
         datafusion_config: Arc<HashMap<String, String>>,
+        query_log: Arc<QueryLog>,
+        include_debug_info_tables: bool,
     ) -> Self {
-        // TODO Fine tune this number
-        const QUERY_LOG_LIMIT: usize = 10;
-        let query_log = Arc::new(QueryLog::new(
-            QUERY_LOG_LIMIT,
-            Arc::new(iox_time::SystemProvider::new()),
+        let system_schema_provider = Arc::new(SystemSchemaProvider::new(
+            write_buffer.catalog(),
+            Arc::clone(&query_log),
+            include_debug_info_tables,
         ));
         Self {
             db_schema,
@@ -341,6 +352,18 @@ impl<B: WriteBuffer> Database<B> {
             exec,
             datafusion_config,
             query_log,
+            system_schema_provider,
         }
     }
+
+    fn from_namespace(db: &Self) -> Self {
+        Self {
+            db_schema: Arc::clone(&db.db_schema),
+            write_buffer: Arc::clone(&db.write_buffer),
+            exec: Arc::clone(&db.exec),
+            datafusion_config: Arc::clone(&db.datafusion_config),
+            query_log: Arc::clone(&db.query_log),
+            system_schema_provider: Arc::clone(&db.system_schema_provider),
+        }
+    }
@@ -404,17 +427,10 @@ impl<B: WriteBuffer> QueryNamespace for Database<B> {
         span_ctx: Option<SpanContext>,
         _config: Option<&QueryConfig>,
     ) -> IOxSessionContext {
-        let qdb = Self::new(
-            Arc::clone(&self.db_schema),
-            Arc::clone(&self.write_buffer),
-            Arc::clone(&self.exec),
-            Arc::clone(&self.datafusion_config),
-        );
-
         let mut cfg = self
             .exec
             .new_session_config()
-            .with_default_catalog(Arc::new(qdb))
+            .with_default_catalog(Arc::new(Self::from_namespace(self)))
             .with_span_context(span_ctx);

         for (k, v) in self.datafusion_config.as_ref() {
@@ -437,15 +453,8 @@ impl<B: WriteBuffer> CatalogProvider for Database<B> {
     fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
         info!("CatalogProvider schema {}", name);
-        let qdb = Self::new(
-            Arc::clone(&self.db_schema),
-            Arc::clone(&self.write_buffer),
-            Arc::clone(&self.exec),
-            Arc::clone(&self.datafusion_config),
-        );
-
         match name {
-            DEFAULT_SCHEMA => Some(Arc::new(qdb)),
+            DEFAULT_SCHEMA => Some(Arc::new(Self::from_namespace(self))),
             _ => None,
         }
     }
@@ -486,7 +495,6 @@ impl<B: WriteBuffer> QueryTable<B> {
         filters: &[Expr],
         _limit: Option<usize>,
     ) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
-        // TODO - this is only pulling from write buffer, and not parquet?
         self.write_buffer.get_table_chunks(
             &self.db_schema.name,
             self.name.as_ref(),
@@ -545,3 +553,37 @@ impl<B: WriteBuffer> TableProvider for QueryTable<B> {
         provider.scan(ctx, projection, &filters, limit).await
     }
 }
+
+const _QUERIES_TABLE: &str = "queries";
+const _PARQUET_FILES_TABLE: &str = "parquet_files";
+
+struct SystemSchemaProvider {
+    tables: HashMap<&'static str, Arc<dyn TableProvider>>,
+}
+
+impl std::fmt::Debug for SystemSchemaProvider {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        let mut keys = self.tables.keys().copied().collect::<Vec<_>>();
+        keys.sort_unstable();
+
+        f.debug_struct("SystemSchemaProvider")
+            .field("tables", &keys.join(", "))
+            .finish()
+    }
+}
+
+impl SystemSchemaProvider {
+    fn new(_catalog: Arc<Catalog>, _query_log: Arc<QueryLog>, include_debug_info: bool) -> Self {
+        let tables = HashMap::new();
+        if include_debug_info {
+            // Using todo!() here causes gRPC integration tests to fail, likely because they
+            // enable debug mode by default, thus entering this if block. So, just leaving this
+            // here in lieu of todo!().
+            //
+            // Eventually, we will implement the queries and parquet_files tables and they will
+            // be injected to the provider's table hashmap here...
+            info!("TODO - gather system tables");
+        }
+        Self { tables }
+    }
+}
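
Nothing wires the shell into DataFusion yet; for that, `SystemSchemaProvider` will need a `SchemaProvider` impl. A sketch of what that could look like, not part of this commit, assuming the trait shape of datafusion around this era (where `table` returns an `Option`; newer releases return a `Result` instead, and import paths vary by version):

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::datasource::TableProvider;

#[async_trait]
impl SchemaProvider for SystemSchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn table_names(&self) -> Vec<String> {
        // Report whichever system tables were registered, e.g. "queries"
        // and "parquet_files" once those providers exist.
        self.tables.keys().map(|name| name.to_string()).collect()
    }

    async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
        self.tables.get(name).cloned()
    }

    fn table_exist(&self, name: &str) -> bool {
        self.tables.contains_key(name)
    }
}

With an impl like this in place, `CatalogProvider::schema` on `Database` could hand out `Arc::clone(&self.system_schema_provider)` for a `system` schema name alongside the existing `DEFAULT_SCHEMA` arm.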