From 0657ad9600a9a58237b7fad769aa8196715d766b Mon Sep 17 00:00:00 2001 From: "Carol (Nichols || Goulding)" Date: Fri, 11 Nov 2022 14:20:31 -0500 Subject: [PATCH] fix: Rename QueryDatabase to QueryNamespace --- iox_query/src/frontend/influxrpc.rs | 105 +++++++++--------- iox_query/src/lib.rs | 24 ++-- iox_query/src/test.rs | 7 +- querier/src/namespace/query_access.rs | 4 +- query_tests/src/db.rs | 10 +- query_tests/src/influxrpc/field_columns.rs | 2 +- query_tests/src/influxrpc/read_filter.rs | 2 +- query_tests/src/influxrpc/read_group.rs | 2 +- .../src/influxrpc/read_window_aggregate.rs | 2 +- query_tests/src/influxrpc/table_names.rs | 2 +- query_tests/src/influxrpc/tag_keys.rs | 2 +- query_tests/src/influxrpc/tag_values.rs | 4 +- service_common/src/lib.rs | 14 +-- service_common/src/planner.rs | 60 +++++----- service_grpc_flight/src/lib.rs | 2 +- service_grpc_influxrpc/src/service.rs | 58 +++++----- 16 files changed, 147 insertions(+), 153 deletions(-) diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs index 07a68c2714..26dfda0ff2 100644 --- a/iox_query/src/frontend/influxrpc.rs +++ b/iox_query/src/frontend/influxrpc.rs @@ -11,7 +11,7 @@ use crate::{ seriesset::{SeriesSetPlan, SeriesSetPlans}, stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder}, }, - QueryChunk, QueryDatabase, + QueryChunk, QueryNamespace, }; use arrow::datatypes::DataType; use data_types::ChunkId; @@ -226,7 +226,7 @@ impl InfluxRpcPlanner { /// . chunks without deleted data but cannot be decided from meta data pub async fn table_names( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, ) -> Result { let ctx = self.ctx.child_ctx("table_names planning"); @@ -236,10 +236,10 @@ impl InfluxRpcPlanner { let rpc_predicate = rpc_predicate.clear_timestamp_if_max_range(); let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; let tables: Vec<_> = - table_chunk_stream(Arc::clone(&database), false, &table_predicates, &ctx) + table_chunk_stream(Arc::clone(&namespace), false, &table_predicates, &ctx) .try_filter_map(|(table_name, predicate, chunks)| async move { // Identify which chunks can answer from its metadata and then record its table, // and which chunks needs full plan and group them into their table @@ -291,7 +291,7 @@ impl InfluxRpcPlanner { builder.append_string(table_name.to_string()); } Some((predicate, chunks)) => { - let schema = database + let schema = namespace .table_schema(table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -315,13 +315,12 @@ impl InfluxRpcPlanner { builder.build().context(CreatingStringSetSnafu) } - /// Returns a set of plans that produces the names of "tag" - /// columns (as defined in the InfluxDB Data model) names in this - /// database that have more than zero rows which pass the - /// conditions specified by `predicate`. + /// Returns a set of plans that produces the names of "tag" columns (as defined in the InfluxDB + /// data model) names in this namespace that have more than zero rows which pass the conditions + /// specified by `predicate`. pub async fn tag_keys( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, ) -> Result { let ctx = self.ctx.child_ctx("tag_keys planning"); @@ -334,11 +333,11 @@ impl InfluxRpcPlanner { // // 1. Find all the potential tables in the chunks // - // 2. For each table/chunk pair, figure out which can be found - // from only metadata and which need full plans + // 2. For each table/chunk pair, figure out which can be found from only metadata and which + // need full plans let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; let mut table_predicates_need_chunks = vec![]; @@ -348,7 +347,7 @@ impl InfluxRpcPlanner { // special case - return the columns from metadata only. // Note that columns with all rows deleted will still show here builder = builder.append_other( - database + namespace .table_schema(&table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -364,7 +363,7 @@ impl InfluxRpcPlanner { } let tables: Vec<_> = table_chunk_stream( - Arc::clone(&database), + Arc::clone(&namespace), false, &table_predicates_need_chunks, &ctx, @@ -458,7 +457,7 @@ impl InfluxRpcPlanner { // out chunks (and tables) where all columns in that chunk // were already known to have data (based on the contents of known_columns) - let schema = database + let schema = namespace .table_schema(table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -478,12 +477,11 @@ impl InfluxRpcPlanner { builder.build().context(CreatingStringSetSnafu) } - /// Returns a plan which finds the distinct, non-null tag values - /// in the specified `tag_name` column of this database which pass - /// the conditions specified by `predicate`. + /// Returns a plan which finds the distinct, non-null tag values in the specified `tag_name` + /// column of this namespace which pass the conditions specified by `predicate`. pub async fn tag_values( &self, - database: Arc, + namespace: Arc, tag_name: &str, rpc_predicate: InfluxRpcPredicate, ) -> Result { @@ -499,14 +497,14 @@ impl InfluxRpcPlanner { // which need full plans let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; - // filter out tables that do NOT contain `tag_name` early, esp. before performing any chunk scan (which includes - // ingester RPC) + // filter out tables that do NOT contain `tag_name` early, esp. before performing any chunk + // scan (which includes ingester RPC) let mut table_predicates_filtered = Vec::with_capacity(table_predicates.len()); for (table_name, predicate) in table_predicates { - let schema = database + let schema = namespace .table_schema(&table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -521,7 +519,7 @@ impl InfluxRpcPlanner { } let tables: Vec<_> = table_chunk_stream( - Arc::clone(&database), + Arc::clone(&namespace), false, &table_predicates_filtered, &ctx, @@ -546,8 +544,8 @@ impl InfluxRpcPlanner { let schema = chunk.schema(); // Skip this table if the tag_name is not a column in this chunk - // Note: This may happen even when the table contains the tag_name, because some chunks may not - // contain all columns. + // Note: This may happen even when the table contains the tag_name, because some + // chunks may not contain all columns. let idx = if let Some(idx) = schema.find_index_of(tag_name) { idx } else { @@ -571,8 +569,8 @@ impl InfluxRpcPlanner { } ); - // If there are delete predicates, we need to scan (or do full plan) the data to eliminate - // deleted data before getting tag values + // If there are delete predicates, we need to scan (or do full plan) the data to + // eliminate deleted data before getting tag values if chunk.has_delete_predicates() { debug!( %table_name, @@ -628,7 +626,7 @@ impl InfluxRpcPlanner { builder = builder.append_other(known_values.into()); if !chunks_full.is_empty() { - let schema = database + let schema = namespace .table_schema(table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -670,13 +668,12 @@ impl InfluxRpcPlanner { builder.build().context(CreatingStringSetSnafu) } - /// Returns a plan that produces a list of columns and their - /// datatypes (as defined in the data written via `write_lines`), - /// and which have more than zero rows which pass the conditions + /// Returns a plan that produces a list of columns and their datatypes (as defined in the data + /// written via `write_lines`), and which have more than zero rows which pass the conditions /// specified by `predicate`. pub async fn field_columns( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, ) -> Result { let ctx = self.ctx.child_ctx("field_columns planning"); @@ -692,7 +689,7 @@ impl InfluxRpcPlanner { // values and stops the plan executing once it has them let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; // optimization: just get the field columns from metadata. @@ -701,7 +698,7 @@ impl InfluxRpcPlanner { let mut table_predicates_need_chunks = Vec::with_capacity(table_predicates.len()); for (table_name, predicate) in table_predicates { if predicate.is_empty() { - let schema = database + let schema = namespace .table_schema(&table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), @@ -721,7 +718,7 @@ impl InfluxRpcPlanner { // full scans let plans = create_plans( - database, + namespace, &table_predicates_need_chunks, ctx, |ctx, table_name, predicate, chunks, schema| { @@ -762,18 +759,18 @@ impl InfluxRpcPlanner { /// same) occur together in the plan pub async fn read_filter( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, ) -> Result { let ctx = self.ctx.child_ctx("planning_read_filter"); debug!(?rpc_predicate, "planning read_filter"); let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; let plans = create_plans( - database, + namespace, &table_predicates, ctx, |ctx, table_name, predicate, chunks, schema| { @@ -813,7 +810,7 @@ impl InfluxRpcPlanner { /// (apply filters) pub async fn read_group( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, agg: Aggregate, group_columns: &[impl AsRef + Send + Sync], @@ -822,11 +819,11 @@ impl InfluxRpcPlanner { debug!(?rpc_predicate, ?agg, "planning read_group"); let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; let plans = create_plans( - database, + namespace, &table_predicates, ctx, |ctx, table_name, predicate, chunks, schema| match agg { @@ -863,7 +860,7 @@ impl InfluxRpcPlanner { /// that are grouped by window definitions pub async fn read_window_aggregate( &self, - database: Arc, + namespace: Arc, rpc_predicate: InfluxRpcPredicate, agg: Aggregate, every: WindowDuration, @@ -879,11 +876,11 @@ impl InfluxRpcPlanner { ); let table_predicates = rpc_predicate - .table_predicates(database.as_meta()) + .table_predicates(namespace.as_meta()) .context(CreatingPredicatesSnafu)?; let plans = create_plans( - database, + namespace, &table_predicates, ctx, |ctx, table_name, predicate, chunks, schema| { @@ -1375,7 +1372,7 @@ impl InfluxRpcPlanner { /// This function is indirectly invoked by `field_columns`, `read_filter`, `read_group` and `read_window_aggregate` /// through the function `create_plans` where need_fields should be true. fn table_chunk_stream<'a>( - database: Arc, + namespace: Arc, need_fields: bool, table_predicates: &'a [(Arc, Predicate)], ctx: &'a IOxSessionContext, @@ -1385,9 +1382,9 @@ fn table_chunk_stream<'a>( let mut ctx = ctx.child_ctx("table"); ctx.set_metadata("table", table_name.to_string()); - let database = Arc::clone(&database); + let namespace = Arc::clone(&namespace); - let table_schema = database.table_schema(table_name); + let table_schema = namespace.table_schema(table_name); let projection = match table_schema { Some(table_schema) => { columns_in_predicates(need_fields, table_schema, table_name, predicate) @@ -1396,7 +1393,7 @@ fn table_chunk_stream<'a>( }; async move { - let chunks = database + let chunks = namespace .chunks( table_name, predicate, @@ -1507,7 +1504,7 @@ fn columns_in_predicates( /// `f(ctx, table_name, table_predicate, chunks, table_schema)` is /// invoked on the chunks for each table to produce a plan for each async fn create_plans( - database: Arc, + namespace: Arc, table_predicates: &[(Arc, Predicate)], ctx: IOxSessionContext, f: F, @@ -1525,7 +1522,7 @@ where + Sync, P: Send, { - table_chunk_stream(Arc::clone(&database), true, table_predicates, &ctx) + table_chunk_stream(Arc::clone(&namespace), true, table_predicates, &ctx) .and_then(|(table_name, predicate, chunks)| async move { let chunks = prune_chunks_metadata(chunks, predicate)?; Ok((table_name, predicate, chunks)) @@ -1540,11 +1537,11 @@ where let mut ctx = ctx.child_ctx("table"); ctx.set_metadata("table", table_name.to_string()); - let database = Arc::clone(&database); + let namespace = Arc::clone(&namespace); let f = f.clone(); async move { - let schema = database + let schema = namespace .table_schema(table_name) .context(TableRemovedSnafu { table_name: table_name.as_ref(), diff --git a/iox_query/src/lib.rs b/iox_query/src/lib.rs index b0341680d3..8570ab5b8b 100644 --- a/iox_query/src/lib.rs +++ b/iox_query/src/lib.rs @@ -89,7 +89,7 @@ pub trait QueryChunkMeta { } /// A `QueryCompletedToken` is returned by `record_query` implementations of -/// a `QueryDatabase`. It is used to trigger side-effects (such as query timing) +/// a `QueryNamespace`. It is used to trigger side-effects (such as query timing) /// on query completion. /// pub struct QueryCompletedToken { @@ -136,22 +136,20 @@ impl Drop for QueryCompletedToken { /// This avoids storing potentially large strings pub type QueryText = Box; -/// A `Database` is the main trait implemented by the IOx subsystems -/// that store actual data. +/// `QueryNamespace` is the main trait implemented by the IOx subsystems that store actual data. /// -/// Databases store data organized by partitions and each partition stores -/// data in Chunks. +/// Namespaces store data organized by partitions and each partition stores data in Chunks. #[async_trait] -pub trait QueryDatabase: QueryNamespaceMeta + Debug + Send + Sync { - /// Returns a set of chunks within the partition with data that may match - /// the provided predicate. +pub trait QueryNamespace: QueryNamespaceMeta + Debug + Send + Sync { + /// Returns a set of chunks within the partition with data that may match the provided + /// predicate. /// - /// If possible, chunks which have no rows that can - /// possibly match the predicate may be omitted. + /// If possible, chunks which have no rows that can possibly match the predicate may be omitted. /// - /// If projection is None, returned chunks will include all columns of its original data. Otherwise, - /// returned chunks will include PK columns (tags and time) and columns specified in the projection. Projecting - /// chunks here is optional and a mere optimization. The query subsystem does NOT rely on it. + /// If projection is `None`, returned chunks will include all columns of its original data. + /// Otherwise, returned chunks will include PK columns (tags and time) and columns specified in + /// the projection. Projecting chunks here is optional and a mere optimization. The query + /// subsystem does NOT rely on it. async fn chunks( &self, table_name: &str, diff --git a/iox_query/src/test.rs b/iox_query/src/test.rs index 6b578775cb..6ecdc32291 100644 --- a/iox_query/src/test.rs +++ b/iox_query/src/test.rs @@ -1,5 +1,4 @@ -//! This module provides a reference implementation of -//! [`QueryDatabase`] for use in testing. +//! This module provides a reference implementation of [`QueryNamespace`] for use in testing. //! //! AKA it is a Mock @@ -9,7 +8,7 @@ use crate::{ ExecutionContextProvider, Executor, ExecutorType, IOxSessionContext, }, Predicate, PredicateMatch, QueryChunk, QueryChunkData, QueryChunkMeta, QueryCompletedToken, - QueryDatabase, QueryText, + QueryNamespace, QueryText, }; use arrow::{ array::{ @@ -100,7 +99,7 @@ impl TestDatabase { } #[async_trait] -impl QueryDatabase for TestDatabase { +impl QueryNamespace for TestDatabase { async fn chunks( &self, table_name: &str, diff --git a/querier/src/namespace/query_access.rs b/querier/src/namespace/query_access.rs index 855bb5bbc7..1d6df8fe5c 100644 --- a/querier/src/namespace/query_access.rs +++ b/querier/src/namespace/query_access.rs @@ -16,7 +16,7 @@ use datafusion::{ use datafusion_util::config::DEFAULT_SCHEMA; use iox_query::{ exec::{ExecutionContextProvider, ExecutorType, IOxSessionContext}, - QueryChunk, QueryCompletedToken, QueryDatabase, QueryText, + QueryChunk, QueryCompletedToken, QueryNamespace, QueryText, }; use observability_deps::tracing::{debug, trace}; use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate}; @@ -37,7 +37,7 @@ impl QueryNamespaceMeta for QuerierNamespace { } #[async_trait] -impl QueryDatabase for QuerierNamespace { +impl QueryNamespace for QuerierNamespace { async fn chunks( &self, table_name: &str, diff --git a/query_tests/src/db.rs b/query_tests/src/db.rs index c6d4362e0a..d2d7245892 100644 --- a/query_tests/src/db.rs +++ b/query_tests/src/db.rs @@ -1,11 +1,11 @@ use std::{any::Any, sync::Arc}; use datafusion::catalog::catalog::CatalogProvider; -use iox_query::{exec::ExecutionContextProvider, QueryDatabase}; +use iox_query::{exec::ExecutionContextProvider, QueryNamespace}; use querier::QuerierNamespace; /// Abstract database used during testing. -pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryDatabase { +pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryNamespace { fn as_any_arc(self: Arc) -> Arc; /// Upcast to [`CatalogProvider`]. @@ -13,10 +13,10 @@ pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryDatabase /// This is required due to . fn as_catalog_provider_arc(self: Arc) -> Arc; - /// Upcast to [`QueryDatabase`]. + /// Upcast to [`QueryNamespace`]. /// /// This is required due to . - fn as_query_database_arc(self: Arc) -> Arc; + fn as_query_namespace_arc(self: Arc) -> Arc; } impl AbstractDb for QuerierNamespace { @@ -28,7 +28,7 @@ impl AbstractDb for QuerierNamespace { self as _ } - fn as_query_database_arc(self: Arc) -> Arc { + fn as_query_namespace_arc(self: Arc) -> Arc { self } } diff --git a/query_tests/src/influxrpc/field_columns.rs b/query_tests/src/influxrpc/field_columns.rs index dc3fce2297..70a8021cac 100644 --- a/query_tests/src/influxrpc/field_columns.rs +++ b/query_tests/src/influxrpc/field_columns.rs @@ -32,7 +32,7 @@ async fn run_field_columns_test_case( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .field_columns(db.as_query_database_arc(), predicate.clone()) + .field_columns(db.as_query_namespace_arc(), predicate.clone()) .await .expect("built plan successfully"); let fields = ctx diff --git a/query_tests/src/influxrpc/read_filter.rs b/query_tests/src/influxrpc/read_filter.rs index c0ecfe18c7..d90253330e 100644 --- a/query_tests/src/influxrpc/read_filter.rs +++ b/query_tests/src/influxrpc/read_filter.rs @@ -64,7 +64,7 @@ async fn run_read_filter( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .read_filter(db.as_query_database_arc(), predicate) + .read_filter(db.as_query_namespace_arc(), predicate) .await .map_err(|e| e.to_string())?; diff --git a/query_tests/src/influxrpc/read_group.rs b/query_tests/src/influxrpc/read_group.rs index 4e9f964c2c..08adc434a7 100644 --- a/query_tests/src/influxrpc/read_group.rs +++ b/query_tests/src/influxrpc/read_group.rs @@ -38,7 +38,7 @@ async fn run_read_group_test_case( let plans = planner .read_group( - db.as_query_database_arc(), + db.as_query_namespace_arc(), predicate.clone(), agg, &group_columns, diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs index 65942baa1b..2f6445deaa 100644 --- a/query_tests/src/influxrpc/read_window_aggregate.rs +++ b/query_tests/src/influxrpc/read_window_aggregate.rs @@ -30,7 +30,7 @@ async fn run_read_window_aggregate_test_case( let plan = planner .read_window_aggregate( - db.as_query_database_arc(), + db.as_query_namespace_arc(), predicate.clone(), agg, every, diff --git a/query_tests/src/influxrpc/table_names.rs b/query_tests/src/influxrpc/table_names.rs index f787b36410..e171c4fc9d 100644 --- a/query_tests/src/influxrpc/table_names.rs +++ b/query_tests/src/influxrpc/table_names.rs @@ -29,7 +29,7 @@ async fn run_table_names_test_case( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .table_names(db.as_query_database_arc(), predicate.clone()) + .table_names(db.as_query_namespace_arc(), predicate.clone()) .await .expect("built plan successfully"); diff --git a/query_tests/src/influxrpc/tag_keys.rs b/query_tests/src/influxrpc/tag_keys.rs index 72551599f4..aeac01d474 100644 --- a/query_tests/src/influxrpc/tag_keys.rs +++ b/query_tests/src/influxrpc/tag_keys.rs @@ -31,7 +31,7 @@ async fn run_tag_keys_test_case( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .tag_keys(db.as_query_database_arc(), predicate.clone()) + .tag_keys(db.as_query_namespace_arc(), predicate.clone()) .await .expect("built plan successfully"); let names = ctx diff --git a/query_tests/src/influxrpc/tag_values.rs b/query_tests/src/influxrpc/tag_values.rs index b98b2fc62e..a6b61149a6 100644 --- a/query_tests/src/influxrpc/tag_values.rs +++ b/query_tests/src/influxrpc/tag_values.rs @@ -30,7 +30,7 @@ async fn run_tag_values_test_case( let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner")); let plan = planner - .tag_values(db.as_query_database_arc(), tag_name, predicate.clone()) + .tag_values(db.as_query_namespace_arc(), tag_name, predicate.clone()) .await .expect("built plan successfully"); let names = ctx @@ -292,7 +292,7 @@ async fn list_tag_values_field_col_on_tag() { let tag_name = "temp"; let plan_result = planner .tag_values( - db.as_query_database_arc(), + db.as_query_namespace_arc(), tag_name, InfluxRpcPredicate::default(), ) diff --git a/service_common/src/lib.rs b/service_common/src/lib.rs index 99cff740ee..242f51ccd0 100644 --- a/service_common/src/lib.rs +++ b/service_common/src/lib.rs @@ -7,20 +7,20 @@ pub mod test_util; use std::sync::Arc; use async_trait::async_trait; -use iox_query::{exec::ExecutionContextProvider, QueryDatabase}; +use iox_query::{exec::ExecutionContextProvider, QueryNamespace}; use trace::span::Span; use tracker::InstrumentedAsyncOwnedSemaphorePermit; -/// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a virtual set of -/// databases. +/// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a +/// virtual set of namespaces. /// -/// The query engine MUST ONLY use this trait to access the databases / catalogs. +/// The query engine MUST ONLY use this trait to access the namespaces / catalogs. #[async_trait] pub trait QueryDatabaseProvider: std::fmt::Debug + Send + Sync + 'static { - /// Abstract database. - type Db: ExecutionContextProvider + QueryDatabase; + /// Abstract namespace. + type Db: ExecutionContextProvider + QueryNamespace; - /// Get database if it exists. + /// Get namespace if it exists. async fn db(&self, name: &str, span: Option) -> Option>; /// Acquire concurrency-limiting sempahore diff --git a/service_common/src/planner.rs b/service_common/src/planner.rs index e1bc5adf71..67de772667 100644 --- a/service_common/src/planner.rs +++ b/service_common/src/planner.rs @@ -6,7 +6,7 @@ use iox_query::{ exec::IOxSessionContext, frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner}, plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan}, - Aggregate, QueryDatabase, WindowDuration, + Aggregate, QueryNamespace, WindowDuration, }; pub use datafusion::error::{DataFusionError as Error, Result}; @@ -31,7 +31,7 @@ impl Planner { } } - /// Plan a SQL query against the data in `database`, and return a + /// Plan a SQL query against the data in a namespace, and return a /// DataFusion physical execution plan. pub async fn sql(&self, query: impl Into + Send) -> Result> { let planner = SqlQueryPlanner::new(); @@ -45,20 +45,20 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::table_names`], on a separate threadpool - pub async fn table_names( + pub async fn table_names( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner table_names")); self.ctx .run(async move { planner - .table_names(database, predicate) + .table_names(namespace, predicate) .await .map_err(|e| e.to_df_error("table_names")) }) @@ -67,20 +67,20 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::tag_keys`], on a separate threadpool - pub async fn tag_keys( + pub async fn tag_keys( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_keys")); self.ctx .run(async move { planner - .tag_keys(database, predicate) + .tag_keys(namespace, predicate) .await .map_err(|e| e.to_df_error("tag_keys")) }) @@ -89,14 +89,14 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::tag_values`], on a separate threadpool - pub async fn tag_values( + pub async fn tag_values( &self, - database: Arc, + namespace: Arc, tag_name: impl Into + Send, predicate: InfluxRpcPredicate, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let tag_name = tag_name.into(); let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_values")); @@ -104,7 +104,7 @@ impl Planner { self.ctx .run(async move { planner - .tag_values(database, &tag_name, predicate) + .tag_values(namespace, &tag_name, predicate) .await .map_err(|e| e.to_df_error("tag_values")) }) @@ -113,20 +113,20 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::field_columns`], on a separate threadpool - pub async fn field_columns( + pub async fn field_columns( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner field_columns")); self.ctx .run(async move { planner - .field_columns(database, predicate) + .field_columns(namespace, predicate) .await .map_err(|e| e.to_df_error("field_columns")) }) @@ -135,20 +135,20 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::read_filter`], on a separate threadpool - pub async fn read_filter( + pub async fn read_filter( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_filter")); self.ctx .run(async move { planner - .read_filter(database, predicate) + .read_filter(namespace, predicate) .await .map_err(|e| e.to_df_error("read_filter")) }) @@ -157,22 +157,22 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::read_group`], on a separate threadpool - pub async fn read_group( + pub async fn read_group( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, agg: Aggregate, group_columns: Vec, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_group")); self.ctx .run(async move { planner - .read_group(database, predicate, agg, &group_columns) + .read_group(namespace, predicate, agg, &group_columns) .await .map_err(|e| e.to_df_error("read_group")) }) @@ -181,23 +181,23 @@ impl Planner { /// Creates a plan as described on /// [`InfluxRpcPlanner::read_window_aggregate`], on a separate threadpool - pub async fn read_window_aggregate( + pub async fn read_window_aggregate( &self, - database: Arc, + namespace: Arc, predicate: InfluxRpcPredicate, agg: Aggregate, every: WindowDuration, offset: WindowDuration, ) -> Result where - D: QueryDatabase + 'static, + N: QueryNamespace + 'static, { let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_window_aggregate")); self.ctx .run(async move { planner - .read_window_aggregate(database, predicate, agg, every, offset) + .read_window_aggregate(namespace, predicate, agg, every, offset) .await .map_err(|e| e.to_df_error("read_window_aggregate")) }) diff --git a/service_grpc_flight/src/lib.rs b/service_grpc_flight/src/lib.rs index d90e3991ea..0d8b61ef82 100644 --- a/service_grpc_flight/src/lib.rs +++ b/service_grpc_flight/src/lib.rs @@ -14,7 +14,7 @@ use futures::{SinkExt, Stream, StreamExt}; use generated_types::influxdata::iox::querier::v1 as proto; use iox_query::{ exec::{ExecutionContextProvider, IOxSessionContext}, - QueryCompletedToken, QueryDatabase, + QueryCompletedToken, QueryNamespace, }; use observability_deps::tracing::{debug, info, warn}; use pin_project::{pin_project, pinned_drop}; diff --git a/service_grpc_influxrpc/src/service.rs b/service_grpc_influxrpc/src/service.rs index cc07195d7c..e96eafb1e6 100644 --- a/service_grpc_influxrpc/src/service.rs +++ b/service_grpc_influxrpc/src/service.rs @@ -1,5 +1,5 @@ //! This module contains implementations for the storage gRPC service -//! implemented in terms of the [`QueryDatabase`](iox_query::QueryDatabase). +//! implemented in terms of the [`QueryNamespace`](iox_query::QueryNamespace). use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT}; use crate::{ @@ -30,7 +30,7 @@ use iox_query::{ fieldlist::FieldList, seriesset::converter::Error as SeriesSetError, ExecutionContextProvider, IOxSessionContext, }, - QueryDatabase, QueryText, + QueryNamespace, QueryText, }; use observability_deps::tracing::{error, info, trace, warn}; use pin_project::pin_project; @@ -1015,15 +1015,15 @@ fn get_namespace_name(input: &impl GrpcInputs) -> Result, /// Gathers all measurement names that have data in the specified /// (optional) range -async fn measurement_name_impl( - db: Arc, +async fn measurement_name_impl( + db: Arc, db_name: NamespaceName<'static>, range: Option, rpc_predicate: Option, ctx: &IOxSessionContext, ) -> Result where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); let db_name = db_name.as_str(); @@ -1058,8 +1058,8 @@ where /// Return tag keys with optional measurement, timestamp and arbitrary /// predicates -async fn tag_keys_impl( - db: Arc, +async fn tag_keys_impl( + db: Arc, db_name: NamespaceName<'static>, measurement: Option, range: Option, @@ -1067,7 +1067,7 @@ async fn tag_keys_impl( ctx: &IOxSessionContext, ) -> Result where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); let db_name = db_name.as_str(); @@ -1100,8 +1100,8 @@ where /// Return tag values for tag_name, with optional measurement, timestamp and /// arbitratry predicates -async fn tag_values_impl( - db: Arc, +async fn tag_values_impl( + db: Arc, db_name: NamespaceName<'static>, tag_name: String, measurement: Option, @@ -1110,7 +1110,7 @@ async fn tag_values_impl( ctx: &IOxSessionContext, ) -> Result where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); @@ -1148,14 +1148,14 @@ where /// Return tag values grouped by one or more measurements with optional /// filtering predicate and optionally scoped to one or more tag keys. -async fn tag_values_grouped_by_measurement_and_tag_key_impl( - db: Arc, +async fn tag_values_grouped_by_measurement_and_tag_key_impl( + db: Arc, db_name: NamespaceName<'static>, req: TagValuesGroupedByMeasurementAndTagKeyRequest, ctx: &IOxSessionContext, ) -> Result, Error> where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { // Extract the tag key predicate. // See https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-tag-values @@ -1222,14 +1222,14 @@ where } /// Launch async tasks that materialises the result of executing read_filter. -async fn read_filter_impl( - db: Arc, +async fn read_filter_impl( + db: Arc, db_name: NamespaceName<'static>, req: ReadFilterRequest, ctx: &IOxSessionContext, ) -> Result, Error> where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let db_name = db_name.as_str(); @@ -1267,8 +1267,8 @@ where } /// Launch async tasks that send the result of executing read_group to `tx` -async fn query_group_impl( - db: Arc, +async fn query_group_impl( + db: Arc, db_name: NamespaceName<'static>, range: Option, rpc_predicate: Option, @@ -1277,7 +1277,7 @@ async fn query_group_impl( ctx: &IOxSessionContext, ) -> Result, Error> where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let db_name = db_name.as_str(); @@ -1324,8 +1324,8 @@ where /// Return field names, restricted via optional measurement, timestamp and /// predicate -async fn field_names_impl( - db: Arc, +async fn field_names_impl( + db: Arc, db_name: NamespaceName<'static>, measurement: Option, range: Option, @@ -1333,7 +1333,7 @@ async fn field_names_impl( ctx: &IOxSessionContext, ) -> Result where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { let rpc_predicate_string = format!("{:?}", rpc_predicate); @@ -1364,14 +1364,14 @@ where /// Materialises a collection of measurement names. Typically used as part of /// a plan to scope and group multiple plans by measurement name. -async fn materialise_measurement_names( - db: Arc, +async fn materialise_measurement_names( + db: Arc, db_name: NamespaceName<'static>, measurement_exprs: Vec, ctx: &IOxSessionContext, ) -> Result, Error> where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { use generated_types::{ node::{Comparison, Type, Value}, @@ -1442,15 +1442,15 @@ where /// /// TODO(edd): this might be better represented as a plan against the `columns` /// system table. -async fn materialise_tag_keys( - db: Arc, +async fn materialise_tag_keys( + db: Arc, db_name: NamespaceName<'static>, measurement_name: String, tag_key_predicate: tag_key_predicate::Value, ctx: &IOxSessionContext, ) -> Result, Error> where - D: QueryDatabase + ExecutionContextProvider + 'static, + N: QueryNamespace + ExecutionContextProvider + 'static, { use generated_types::tag_key_predicate::Value;