diff --git a/query/src/exec.rs b/query/src/exec.rs index d0aca38673..7c387c5c2e 100644 --- a/query/src/exec.rs +++ b/query/src/exec.rs @@ -100,34 +100,56 @@ pub type Result = std::result::Result; /// Handles executing DataFusion plans, and marshalling the results into rust /// native structures. +/// +/// TODO: Have a resource manager that would limit how many plans are +/// running, based on a policy #[derive(Debug)] pub struct Executor { - exec: DedicatedExecutor, + /// Executor for running user queries + query_exec: DedicatedExecutor, + + /// Executor for running system/reorganization tasks such as + /// compact + reorg_exec: DedicatedExecutor, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ExecutorType { + /// Run using the pool for queries + Query, + /// Run using the pool for system / reorganization tasks + Reorg, } impl Executor { - /// Creates a new executor with a single dedicated thread pool with - /// num_threads + /// Creates a new executor with a two dedicated thread pools, each + /// with num_threads pub fn new(num_threads: usize) -> Self { - let exec = DedicatedExecutor::new("IOx Executor Thread", num_threads); + let query_exec = DedicatedExecutor::new("IOx Query Executor Thread", num_threads); + let reorg_exec = DedicatedExecutor::new("IOx Reorg Executor Thread", num_threads); - Self { exec } + Self { + query_exec, + reorg_exec, + } } - /// Executes this plan and returns the resulting set of strings + /// Executes this plan on the query pool, and returns the + /// resulting set of strings pub async fn to_string_set(&self, plan: StringSetPlan) -> Result { match plan { StringSetPlan::Known(ss) => Ok(ss), StringSetPlan::Plan(plans) => self - .run_logical_plans(plans) + .run_logical_plans(plans, ExecutorType::Query) .await? .into_stringset() .context(StringSetConversion), } } - /// Executes the embedded plans, each as separate tasks combining the results - /// into the returned collection of items. + /// Executes the SeriesSetPlans on the query exectutor, in + /// parallel, combining the results into the returned collection + /// of items. /// /// The SeriesSets are guaranteed to come back ordered by table_name. pub async fn to_series_set( @@ -147,47 +169,50 @@ impl Executor { let handles = plans .into_iter() .map(|plan| { - // TODO run these on some executor other than the main tokio pool (maybe?) - let ctx = self.new_context(); + let executor_type = ExecutorType::Query; + let ctx = self.new_context(executor_type); - self.exec.spawn(async move { - let SeriesSetPlan { - table_name, - plan, - tag_columns, - field_columns, - num_prefix_tag_group_columns, - } = plan; - - let tag_columns = Arc::new(tag_columns); - - let physical_plan = ctx - .prepare_plan(&plan) - .context(DataFusionPhysicalPlanning)?; - - let it = ctx - .execute(physical_plan) - .await - .context(SeriesSetExecution)?; - - SeriesSetConverter::default() - .convert( + self.run( + async move { + let SeriesSetPlan { table_name, + plan, tag_columns, field_columns, num_prefix_tag_group_columns, - it, - ) - .await - .context(SeriesSetConversion) - }) + } = plan; + + let tag_columns = Arc::new(tag_columns); + + let physical_plan = ctx + .prepare_plan(&plan) + .context(DataFusionPhysicalPlanning)?; + + let it = ctx + .execute(physical_plan) + .await + .context(SeriesSetExecution)?; + + SeriesSetConverter::default() + .convert( + table_name, + tag_columns, + field_columns, + num_prefix_tag_group_columns, + it, + ) + .await + .context(SeriesSetConversion) + }, + executor_type, + ) }) .collect::>(); // join_all ensures that the results are consumed in the same order they // were spawned maintaining the guarantee to return results ordered // by the plan sort order. - let handles = future::try_join_all(handles).await.context(TaskJoinError)?; + let handles = future::try_join_all(handles).await?; let mut results = vec![]; for handle in handles { results.extend(handle?.into_iter()); @@ -196,7 +221,7 @@ impl Executor { Ok(results) } - /// Executes `plan` and return the resulting FieldList + /// Executes `plan` and return the resulting FieldList on the query executor pub async fn to_field_list(&self, plan: FieldListPlan) -> Result { let FieldListPlan { plans } = plan; @@ -204,29 +229,33 @@ impl Executor { let handles = plans .into_iter() .map(|plan| { - let ctx = self.new_context(); - self.exec.spawn(async move { - let physical_plan = ctx - .prepare_plan(&plan) - .context(DataFusionPhysicalPlanning)?; + let executor_type = ExecutorType::Query; + let ctx = self.new_context(executor_type); + self.run( + async move { + let physical_plan = ctx + .prepare_plan(&plan) + .context(DataFusionPhysicalPlanning)?; - // TODO: avoid this buffering - let fieldlist = ctx - .collect(physical_plan) - .await - .context(FieldListExectuor)? - .into_fieldlist() - .context(FieldListConversion); + // TODO: avoid this buffering + let fieldlist = ctx + .collect(physical_plan) + .await + .context(FieldListExectuor)? + .into_fieldlist() + .context(FieldListConversion); - Ok(fieldlist) - }) + Ok(fieldlist) + }, + executor_type, + ) }) .collect::>(); // collect them all up and combine them let mut results = Vec::new(); for join_handle in handles { - let fieldlist = join_handle.await.context(TaskJoinError)???; + let fieldlist = join_handle.await???; results.push(fieldlist); } @@ -235,64 +264,88 @@ impl Executor { } /// Run the plan and return a record batch reader for reading the results - pub async fn run_logical_plan(&self, plan: LogicalPlan) -> Result> { - self.run_logical_plans(vec![plan]).await + pub async fn run_logical_plan( + &self, + plan: LogicalPlan, + executor_type: ExecutorType, + ) -> Result> { + self.run_logical_plans(vec![plan], executor_type).await } /// Executes the logical plan using DataFusion on a separate /// thread pool and produces RecordBatches - pub async fn collect(&self, physical_plan: Arc) -> Result> { - self.new_context() + pub async fn collect( + &self, + physical_plan: Arc, + executor_type: ExecutorType, + ) -> Result> { + self.new_context(executor_type) .collect(physical_plan) .await .context(DataFusionExecution) } - /// Create a new execution context, suitable for executing a new query - pub fn new_context(&self) -> IOxExecutionContext { - IOxExecutionContext::new(self.exec.clone()) + /// Create a new execution context, suitable for executing a new query or system task + pub fn new_context(&self, executor_type: ExecutorType) -> IOxExecutionContext { + let executor = self.executor(executor_type).clone(); + + IOxExecutionContext::new(executor) + } + + /// Return the execution pool of the specified type + fn executor(&self, executor_type: ExecutorType) -> &DedicatedExecutor { + match executor_type { + ExecutorType::Query => &self.query_exec, + ExecutorType::Reorg => &self.reorg_exec, + } } /// plans and runs the plans in parallel and collects the results /// run each plan in parallel and collect the results - async fn run_logical_plans(&self, plans: Vec) -> Result> { + async fn run_logical_plans( + &self, + plans: Vec, + executor_type: ExecutorType, + ) -> Result> { let value_futures = plans .into_iter() .map(|plan| { - let ctx = self.new_context(); + let ctx = self.new_context(executor_type); - self.exec.spawn(async move { - let physical_plan = ctx - .prepare_plan(&plan) - .context(DataFusionPhysicalPlanning)?; + self.run( + async move { + let physical_plan = ctx + .prepare_plan(&plan) + .context(DataFusionPhysicalPlanning)?; - // TODO: avoid this buffering - ctx.collect(physical_plan) - .await - .context(DataFusionExecution) - }) + // TODO: avoid this buffering + ctx.collect(physical_plan) + .await + .context(DataFusionExecution) + }, + executor_type, + ) }) .collect::>(); // now, wait for all the values to resolve and collect them together let mut results = Vec::new(); for join_handle in value_futures { - let mut plan_result = join_handle.await.context(TaskJoinError)??; + let mut plan_result = join_handle.await??; results.append(&mut plan_result); } Ok(results) } /// Runs the specified Future (and any tasks it spawns) on the - /// worker pool for this executor, returning the result of the - /// computation. - pub async fn run(&self, task: T) -> Result + /// specified executor, returning the result of the computation. + pub async fn run(&self, task: T, executor_type: ExecutorType) -> Result where T: Future + Send + 'static, T::Output: Send + 'static, { // run on the dedicated executor - self.exec + self.executor(executor_type) .spawn(task) // wait on the *current* tokio executor .await diff --git a/query/src/frontend/reorg.rs b/query/src/frontend/reorg.rs index 9f0096ba27..d77a0ce68a 100644 --- a/query/src/frontend/reorg.rs +++ b/query/src/frontend/reorg.rs @@ -239,7 +239,7 @@ mod test { use internal_types::schema::sort::SortOptions; use crate::{ - exec::Executor, + exec::{Executor, ExecutorType}, test::{raw_data, TestChunk}, }; @@ -318,7 +318,10 @@ mod test { .expect("created compact plan"); let executor = Executor::new(1); - let physical_plan = executor.new_context().prepare_plan(&compact_plan).unwrap(); + let physical_plan = executor + .new_context(ExecutorType::Reorg) + .prepare_plan(&compact_plan) + .unwrap(); assert_eq!( physical_plan.output_partitioning().partition_count(), 1, @@ -368,7 +371,10 @@ mod test { .expect("created compact plan"); let executor = Executor::new(1); - let physical_plan = executor.new_context().prepare_plan(&split_plan).unwrap(); + let physical_plan = executor + .new_context(ExecutorType::Reorg) + .prepare_plan(&split_plan) + .unwrap(); assert_eq!( physical_plan.output_partitioning().partition_count(), diff --git a/query/src/frontend/sql.rs b/query/src/frontend/sql.rs index 14e44f2ed6..be737fd4d6 100644 --- a/query/src/frontend/sql.rs +++ b/query/src/frontend/sql.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use snafu::{ResultExt, Snafu}; -use crate::exec::{context::DEFAULT_CATALOG, Executor}; +use crate::exec::{context::DEFAULT_CATALOG, Executor, ExecutorType}; use datafusion::{ catalog::catalog::CatalogProvider, error::DataFusionError, physical_plan::ExecutionPlan, }; @@ -76,15 +76,17 @@ impl SqlQueryPlanner { } /// Plan a SQL query against the data in `database`, and return a - /// DataFusion physical execution plan. The plan can then be - /// executed using `executor` in a streaming fashion. + /// DataFusion physical execution plan that runs on the query executor. + /// + /// When the plan is executed, it will run on the query executor + /// in a streaming fashion. pub fn query( &self, database: Arc, query: &str, executor: &Executor, ) -> Result> { - let mut ctx = executor.new_context(); + let mut ctx = executor.new_context(ExecutorType::Query); ctx.inner_mut().register_catalog(DEFAULT_CATALOG, database); ctx.prepare_sql(query).context(Preparing) } diff --git a/query_tests/src/influxrpc/field_columns.rs b/query_tests/src/influxrpc/field_columns.rs index c9b4fb16ec..18e1249ae0 100644 --- a/query_tests/src/influxrpc/field_columns.rs +++ b/query_tests/src/influxrpc/field_columns.rs @@ -2,7 +2,10 @@ use arrow::datatypes::DataType; use arrow_util::assert_batches_eq; use datafusion::logical_plan::{col, lit}; use query::{ - exec::fieldlist::{Field, FieldList}, + exec::{ + fieldlist::{Field, FieldList}, + ExecutorType, + }, frontend::influxrpc::InfluxRpcPlanner, predicate::PredicateBuilder, }; @@ -140,7 +143,7 @@ async fn test_field_name_plan() { // expected (specifically that the column ordering is correct) let results = db .executor() - .run_logical_plan(plan) + .run_logical_plan(plan, ExecutorType::Query) .await .expect("ok running plan"); diff --git a/query_tests/src/influxrpc/read_group.rs b/query_tests/src/influxrpc/read_group.rs index df041cb238..2a60224a03 100644 --- a/query_tests/src/influxrpc/read_group.rs +++ b/query_tests/src/influxrpc/read_group.rs @@ -5,6 +5,7 @@ use arrow::util::pretty::pretty_format_batches; use async_trait::async_trait; use datafusion::prelude::*; use query::{ + exec::ExecutorType, frontend::influxrpc::InfluxRpcPlanner, group_by::Aggregate, predicate::{Predicate, PredicateBuilder}, @@ -46,7 +47,7 @@ macro_rules! run_read_group_test_case { for plan in plans.into_iter() { let batches = db .executor() - .run_logical_plan(plan.plan) + .run_logical_plan(plan.plan, ExecutorType::Query) .await .expect("ok running plan"); diff --git a/query_tests/src/influxrpc/read_window_aggregate.rs b/query_tests/src/influxrpc/read_window_aggregate.rs index 981aecb77b..a66b6cc706 100644 --- a/query_tests/src/influxrpc/read_window_aggregate.rs +++ b/query_tests/src/influxrpc/read_window_aggregate.rs @@ -7,6 +7,7 @@ use arrow::util::pretty::pretty_format_batches; use async_trait::async_trait; use datafusion::prelude::*; use query::{ + exec::ExecutorType, frontend::influxrpc::InfluxRpcPlanner, group_by::{Aggregate, WindowDuration}, predicate::{Predicate, PredicateBuilder}, @@ -40,7 +41,7 @@ macro_rules! run_read_window_aggregate_test_case { for plan in plans.into_iter() { let batches = db .executor() - .run_logical_plan(plan.plan) + .run_logical_plan(plan.plan, ExecutorType::Query) .await .expect("ok running plan"); diff --git a/query_tests/src/pruning.rs b/query_tests/src/pruning.rs index eb48cc9f6b..aae58e4618 100644 --- a/query_tests/src/pruning.rs +++ b/query_tests/src/pruning.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use arrow_util::assert_batches_sorted_eq; use datafusion::logical_plan::{col, lit}; use query::{ - exec::stringset::StringSet, + exec::{stringset::StringSet, ExecutorType}, frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner}, predicate::PredicateBuilder, QueryChunk, @@ -74,7 +74,10 @@ async fn chunk_pruning_sql() { let physical_plan = SqlQueryPlanner::default() .query(db, query, &executor) .unwrap(); - let batches = executor.collect(physical_plan).await.unwrap(); + let batches = executor + .collect(physical_plan, ExecutorType::Query) + .await + .unwrap(); assert_batches_sorted_eq!(&expected, &batches); diff --git a/query_tests/src/runner.rs b/query_tests/src/runner.rs index c0c55945f8..0e18bc27b2 100644 --- a/query_tests/src/runner.rs +++ b/query_tests/src/runner.rs @@ -4,7 +4,7 @@ mod parse; mod setup; use arrow::record_batch::RecordBatch; -use query::frontend::sql::SqlQueryPlanner; +use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner}; use snafu::{ResultExt, Snafu}; use std::{ io::BufWriter, @@ -250,8 +250,10 @@ impl Runner { .query(db, &sql, executor.as_ref()) .expect("built plan successfully"); - let results: Vec = - executor.collect(physical_plan).await.expect("Running plan"); + let results: Vec = executor + .collect(physical_plan, ExecutorType::Query) + .await + .expect("Running plan"); let current_results = arrow::util::pretty::pretty_format_batches(&results) .unwrap() diff --git a/query_tests/src/sql.rs b/query_tests/src/sql.rs index 80304351b5..332653adb0 100644 --- a/query_tests/src/sql.rs +++ b/query_tests/src/sql.rs @@ -6,7 +6,7 @@ use super::scenarios::*; use arrow::record_batch::RecordBatch; use arrow_util::assert_batches_sorted_eq; -use query::frontend::sql::SqlQueryPlanner; +use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner}; use std::sync::Arc; /// runs table_names(predicate) and compares it to the expected @@ -30,8 +30,10 @@ macro_rules! run_sql_test_case { .query(db, &sql, executor.as_ref()) .expect("built plan successfully"); - let results: Vec = - executor.collect(physical_plan).await.expect("Running plan"); + let results: Vec = executor + .collect(physical_plan, ExecutorType::Query) + .await + .expect("Running plan"); assert_batches_sorted_eq!($EXPECTED_LINES, &results); } diff --git a/server/src/db.rs b/server/src/db.rs index 311835acbd..c0ce21d685 100644 --- a/server/src/db.rs +++ b/server/src/db.rs @@ -858,7 +858,7 @@ mod tests { metadata::IoxParquetMetaData, test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data}, }; - use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase}; + use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase}; use crate::{ db::{ @@ -2240,7 +2240,10 @@ mod tests { let physical_plan = planner.query(db, query, &executor).unwrap(); - executor.collect(physical_plan).await.unwrap() + executor + .collect(physical_plan, ExecutorType::Query) + .await + .unwrap() } fn mutable_chunk_ids(db: &Db, partition_key: &str) -> Vec { diff --git a/server/src/db/lifecycle/compact.rs b/server/src/db/lifecycle/compact.rs index cc379beb2b..06e0b5fb4d 100644 --- a/server/src/db/lifecycle/compact.rs +++ b/server/src/db/lifecycle/compact.rs @@ -7,6 +7,7 @@ use futures::StreamExt; use data_types::job::Job; use lifecycle::LifecycleWriteGuard; +use query::exec::ExecutorType; use query::frontend::reorg::ReorgPlanner; use query::QueryChunkMeta; use read_buffer::{ChunkMetrics, RBChunk}; @@ -67,7 +68,7 @@ pub(crate) fn compact_chunks( ChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()), ); - let ctx = db.exec.new_context(); + let ctx = db.exec.new_context(ExecutorType::Reorg); let fut = async move { let key = compute_sort_key(query_chunks.iter().map(|x| x.summary())); diff --git a/server/src/db/lifecycle/move_chunk.rs b/server/src/db/lifecycle/move_chunk.rs index 345d0a2e8b..4101779fc4 100644 --- a/server/src/db/lifecycle/move_chunk.rs +++ b/server/src/db/lifecycle/move_chunk.rs @@ -5,7 +5,7 @@ use data_types::job::Job; use futures::StreamExt; use observability_deps::tracing::{debug, info}; -use query::{frontend::reorg::ReorgPlanner, QueryChunkMeta}; +use query::{exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta}; use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk}; use std::{future::Future, sync::Arc}; use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt}; @@ -53,7 +53,7 @@ pub fn move_chunk_to_read_buffer( ReadBufferChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()), ); - let ctx = db.exec.new_context(); + let ctx = db.exec.new_context(ExecutorType::Reorg); let fut = async move { info!(chunk=%addr, "chunk marked MOVING, loading tables into read buffer"); diff --git a/server/src/lib.rs b/server/src/lib.rs index 243c28cd54..3cf166115a 100644 --- a/server/src/lib.rs +++ b/server/src/lib.rs @@ -1110,6 +1110,7 @@ mod tests { time::{Duration, Instant}, }; + use arrow::record_batch::RecordBatch; use async_trait::async_trait; use bytes::Bytes; use futures::TryStreamExt; @@ -1126,7 +1127,7 @@ mod tests { use influxdb_line_protocol::parse_lines; use metrics::MetricRegistry; use object_store::{memory::InMemory, path::ObjectStorePath}; - use query::{frontend::sql::SqlQueryPlanner, QueryDatabase}; + use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase}; use super::*; use std::sync::atomic::{AtomicBool, Ordering}; @@ -1398,14 +1399,8 @@ mod tests { let db_name = DatabaseName::new("foo").unwrap(); let db = server.db(&db_name).unwrap(); + let batches = run_query(db, "select * from cpu").await; - let planner = SqlQueryPlanner::default(); - let executor = server.executor(); - let physical_plan = planner - .query(db, "select * from cpu", executor.as_ref()) - .unwrap(); - - let batches = executor.collect(physical_plan).await.unwrap(); let expected = vec![ "+-----+-------------------------------+", "| bar | time |", @@ -1449,13 +1444,7 @@ mod tests { .await .expect("write entry"); - let planner = SqlQueryPlanner::default(); - let executor = server.executor(); - let physical_plan = planner - .query(db, "select * from cpu", executor.as_ref()) - .unwrap(); - - let batches = executor.collect(physical_plan).await.unwrap(); + let batches = run_query(db, "select * from cpu").await; let expected = vec![ "+-----+-------------------------------+", "| bar | time |", @@ -2176,4 +2165,17 @@ mod tests { let err = create_simple_database(&server, db_name).await.unwrap_err(); assert!(matches!(err, Error::CannotCreatePreservedCatalog { .. })); } + + // run a sql query against the database, returning the results as record batches + async fn run_query(db: Arc, query: &str) -> Vec { + let planner = SqlQueryPlanner::default(); + let executor = db.executor(); + + let physical_plan = planner.query(db, query, &executor).unwrap(); + + executor + .collect(physical_plan, ExecutorType::Query) + .await + .unwrap() + } } diff --git a/src/commands/run.rs b/src/commands/run.rs index 56c4303623..cab9cadb4d 100644 --- a/src/commands/run.rs +++ b/src/commands/run.rs @@ -81,11 +81,12 @@ pub struct Config { #[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR")] pub database_directory: Option, - /// The number of threads to use for the query worker pool. + /// The number of threads to use for all worker pools. /// - /// IOx uses `--num-threads` threads for handling API requests and - /// will use a dedicated thread pool with `--num-worker-threads` - /// for running queries. + /// IOx uses a pool with `--num-threads` threads *each* for + /// 1. Handling API requests + /// 2. Running queries. + /// 3. Reorganizing data (e.g. compacting chunks) /// /// If not specified, defaults to the number of cores on the system #[structopt(long = "--num-worker-threads", env = "INFLUXDB_IOX_NUM_WORKER_THREADS")] diff --git a/src/influxdb_ioxd/http.rs b/src/influxdb_ioxd/http.rs index 8dee007307..f8dd2dd0a3 100644 --- a/src/influxdb_ioxd/http.rs +++ b/src/influxdb_ioxd/http.rs @@ -18,7 +18,7 @@ use data_types::{ }; use influxdb_iox_client::format::QueryOutputFormat; use influxdb_line_protocol::parse_lines; -use query::QueryDatabase; +use query::{exec::ExecutorType, QueryDatabase}; use server::{ConnectionManager, Server as AppServer}; // External crates @@ -631,7 +631,7 @@ async fn query( // TODO: stream read results out rather than rendering the // whole thing in mem let batches = executor - .collect(physical_plan) + .collect(physical_plan, ExecutorType::Query) .await .map_err(|e| Box::new(e) as _) .context(Query { db_name })?; @@ -1340,6 +1340,9 @@ mod tests { .await .unwrap(); - executor.collect(physical_plan).await.unwrap() + executor + .collect(physical_plan, ExecutorType::Query) + .await + .unwrap() } } diff --git a/src/influxdb_ioxd/planner.rs b/src/influxdb_ioxd/planner.rs index f595f26ebc..6f93ef6f8e 100644 --- a/src/influxdb_ioxd/planner.rs +++ b/src/influxdb_ioxd/planner.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use datafusion::{catalog::catalog::CatalogProvider, physical_plan::ExecutionPlan}; use query::{ - exec::Executor, + exec::{Executor, ExecutorType}, frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner}, group_by::{Aggregate, WindowDuration}, plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan}, @@ -61,11 +61,14 @@ impl Planner { let query = query.into(); self.exec - .run(async move { - planner - .query(database, &query, q_executor.as_ref()) - .context(Sql { query }) - }) + .run( + async move { + planner + .query(database, &query, q_executor.as_ref()) + .context(Sql { query }) + }, + ExecutorType::Query, + ) .await .context(InternalExecutionWhilePlanning)? } @@ -83,11 +86,14 @@ impl Planner { let planner = InfluxRpcPlanner::new(); self.exec - .run(async move { - planner - .table_names(database.as_ref(), predicate) - .context(InfluxRpc) - }) + .run( + async move { + planner + .table_names(database.as_ref(), predicate) + .context(InfluxRpc) + }, + ExecutorType::Query, + ) .await .context(InternalExecutionWhilePlanning)? } @@ -101,11 +107,14 @@ impl Planner { let planner = InfluxRpcPlanner::new(); self.exec - .run(async move { - planner - .tag_keys(database.as_ref(), predicate) - .context(InfluxRpc) - }) + .run( + async move { + planner + .tag_keys(database.as_ref(), predicate) + .context(InfluxRpc) + }, + ExecutorType::Query, + ) .await .context(InternalExecutionWhilePlanning)? } @@ -125,11 +134,14 @@ impl Planner { let planner = InfluxRpcPlanner::new(); self.exec - .run(async move { - planner - .tag_values(database.as_ref(), &tag_name, predicate) - .context(InfluxRpc) - }) + .run( + async move { + planner + .tag_values(database.as_ref(), &tag_name, predicate) + .context(InfluxRpc) + }, + ExecutorType::Query, + ) .await .context(InternalExecutionWhilePlanning)? } @@ -147,11 +159,14 @@ impl Planner { let planner = InfluxRpcPlanner::new(); self.exec - .run(async move { - planner - .field_columns(database.as_ref(), predicate) - .context(InfluxRpc) - }) + .run( + async move { + planner + .field_columns(database.as_ref(), predicate) + .context(InfluxRpc) + }, + ExecutorType::Query, + ) .await .context(InternalExecutionWhilePlanning)? } @@ -169,11 +184,14 @@ impl Planner { let planner = InfluxRpcPlanner::new(); self.exec - .run(async move { - planner - .read_filter(database.as_ref(), predicate) - .context(InfluxRpc) - }) + .run( + async move { + planner + .read_filter(database.as_ref(), predicate) + .context(InfluxRpc) + }, + ExecutorType::Query, + ) .await .context(InternalExecutionWhilePlanning)? } @@ -193,11 +211,14 @@ impl Planner { let planner = InfluxRpcPlanner::new(); self.exec - .run(async move { - planner - .read_group(database.as_ref(), predicate, agg, &group_columns) - .context(InfluxRpc) - }) + .run( + async move { + planner + .read_group(database.as_ref(), predicate, agg, &group_columns) + .context(InfluxRpc) + }, + ExecutorType::Query, + ) .await .context(InternalExecutionWhilePlanning)? } @@ -218,11 +239,14 @@ impl Planner { let planner = InfluxRpcPlanner::new(); self.exec - .run(async move { - planner - .read_window_aggregate(database.as_ref(), predicate, agg, every, offset) - .context(InfluxRpc) - }) + .run( + async move { + planner + .read_window_aggregate(database.as_ref(), predicate, agg, every, offset) + .context(InfluxRpc) + }, + ExecutorType::Query, + ) .await .context(InternalExecutionWhilePlanning)? } diff --git a/src/influxdb_ioxd/rpc/flight.rs b/src/influxdb_ioxd/rpc/flight.rs index 589eb6904b..f5ebcf9d0c 100644 --- a/src/influxdb_ioxd/rpc/flight.rs +++ b/src/influxdb_ioxd/rpc/flight.rs @@ -3,6 +3,7 @@ use std::{pin::Pin, sync::Arc}; use futures::Stream; use observability_deps::tracing::{info, warn}; +use query::exec::ExecutorType; use serde::Deserialize; use snafu::{OptionExt, ResultExt, Snafu}; use tonic::{Interceptor, Request, Response, Streaming}; @@ -181,8 +182,7 @@ where // execute the query let results = executor - .new_context() - .collect(Arc::clone(&physical_plan)) + .collect(Arc::clone(&physical_plan), ExecutorType::Query) .await .map_err(|e| Box::new(e) as _) .context(Query {