fix: Rename QueryDatabase to QueryNamespace

pull/24376/head
Carol (Nichols || Goulding) 2022-11-11 14:20:31 -05:00
parent 621560a0dc
commit 0657ad9600
No known key found for this signature in database
GPG Key ID: E907EE5A736F87D4
16 changed files with 147 additions and 153 deletions

View File

@ -11,7 +11,7 @@ use crate::{
seriesset::{SeriesSetPlan, SeriesSetPlans},
stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder},
},
QueryChunk, QueryDatabase,
QueryChunk, QueryNamespace,
};
use arrow::datatypes::DataType;
use data_types::ChunkId;
@ -226,7 +226,7 @@ impl InfluxRpcPlanner {
/// . chunks without deleted data but cannot be decided from metadata
pub async fn table_names(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan> {
let ctx = self.ctx.child_ctx("table_names planning");
@ -236,10 +236,10 @@ impl InfluxRpcPlanner {
let rpc_predicate = rpc_predicate.clear_timestamp_if_max_range();
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
let tables: Vec<_> =
table_chunk_stream(Arc::clone(&database), false, &table_predicates, &ctx)
table_chunk_stream(Arc::clone(&namespace), false, &table_predicates, &ctx)
.try_filter_map(|(table_name, predicate, chunks)| async move {
// Identify which chunks can answer from their metadata alone and record their table,
// and which chunks need a full plan, grouping those by table
@ -291,7 +291,7 @@ impl InfluxRpcPlanner {
builder.append_string(table_name.to_string());
}
Some((predicate, chunks)) => {
let schema = database
let schema = namespace
.table_schema(table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -315,13 +315,12 @@ impl InfluxRpcPlanner {
builder.build().context(CreatingStringSetSnafu)
}
/// Returns a set of plans that produces the names of "tag"
/// columns (as defined in the InfluxDB Data model) names in this
/// database that have more than zero rows which pass the
/// conditions specified by `predicate`.
/// Returns a set of plans that produces the names of "tag" columns (as defined in the InfluxDB
/// data model) in this namespace that have more than zero rows which pass the conditions
/// specified by `predicate`.
pub async fn tag_keys(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan> {
let ctx = self.ctx.child_ctx("tag_keys planning");
@ -334,11 +333,11 @@ impl InfluxRpcPlanner {
//
// 1. Find all the potential tables in the chunks
//
// 2. For each table/chunk pair, figure out which can be found
// from only metadata and which need full plans
// 2. For each table/chunk pair, figure out which can be found from only metadata and which
// need full plans
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
let mut table_predicates_need_chunks = vec![];
@ -348,7 +347,7 @@ impl InfluxRpcPlanner {
// special case - return the columns from metadata only.
// Note that columns with all rows deleted will still show here
builder = builder.append_other(
database
namespace
.table_schema(&table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -364,7 +363,7 @@ impl InfluxRpcPlanner {
}
let tables: Vec<_> = table_chunk_stream(
Arc::clone(&database),
Arc::clone(&namespace),
false,
&table_predicates_need_chunks,
&ctx,
@ -458,7 +457,7 @@ impl InfluxRpcPlanner {
// out chunks (and tables) where all columns in that chunk
// were already known to have data (based on the contents of known_columns)
let schema = database
let schema = namespace
.table_schema(table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -478,12 +477,11 @@ impl InfluxRpcPlanner {
builder.build().context(CreatingStringSetSnafu)
}
/// Returns a plan which finds the distinct, non-null tag values
/// in the specified `tag_name` column of this database which pass
/// the conditions specified by `predicate`.
/// Returns a plan which finds the distinct, non-null tag values in the specified `tag_name`
/// column of this namespace which pass the conditions specified by `predicate`.
pub async fn tag_values(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
tag_name: &str,
rpc_predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan> {
@ -499,14 +497,14 @@ impl InfluxRpcPlanner {
// which need full plans
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
// filter out tables that do NOT contain `tag_name` early, esp. before performing any chunk scan (which includes
// ingester RPC)
// filter out tables that do NOT contain `tag_name` early, esp. before performing any chunk
// scan (which includes ingester RPC)
let mut table_predicates_filtered = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in table_predicates {
let schema = database
let schema = namespace
.table_schema(&table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -521,7 +519,7 @@ impl InfluxRpcPlanner {
}
let tables: Vec<_> = table_chunk_stream(
Arc::clone(&database),
Arc::clone(&namespace),
false,
&table_predicates_filtered,
&ctx,
@ -546,8 +544,8 @@ impl InfluxRpcPlanner {
let schema = chunk.schema();
// Skip this table if the tag_name is not a column in this chunk
// Note: This may happen even when the table contains the tag_name, because some chunks may not
// contain all columns.
// Note: This may happen even when the table contains the tag_name, because some
// chunks may not contain all columns.
let idx = if let Some(idx) = schema.find_index_of(tag_name) {
idx
} else {
@ -571,8 +569,8 @@ impl InfluxRpcPlanner {
}
);
// If there are delete predicates, we need to scan (or do full plan) the data to eliminate
// deleted data before getting tag values
// If there are delete predicates, we need to scan (or do full plan) the data to
// eliminate deleted data before getting tag values
if chunk.has_delete_predicates() {
debug!(
%table_name,
@ -628,7 +626,7 @@ impl InfluxRpcPlanner {
builder = builder.append_other(known_values.into());
if !chunks_full.is_empty() {
let schema = database
let schema = namespace
.table_schema(table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -670,13 +668,12 @@ impl InfluxRpcPlanner {
builder.build().context(CreatingStringSetSnafu)
}
/// Returns a plan that produces a list of columns and their
/// datatypes (as defined in the data written via `write_lines`),
/// and which have more than zero rows which pass the conditions
/// Returns a plan that produces a list of columns and their datatypes (as defined in the data
/// written via `write_lines`), and which have more than zero rows which pass the conditions
/// specified by `predicate`.
pub async fn field_columns(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
) -> Result<FieldListPlan> {
let ctx = self.ctx.child_ctx("field_columns planning");
@ -692,7 +689,7 @@ impl InfluxRpcPlanner {
// values and stops the plan executing once it has them
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
// optimization: just get the field columns from metadata.
@ -701,7 +698,7 @@ impl InfluxRpcPlanner {
let mut table_predicates_need_chunks = Vec::with_capacity(table_predicates.len());
for (table_name, predicate) in table_predicates {
if predicate.is_empty() {
let schema = database
let schema = namespace
.table_schema(&table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
@ -721,7 +718,7 @@ impl InfluxRpcPlanner {
// full scans
let plans = create_plans(
database,
namespace,
&table_predicates_need_chunks,
ctx,
|ctx, table_name, predicate, chunks, schema| {
@ -762,18 +759,18 @@ impl InfluxRpcPlanner {
/// same) occur together in the plan
pub async fn read_filter(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
) -> Result<SeriesSetPlans> {
let ctx = self.ctx.child_ctx("planning_read_filter");
debug!(?rpc_predicate, "planning read_filter");
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
let plans = create_plans(
database,
namespace,
&table_predicates,
ctx,
|ctx, table_name, predicate, chunks, schema| {
@ -813,7 +810,7 @@ impl InfluxRpcPlanner {
/// (apply filters)
pub async fn read_group(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
agg: Aggregate,
group_columns: &[impl AsRef<str> + Send + Sync],
@ -822,11 +819,11 @@ impl InfluxRpcPlanner {
debug!(?rpc_predicate, ?agg, "planning read_group");
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
let plans = create_plans(
database,
namespace,
&table_predicates,
ctx,
|ctx, table_name, predicate, chunks, schema| match agg {
@ -863,7 +860,7 @@ impl InfluxRpcPlanner {
/// that are grouped by window definitions
pub async fn read_window_aggregate(
&self,
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
rpc_predicate: InfluxRpcPredicate,
agg: Aggregate,
every: WindowDuration,
@ -879,11 +876,11 @@ impl InfluxRpcPlanner {
);
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
let plans = create_plans(
database,
namespace,
&table_predicates,
ctx,
|ctx, table_name, predicate, chunks, schema| {
@ -1375,7 +1372,7 @@ impl InfluxRpcPlanner {
/// This function is indirectly invoked by `field_columns`, `read_filter`, `read_group` and `read_window_aggregate`
/// through the function `create_plans` where need_fields should be true.
fn table_chunk_stream<'a>(
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
need_fields: bool,
table_predicates: &'a [(Arc<str>, Predicate)],
ctx: &'a IOxSessionContext,
@ -1385,9 +1382,9 @@ fn table_chunk_stream<'a>(
let mut ctx = ctx.child_ctx("table");
ctx.set_metadata("table", table_name.to_string());
let database = Arc::clone(&database);
let namespace = Arc::clone(&namespace);
let table_schema = database.table_schema(table_name);
let table_schema = namespace.table_schema(table_name);
let projection = match table_schema {
Some(table_schema) => {
columns_in_predicates(need_fields, table_schema, table_name, predicate)
@ -1396,7 +1393,7 @@ fn table_chunk_stream<'a>(
};
async move {
let chunks = database
let chunks = namespace
.chunks(
table_name,
predicate,
@ -1507,7 +1504,7 @@ fn columns_in_predicates(
/// `f(ctx, table_name, table_predicate, chunks, table_schema)` is
/// invoked on the chunks for each table to produce a plan for each
async fn create_plans<F, P>(
database: Arc<dyn QueryDatabase>,
namespace: Arc<dyn QueryNamespace>,
table_predicates: &[(Arc<str>, Predicate)],
ctx: IOxSessionContext,
f: F,
@ -1525,7 +1522,7 @@ where
+ Sync,
P: Send,
{
table_chunk_stream(Arc::clone(&database), true, table_predicates, &ctx)
table_chunk_stream(Arc::clone(&namespace), true, table_predicates, &ctx)
.and_then(|(table_name, predicate, chunks)| async move {
let chunks = prune_chunks_metadata(chunks, predicate)?;
Ok((table_name, predicate, chunks))
@ -1540,11 +1537,11 @@ where
let mut ctx = ctx.child_ctx("table");
ctx.set_metadata("table", table_name.to_string());
let database = Arc::clone(&database);
let namespace = Arc::clone(&namespace);
let f = f.clone();
async move {
let schema = database
let schema = namespace
.table_schema(table_name)
.context(TableRemovedSnafu {
table_name: table_name.as_ref(),
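For orientation, here is a minimal usage sketch of the renamed entry point; it is not part of the commit. It mirrors the test harness later in this diff (`as_query_namespace_arc`, `expect("built plan successfully")`), and the import paths are assumed from the `use` blocks shown elsewhere in this changeset.

```rust
use std::sync::Arc;

use iox_query::{
    exec::IOxSessionContext, frontend::influxrpc::InfluxRpcPlanner, QueryNamespace,
};
use predicate::rpc_predicate::InfluxRpcPredicate;

// Hypothetical caller: `namespace` is obtained elsewhere, e.g. via
// `db.as_query_namespace_arc()` as in the query_tests below.
async fn plan_table_names(ctx: &IOxSessionContext, namespace: Arc<dyn QueryNamespace>) {
    let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
    let _plan = planner
        .table_names(namespace, InfluxRpcPredicate::default())
        .await
        .expect("built plan successfully");
}
```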

View File

@ -89,7 +89,7 @@ pub trait QueryChunkMeta {
}
/// A `QueryCompletedToken` is returned by `record_query` implementations of
/// a `QueryDatabase`. It is used to trigger side-effects (such as query timing)
/// a `QueryNamespace`. It is used to trigger side-effects (such as query timing)
/// on query completion.
///
pub struct QueryCompletedToken {
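The completion side-effect relies on `Drop` (see the `impl Drop for QueryCompletedToken` hunk just below). A self-contained sketch of the same RAII idea, with illustrative names rather than the actual IOx fields:

```rust
/// Illustrative stand-in for `QueryCompletedToken`: runs a callback when the
/// token is dropped, i.e. when the query it tracks has completed.
struct CompletionToken<F: FnMut()> {
    on_complete: F,
}

impl<F: FnMut()> Drop for CompletionToken<F> {
    fn drop(&mut self) {
        // Fires whether the query finished normally or was abandoned.
        (self.on_complete)();
    }
}

fn main() {
    let _token = CompletionToken {
        on_complete: || println!("query timing recorded"),
    };
    // ... execute the query; dropping `_token` triggers the side-effect.
}
```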
@ -136,22 +136,20 @@ impl Drop for QueryCompletedToken {
/// This avoids storing potentially large strings
pub type QueryText = Box<dyn std::fmt::Display + Send + Sync>;
/// A `Database` is the main trait implemented by the IOx subsystems
/// that store actual data.
/// `QueryNamespace` is the main trait implemented by the IOx subsystems that store actual data.
///
/// Databases store data organized by partitions and each partition stores
/// data in Chunks.
/// Namespaces store data organized by partitions and each partition stores data in Chunks.
#[async_trait]
pub trait QueryDatabase: QueryNamespaceMeta + Debug + Send + Sync {
/// Returns a set of chunks within the partition with data that may match
/// the provided predicate.
pub trait QueryNamespace: QueryNamespaceMeta + Debug + Send + Sync {
/// Returns a set of chunks within the partition with data that may match the provided
/// predicate.
///
/// If possible, chunks which have no rows that can
/// possibly match the predicate may be omitted.
/// If possible, chunks which have no rows that can possibly match the predicate may be omitted.
///
/// If projection is None, returned chunks will include all columns of its original data. Otherwise,
/// returned chunks will include PK columns (tags and time) and columns specified in the projection. Projecting
/// chunks here is optional and a mere optimization. The query subsystem does NOT rely on it.
/// If projection is `None`, returned chunks will include all columns of its original data.
/// Otherwise, returned chunks will include PK columns (tags and time) and columns specified in
/// the projection. Projecting chunks here is optional and a mere optimization. The query
/// subsystem does NOT rely on it.
async fn chunks(
&self,
table_name: &str,
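The projection contract described in that doc comment can be pictured with a deliberately simplified model. This is illustrative only: real chunks carry Arrow schemas, not plain name lists, and `pk` stands in for the tag and time columns a chunk always keeps.

```rust
/// Simplified model of the `chunks` projection behavior: `None` keeps every
/// column; `Some(wanted)` keeps PK columns plus the requested ones. Doing
/// less pruning than this is legal, since the query subsystem does not rely
/// on projection happening here at all.
fn apply_projection(
    all_columns: &[&str],
    pk: &[&str],
    projection: Option<&[&str]>,
) -> Vec<String> {
    match projection {
        None => all_columns.iter().map(|c| c.to_string()).collect(),
        Some(wanted) => all_columns
            .iter()
            .copied()
            .filter(|c| pk.contains(c) || wanted.contains(c))
            .map(str::to_string)
            .collect(),
    }
}
```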

View File

@ -1,5 +1,4 @@
//! This module provides a reference implementation of
//! [`QueryDatabase`] for use in testing.
//! This module provides a reference implementation of [`QueryNamespace`] for use in testing.
//!
//! AKA it is a Mock
@ -9,7 +8,7 @@ use crate::{
ExecutionContextProvider, Executor, ExecutorType, IOxSessionContext,
},
Predicate, PredicateMatch, QueryChunk, QueryChunkData, QueryChunkMeta, QueryCompletedToken,
QueryDatabase, QueryText,
QueryNamespace, QueryText,
};
use arrow::{
array::{
@ -100,7 +99,7 @@ impl TestDatabase {
}
#[async_trait]
impl QueryDatabase for TestDatabase {
impl QueryNamespace for TestDatabase {
async fn chunks(
&self,
table_name: &str,

View File

@ -16,7 +16,7 @@ use datafusion::{
use datafusion_util::config::DEFAULT_SCHEMA;
use iox_query::{
exec::{ExecutionContextProvider, ExecutorType, IOxSessionContext},
QueryChunk, QueryCompletedToken, QueryDatabase, QueryText,
QueryChunk, QueryCompletedToken, QueryNamespace, QueryText,
};
use observability_deps::tracing::{debug, trace};
use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate};
@ -37,7 +37,7 @@ impl QueryNamespaceMeta for QuerierNamespace {
}
#[async_trait]
impl QueryDatabase for QuerierNamespace {
impl QueryNamespace for QuerierNamespace {
async fn chunks(
&self,
table_name: &str,

View File

@ -1,11 +1,11 @@
use std::{any::Any, sync::Arc};
use datafusion::catalog::catalog::CatalogProvider;
use iox_query::{exec::ExecutionContextProvider, QueryDatabase};
use iox_query::{exec::ExecutionContextProvider, QueryNamespace};
use querier::QuerierNamespace;
/// Abstract database used during testing.
pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryDatabase {
pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryNamespace {
fn as_any_arc(self: Arc<Self>) -> Arc<dyn Any + Send + Sync + 'static>;
/// Upcast to [`CatalogProvider`].
@ -13,10 +13,10 @@ pub trait AbstractDb: CatalogProvider + ExecutionContextProvider + QueryDatabase
/// This is required due to <https://github.com/rust-lang/rust/issues/65991>.
fn as_catalog_provider_arc(self: Arc<Self>) -> Arc<dyn CatalogProvider>;
/// Upcast to [`QueryDatabase`].
/// Upcast to [`QueryNamespace`].
///
/// This is required due to <https://github.com/rust-lang/rust/issues/65991>.
fn as_query_database_arc(self: Arc<Self>) -> Arc<dyn QueryDatabase>;
fn as_query_namespace_arc(self: Arc<Self>) -> Arc<dyn QueryNamespace>;
}
impl AbstractDb for QuerierNamespace {
@ -28,7 +28,7 @@ impl AbstractDb for QuerierNamespace {
self as _
}
fn as_query_database_arc(self: Arc<Self>) -> Arc<dyn QueryDatabase> {
fn as_query_namespace_arc(self: Arc<Self>) -> Arc<dyn QueryNamespace> {
self
}
}
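The `as_query_namespace_arc` method exists because `Arc<dyn AbstractDb>` cannot be upcast to `Arc<dyn QueryNamespace>` implicitly; trait-object upcasting is the open issue linked above. A stripped-down sketch of the pattern, with stand-in names rather than the real traits:

```rust
use std::sync::Arc;

trait Query {}

trait AbstractThing: Query {
    // Without this explicit method, generic code holding `Arc<dyn AbstractThing>`
    // has no way to obtain an `Arc<dyn Query>` (rust-lang/rust#65991).
    fn as_query_arc(self: Arc<Self>) -> Arc<dyn Query>;
}

struct Concrete;
impl Query for Concrete {}
impl AbstractThing for Concrete {
    fn as_query_arc(self: Arc<Self>) -> Arc<dyn Query> {
        // A concrete `Arc<Self>` coerces to `Arc<dyn Query>` without trouble;
        // only the dyn-to-dyn upcast is missing from the language.
        self
    }
}
```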

View File

@ -32,7 +32,7 @@ async fn run_field_columns_test_case<D>(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.field_columns(db.as_query_database_arc(), predicate.clone())
.field_columns(db.as_query_namespace_arc(), predicate.clone())
.await
.expect("built plan successfully");
let fields = ctx

View File

@ -64,7 +64,7 @@ async fn run_read_filter(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.read_filter(db.as_query_database_arc(), predicate)
.read_filter(db.as_query_namespace_arc(), predicate)
.await
.map_err(|e| e.to_string())?;

View File

@ -38,7 +38,7 @@ async fn run_read_group_test_case<D>(
let plans = planner
.read_group(
db.as_query_database_arc(),
db.as_query_namespace_arc(),
predicate.clone(),
agg,
&group_columns,

View File

@ -30,7 +30,7 @@ async fn run_read_window_aggregate_test_case<D>(
let plan = planner
.read_window_aggregate(
db.as_query_database_arc(),
db.as_query_namespace_arc(),
predicate.clone(),
agg,
every,

View File

@ -29,7 +29,7 @@ async fn run_table_names_test_case<D>(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.table_names(db.as_query_database_arc(), predicate.clone())
.table_names(db.as_query_namespace_arc(), predicate.clone())
.await
.expect("built plan successfully");

View File

@ -31,7 +31,7 @@ async fn run_tag_keys_test_case<D>(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.tag_keys(db.as_query_database_arc(), predicate.clone())
.tag_keys(db.as_query_namespace_arc(), predicate.clone())
.await
.expect("built plan successfully");
let names = ctx

View File

@ -30,7 +30,7 @@ async fn run_tag_values_test_case<D>(
let planner = InfluxRpcPlanner::new(ctx.child_ctx("planner"));
let plan = planner
.tag_values(db.as_query_database_arc(), tag_name, predicate.clone())
.tag_values(db.as_query_namespace_arc(), tag_name, predicate.clone())
.await
.expect("built plan successfully");
let names = ctx
@ -292,7 +292,7 @@ async fn list_tag_values_field_col_on_tag() {
let tag_name = "temp";
let plan_result = planner
.tag_values(
db.as_query_database_arc(),
db.as_query_namespace_arc(),
tag_name,
InfluxRpcPredicate::default(),
)

View File

@ -7,20 +7,20 @@ pub mod test_util;
use std::sync::Arc;
use async_trait::async_trait;
use iox_query::{exec::ExecutionContextProvider, QueryDatabase};
use iox_query::{exec::ExecutionContextProvider, QueryNamespace};
use trace::span::Span;
use tracker::InstrumentedAsyncOwnedSemaphorePermit;
/// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a virtual set of
/// databases.
/// Trait that allows the query engine (which includes flight and storage/InfluxRPC) to access a
/// virtual set of namespaces.
///
/// The query engine MUST ONLY use this trait to access the databases / catalogs.
/// The query engine MUST ONLY use this trait to access the namespaces / catalogs.
#[async_trait]
pub trait QueryDatabaseProvider: std::fmt::Debug + Send + Sync + 'static {
/// Abstract database.
type Db: ExecutionContextProvider + QueryDatabase;
/// Abstract namespace.
type Db: ExecutionContextProvider + QueryNamespace;
/// Get database if it exists.
/// Get namespace if it exists.
async fn db(&self, name: &str, span: Option<Span>) -> Option<Arc<Self::Db>>;
/// Acquire concurrency-limiting semaphore
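Note that the provider trait keeps the name `QueryDatabaseProvider` in this commit; only its associated type's bound and doc wording change. A hypothetical lookup through it, with an assumed namespace name:

```rust
use std::sync::Arc;

// Sketch only: assumes this crate's `QueryDatabaseProvider` is in scope.
// "my_namespace" is a made-up name; `None` attaches no tracing span.
async fn lookup<P: QueryDatabaseProvider>(provider: &P) -> Option<Arc<P::Db>> {
    provider.db("my_namespace", None).await
}
```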

View File

@ -6,7 +6,7 @@ use iox_query::{
exec::IOxSessionContext,
frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan},
Aggregate, QueryDatabase, WindowDuration,
Aggregate, QueryNamespace, WindowDuration,
};
pub use datafusion::error::{DataFusionError as Error, Result};
@ -31,7 +31,7 @@ impl Planner {
}
}
/// Plan a SQL query against the data in `database`, and return a
/// Plan a SQL query against the data in a namespace, and return a
/// DataFusion physical execution plan.
pub async fn sql(&self, query: impl Into<String> + Send) -> Result<Arc<dyn ExecutionPlan>> {
let planner = SqlQueryPlanner::new();
@ -45,20 +45,20 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::table_names`], on a separate threadpool
pub async fn table_names<D>(
pub async fn table_names<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner table_names"));
self.ctx
.run(async move {
planner
.table_names(database, predicate)
.table_names(namespace, predicate)
.await
.map_err(|e| e.to_df_error("table_names"))
})
@ -67,20 +67,20 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::tag_keys`], on a separate threadpool
pub async fn tag_keys<D>(
pub async fn tag_keys<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_keys"));
self.ctx
.run(async move {
planner
.tag_keys(database, predicate)
.tag_keys(namespace, predicate)
.await
.map_err(|e| e.to_df_error("tag_keys"))
})
@ -89,14 +89,14 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::tag_values`], on a separate threadpool
pub async fn tag_values<D>(
pub async fn tag_values<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
tag_name: impl Into<String> + Send,
predicate: InfluxRpcPredicate,
) -> Result<StringSetPlan>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let tag_name = tag_name.into();
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner tag_values"));
@ -104,7 +104,7 @@ impl Planner {
self.ctx
.run(async move {
planner
.tag_values(database, &tag_name, predicate)
.tag_values(namespace, &tag_name, predicate)
.await
.map_err(|e| e.to_df_error("tag_values"))
})
@ -113,20 +113,20 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::field_columns`], on a separate threadpool
pub async fn field_columns<D>(
pub async fn field_columns<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
) -> Result<FieldListPlan>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner field_columns"));
self.ctx
.run(async move {
planner
.field_columns(database, predicate)
.field_columns(namespace, predicate)
.await
.map_err(|e| e.to_df_error("field_columns"))
})
@ -135,20 +135,20 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::read_filter`], on a separate threadpool
pub async fn read_filter<D>(
pub async fn read_filter<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
) -> Result<SeriesSetPlans>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_filter"));
self.ctx
.run(async move {
planner
.read_filter(database, predicate)
.read_filter(namespace, predicate)
.await
.map_err(|e| e.to_df_error("read_filter"))
})
@ -157,22 +157,22 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::read_group`], on a separate threadpool
pub async fn read_group<D>(
pub async fn read_group<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
agg: Aggregate,
group_columns: Vec<String>,
) -> Result<SeriesSetPlans>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_group"));
self.ctx
.run(async move {
planner
.read_group(database, predicate, agg, &group_columns)
.read_group(namespace, predicate, agg, &group_columns)
.await
.map_err(|e| e.to_df_error("read_group"))
})
@ -181,23 +181,23 @@ impl Planner {
/// Creates a plan as described on
/// [`InfluxRpcPlanner::read_window_aggregate`], on a separate threadpool
pub async fn read_window_aggregate<D>(
pub async fn read_window_aggregate<N>(
&self,
database: Arc<D>,
namespace: Arc<N>,
predicate: InfluxRpcPredicate,
agg: Aggregate,
every: WindowDuration,
offset: WindowDuration,
) -> Result<SeriesSetPlans>
where
D: QueryDatabase + 'static,
N: QueryNamespace + 'static,
{
let planner = InfluxRpcPlanner::new(self.ctx.child_ctx("planner read_window_aggregate"));
self.ctx
.run(async move {
planner
.read_window_aggregate(database, predicate, agg, every, offset)
.read_window_aggregate(namespace, predicate, agg, every, offset)
.await
.map_err(|e| e.to_df_error("read_window_aggregate"))
})
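A hedged sketch of calling the generic `Planner` wrapper after the rename; `Planner` is the type defined in this file, `Result` is the DataFusion alias re-exported near its top, and the other imports mirror ones shown above.

```rust
use std::sync::Arc;

use iox_query::{plan::seriesset::SeriesSetPlans, QueryNamespace};
use predicate::rpc_predicate::InfluxRpcPredicate;

// The planner moves the planning work onto a separate threadpool internally;
// this await only waits for that task to finish.
async fn plan_read_filter<N>(
    planner: &Planner,
    namespace: Arc<N>,
    predicate: InfluxRpcPredicate,
) -> Result<SeriesSetPlans>
where
    N: QueryNamespace + 'static,
{
    planner.read_filter(namespace, predicate).await
}
```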

View File

@ -14,7 +14,7 @@ use futures::{SinkExt, Stream, StreamExt};
use generated_types::influxdata::iox::querier::v1 as proto;
use iox_query::{
exec::{ExecutionContextProvider, IOxSessionContext},
QueryCompletedToken, QueryDatabase,
QueryCompletedToken, QueryNamespace,
};
use observability_deps::tracing::{debug, info, warn};
use pin_project::{pin_project, pinned_drop};

View File

@ -1,5 +1,5 @@
//! This module contains implementations for the storage gRPC service
//! implemented in terms of the [`QueryDatabase`](iox_query::QueryDatabase).
//! implemented in terms of the [`QueryNamespace`](iox_query::QueryNamespace).
use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
use crate::{
@ -30,7 +30,7 @@ use iox_query::{
fieldlist::FieldList, seriesset::converter::Error as SeriesSetError,
ExecutionContextProvider, IOxSessionContext,
},
QueryDatabase, QueryText,
QueryNamespace, QueryText,
};
use observability_deps::tracing::{error, info, trace, warn};
use pin_project::pin_project;
@ -1015,15 +1015,15 @@ fn get_namespace_name(input: &impl GrpcInputs) -> Result<NamespaceName<'static>,
/// Gathers all measurement names that have data in the specified
/// (optional) range
async fn measurement_name_impl<D>(
db: Arc<D>,
async fn measurement_name_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
ctx: &IOxSessionContext,
) -> Result<StringValuesResponse>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
let db_name = db_name.as_str();
@ -1058,8 +1058,8 @@ where
/// Return tag keys with optional measurement, timestamp and arbitrary
/// predicates
async fn tag_keys_impl<D>(
db: Arc<D>,
async fn tag_keys_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
measurement: Option<String>,
range: Option<TimestampRange>,
@ -1067,7 +1067,7 @@ async fn tag_keys_impl<D>(
ctx: &IOxSessionContext,
) -> Result<StringValuesResponse>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
let db_name = db_name.as_str();
@ -1100,8 +1100,8 @@ where
/// Return tag values for tag_name, with optional measurement, timestamp and
/// arbitrary predicates
async fn tag_values_impl<D>(
db: Arc<D>,
async fn tag_values_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
tag_name: String,
measurement: Option<String>,
@ -1110,7 +1110,7 @@ async fn tag_values_impl<D>(
ctx: &IOxSessionContext,
) -> Result<StringValuesResponse>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
@ -1148,14 +1148,14 @@ where
/// Return tag values grouped by one or more measurements with optional
/// filtering predicate and optionally scoped to one or more tag keys.
async fn tag_values_grouped_by_measurement_and_tag_key_impl<D>(
db: Arc<D>,
async fn tag_values_grouped_by_measurement_and_tag_key_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
req: TagValuesGroupedByMeasurementAndTagKeyRequest,
ctx: &IOxSessionContext,
) -> Result<Vec<TagValuesResponse>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
// Extract the tag key predicate.
// See https://docs.influxdata.com/influxdb/v1.8/query_language/explore-schema/#show-tag-values
@ -1222,14 +1222,14 @@ where
}
/// Launch async tasks that materialise the result of executing read_filter.
async fn read_filter_impl<D>(
db: Arc<D>,
async fn read_filter_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
req: ReadFilterRequest,
ctx: &IOxSessionContext,
) -> Result<Vec<ReadResponse>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let db_name = db_name.as_str();
@ -1267,8 +1267,8 @@ where
}
/// Launch async tasks that send the result of executing read_group to `tx`
async fn query_group_impl<D>(
db: Arc<D>,
async fn query_group_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
@ -1277,7 +1277,7 @@ async fn query_group_impl<D>(
ctx: &IOxSessionContext,
) -> Result<Vec<ReadResponse>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let db_name = db_name.as_str();
@ -1324,8 +1324,8 @@ where
/// Return field names, restricted via optional measurement, timestamp and
/// predicate
async fn field_names_impl<D>(
db: Arc<D>,
async fn field_names_impl<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
measurement: Option<String>,
range: Option<TimestampRange>,
@ -1333,7 +1333,7 @@ async fn field_names_impl<D>(
ctx: &IOxSessionContext,
) -> Result<FieldList>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
let rpc_predicate_string = format!("{:?}", rpc_predicate);
@ -1364,14 +1364,14 @@ where
/// Materialises a collection of measurement names. Typically used as part of
/// a plan to scope and group multiple plans by measurement name.
async fn materialise_measurement_names<D>(
db: Arc<D>,
async fn materialise_measurement_names<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
measurement_exprs: Vec<LiteralOrRegex>,
ctx: &IOxSessionContext,
) -> Result<BTreeSet<String>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
use generated_types::{
node::{Comparison, Type, Value},
@ -1442,15 +1442,15 @@ where
///
/// TODO(edd): this might be better represented as a plan against the `columns`
/// system table.
async fn materialise_tag_keys<D>(
db: Arc<D>,
async fn materialise_tag_keys<N>(
db: Arc<N>,
db_name: NamespaceName<'static>,
measurement_name: String,
tag_key_predicate: tag_key_predicate::Value,
ctx: &IOxSessionContext,
) -> Result<BTreeSet<String>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
N: QueryNamespace + ExecutionContextProvider + 'static,
{
use generated_types::tag_key_predicate::Value;