feat: Use separate executor for queries and compactions/moves (#1870)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>

parent f1a100c6ae
commit 56c8c8d428
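
In outline, the change drops the executor's single `exec: DedicatedExecutor` field in favor of two pools selected through a new `ExecutorType` enum. Below is a minimal sketch of the resulting API shape, assembled from the hunks that follow; the `DedicatedExecutor` stub is hypothetical so the sketch stands alone, and is not the crate's real implementation:

/// Stand-in for the real DedicatedExecutor; only the pool name is modeled.
#[derive(Debug, Clone)]
struct DedicatedExecutor {
    name: String,
}

impl DedicatedExecutor {
    fn new(name: &str, _num_threads: usize) -> Self {
        Self { name: name.to_string() }
    }
}

/// Selects which dedicated pool a task runs on (added by this commit).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ExecutorType {
    Query,
    Reorg,
}

struct Executor {
    query_exec: DedicatedExecutor,
    reorg_exec: DedicatedExecutor,
}

impl Executor {
    fn new(num_threads: usize) -> Self {
        Self {
            query_exec: DedicatedExecutor::new("IOx Query Executor Thread", num_threads),
            reorg_exec: DedicatedExecutor::new("IOx Reorg Executor Thread", num_threads),
        }
    }

    /// Every public entry point (`run`, `collect`, `new_context`, ...)
    /// now routes through this selector.
    fn executor(&self, executor_type: ExecutorType) -> &DedicatedExecutor {
        match executor_type {
            ExecutorType::Query => &self.query_exec,
            ExecutorType::Reorg => &self.reorg_exec,
        }
    }
}

fn main() {
    let exec = Executor::new(4);
    // Queries and background reorganization no longer share a thread pool.
    assert_eq!(exec.executor(ExecutorType::Query).name, "IOx Query Executor Thread");
    assert_eq!(exec.executor(ExecutorType::Reorg).name, "IOx Reorg Executor Thread");
}

Callers pick the pool explicitly, e.g. `executor.collect(physical_plan, ExecutorType::Query)` in the query path and `db.exec.new_context(ExecutorType::Reorg)` in the compaction and move code, as the later hunks show.
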
@@ -100,34 +100,56 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
 /// Handles executing DataFusion plans, and marshalling the results into rust
 /// native structures.
 ///
 /// TODO: Have a resource manager that would limit how many plans are
 /// running, based on a policy
 #[derive(Debug)]
 pub struct Executor {
-    exec: DedicatedExecutor,
+    /// Executor for running user queries
+    query_exec: DedicatedExecutor,
+
+    /// Executor for running system/reorganization tasks such as
+    /// compact
+    reorg_exec: DedicatedExecutor,
 }
 
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ExecutorType {
+    /// Run using the pool for queries
+    Query,
+    /// Run using the pool for system / reorganization tasks
+    Reorg,
+}
+
 impl Executor {
-    /// Creates a new executor with a single dedicated thread pool with
-    /// num_threads
+    /// Creates a new executor with two dedicated thread pools, each
+    /// with num_threads
     pub fn new(num_threads: usize) -> Self {
-        let exec = DedicatedExecutor::new("IOx Executor Thread", num_threads);
+        let query_exec = DedicatedExecutor::new("IOx Query Executor Thread", num_threads);
+        let reorg_exec = DedicatedExecutor::new("IOx Reorg Executor Thread", num_threads);
 
-        Self { exec }
+        Self {
+            query_exec,
+            reorg_exec,
+        }
     }
 
-    /// Executes this plan and returns the resulting set of strings
+    /// Executes this plan on the query pool, and returns the
+    /// resulting set of strings
     pub async fn to_string_set(&self, plan: StringSetPlan) -> Result<StringSetRef> {
         match plan {
             StringSetPlan::Known(ss) => Ok(ss),
             StringSetPlan::Plan(plans) => self
-                .run_logical_plans(plans)
+                .run_logical_plans(plans, ExecutorType::Query)
                 .await?
                 .into_stringset()
                 .context(StringSetConversion),
         }
     }
 
-    /// Executes the embedded plans, each as separate tasks combining the results
-    /// into the returned collection of items.
+    /// Executes the SeriesSetPlans on the query executor, in
+    /// parallel, combining the results into the returned collection
+    /// of items.
     ///
     /// The SeriesSets are guaranteed to come back ordered by table_name.
     pub async fn to_series_set(
@@ -147,47 +169,50 @@ impl Executor {
         let handles = plans
             .into_iter()
             .map(|plan| {
-                // TODO run these on some executor other than the main tokio pool (maybe?)
-                let ctx = self.new_context();
+                let executor_type = ExecutorType::Query;
+                let ctx = self.new_context(executor_type);
 
-                self.exec.spawn(async move {
-                    let SeriesSetPlan {
-                        table_name,
-                        plan,
-                        tag_columns,
-                        field_columns,
-                        num_prefix_tag_group_columns,
-                    } = plan;
-
-                    let tag_columns = Arc::new(tag_columns);
-
-                    let physical_plan = ctx
-                        .prepare_plan(&plan)
-                        .context(DataFusionPhysicalPlanning)?;
-
-                    let it = ctx
-                        .execute(physical_plan)
-                        .await
-                        .context(SeriesSetExecution)?;
-
-                    SeriesSetConverter::default()
-                        .convert(
-                            table_name,
-                            tag_columns,
-                            field_columns,
-                            num_prefix_tag_group_columns,
-                            it,
-                        )
-                        .await
-                        .context(SeriesSetConversion)
-                })
+                self.run(
+                    async move {
+                        let SeriesSetPlan {
+                            table_name,
+                            plan,
+                            tag_columns,
+                            field_columns,
+                            num_prefix_tag_group_columns,
+                        } = plan;
+
+                        let tag_columns = Arc::new(tag_columns);
+
+                        let physical_plan = ctx
+                            .prepare_plan(&plan)
+                            .context(DataFusionPhysicalPlanning)?;
+
+                        let it = ctx
+                            .execute(physical_plan)
+                            .await
+                            .context(SeriesSetExecution)?;
+
+                        SeriesSetConverter::default()
+                            .convert(
+                                table_name,
+                                tag_columns,
+                                field_columns,
+                                num_prefix_tag_group_columns,
+                                it,
+                            )
+                            .await
+                            .context(SeriesSetConversion)
+                    },
+                    executor_type,
+                )
             })
             .collect::<Vec<_>>();
 
         // join_all ensures that the results are consumed in the same order they
         // were spawned maintaining the guarantee to return results ordered
         // by the plan sort order.
-        let handles = future::try_join_all(handles).await.context(TaskJoinError)?;
+        let handles = future::try_join_all(handles).await?;
         let mut results = vec![];
         for handle in handles {
             results.extend(handle?.into_iter());
@@ -196,7 +221,7 @@ impl Executor {
         Ok(results)
     }
 
-    /// Executes `plan` and return the resulting FieldList
+    /// Executes `plan` on the query executor and returns the resulting FieldList
     pub async fn to_field_list(&self, plan: FieldListPlan) -> Result<FieldList> {
         let FieldListPlan { plans } = plan;
 
@@ -204,29 +229,33 @@ impl Executor {
         let handles = plans
             .into_iter()
             .map(|plan| {
-                let ctx = self.new_context();
-                self.exec.spawn(async move {
-                    let physical_plan = ctx
-                        .prepare_plan(&plan)
-                        .context(DataFusionPhysicalPlanning)?;
+                let executor_type = ExecutorType::Query;
+                let ctx = self.new_context(executor_type);
+                self.run(
+                    async move {
+                        let physical_plan = ctx
+                            .prepare_plan(&plan)
+                            .context(DataFusionPhysicalPlanning)?;
 
-                    // TODO: avoid this buffering
-                    let fieldlist = ctx
-                        .collect(physical_plan)
-                        .await
-                        .context(FieldListExectuor)?
-                        .into_fieldlist()
-                        .context(FieldListConversion);
+                        // TODO: avoid this buffering
+                        let fieldlist = ctx
+                            .collect(physical_plan)
+                            .await
+                            .context(FieldListExectuor)?
+                            .into_fieldlist()
+                            .context(FieldListConversion);
 
-                    Ok(fieldlist)
-                })
+                        Ok(fieldlist)
+                    },
+                    executor_type,
+                )
             })
             .collect::<Vec<_>>();
 
         // collect them all up and combine them
         let mut results = Vec::new();
         for join_handle in handles {
-            let fieldlist = join_handle.await.context(TaskJoinError)???;
+            let fieldlist = join_handle.await???;
 
             results.push(fieldlist);
         }
 
@@ -235,64 +264,88 @@ impl Executor {
     }
 
     /// Run the plan and return a record batch reader for reading the results
-    pub async fn run_logical_plan(&self, plan: LogicalPlan) -> Result<Vec<RecordBatch>> {
-        self.run_logical_plans(vec![plan]).await
+    pub async fn run_logical_plan(
+        &self,
+        plan: LogicalPlan,
+        executor_type: ExecutorType,
+    ) -> Result<Vec<RecordBatch>> {
+        self.run_logical_plans(vec![plan], executor_type).await
     }
 
     /// Executes the logical plan using DataFusion on a separate
     /// thread pool and produces RecordBatches
-    pub async fn collect(&self, physical_plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
-        self.new_context()
+    pub async fn collect(
+        &self,
+        physical_plan: Arc<dyn ExecutionPlan>,
+        executor_type: ExecutorType,
+    ) -> Result<Vec<RecordBatch>> {
+        self.new_context(executor_type)
             .collect(physical_plan)
             .await
             .context(DataFusionExecution)
     }
 
-    /// Create a new execution context, suitable for executing a new query
-    pub fn new_context(&self) -> IOxExecutionContext {
-        IOxExecutionContext::new(self.exec.clone())
+    /// Create a new execution context, suitable for executing a new query or system task
+    pub fn new_context(&self, executor_type: ExecutorType) -> IOxExecutionContext {
+        let executor = self.executor(executor_type).clone();
+
+        IOxExecutionContext::new(executor)
+    }
+
+    /// Return the execution pool of the specified type
+    fn executor(&self, executor_type: ExecutorType) -> &DedicatedExecutor {
+        match executor_type {
+            ExecutorType::Query => &self.query_exec,
+            ExecutorType::Reorg => &self.reorg_exec,
+        }
     }
 
-    /// plans and runs the plans in parallel and collects the results
-    async fn run_logical_plans(&self, plans: Vec<LogicalPlan>) -> Result<Vec<RecordBatch>> {
+    /// run each plan in parallel and collect the results
+    async fn run_logical_plans(
+        &self,
+        plans: Vec<LogicalPlan>,
+        executor_type: ExecutorType,
+    ) -> Result<Vec<RecordBatch>> {
         let value_futures = plans
             .into_iter()
             .map(|plan| {
-                let ctx = self.new_context();
+                let ctx = self.new_context(executor_type);
 
-                self.exec.spawn(async move {
-                    let physical_plan = ctx
-                        .prepare_plan(&plan)
-                        .context(DataFusionPhysicalPlanning)?;
+                self.run(
+                    async move {
+                        let physical_plan = ctx
+                            .prepare_plan(&plan)
+                            .context(DataFusionPhysicalPlanning)?;
 
-                    // TODO: avoid this buffering
-                    ctx.collect(physical_plan)
-                        .await
-                        .context(DataFusionExecution)
-                })
+                        // TODO: avoid this buffering
+                        ctx.collect(physical_plan)
+                            .await
+                            .context(DataFusionExecution)
+                    },
+                    executor_type,
+                )
             })
             .collect::<Vec<_>>();
 
         // now, wait for all the values to resolve and collect them together
         let mut results = Vec::new();
         for join_handle in value_futures {
-            let mut plan_result = join_handle.await.context(TaskJoinError)??;
+            let mut plan_result = join_handle.await??;
             results.append(&mut plan_result);
         }
         Ok(results)
     }
 
     /// Runs the specified Future (and any tasks it spawns) on the
-    /// worker pool for this executor, returning the result of the
-    /// computation.
-    pub async fn run<T>(&self, task: T) -> Result<T::Output>
+    /// specified executor, returning the result of the computation.
+    pub async fn run<T>(&self, task: T, executor_type: ExecutorType) -> Result<T::Output>
     where
         T: Future + Send + 'static,
         T::Output: Send + 'static,
     {
         // run on the dedicated executor
-        self.exec
+        self.executor(executor_type)
            .spawn(task)
            // wait on the *current* tokio executor
            .await
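
To make the point of the dispatch above concrete, here is a toy, std-only model of the two-pool arrangement: channel-fed worker threads stand in for `DedicatedExecutor`, and the `Pool` type and its methods are illustrative rather than the crate's actual machinery:

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Toy stand-in for DedicatedExecutor: one named worker thread fed by a channel.
struct Pool {
    tx: mpsc::Sender<Box<dyn FnOnce() + Send>>,
}

impl Pool {
    fn new(name: &str) -> Self {
        let (tx, rx) = mpsc::channel::<Box<dyn FnOnce() + Send>>();
        thread::Builder::new()
            .name(name.to_string())
            .spawn(move || {
                // Drain tasks until every sender is dropped.
                for task in rx {
                    task();
                }
            })
            .expect("spawn worker");
        Self { tx }
    }

    fn spawn(&self, task: impl FnOnce() + Send + 'static) {
        self.tx.send(Box::new(task)).expect("worker alive");
    }
}

#[derive(Clone, Copy)]
enum ExecutorType {
    Query,
    Reorg,
}

struct Executor {
    query_exec: Pool,
    reorg_exec: Pool,
}

impl Executor {
    // Same shape as the executor() selector added in the hunk above.
    fn executor(&self, executor_type: ExecutorType) -> &Pool {
        match executor_type {
            ExecutorType::Query => &self.query_exec,
            ExecutorType::Reorg => &self.reorg_exec,
        }
    }

    // Loose analogue of Executor::run: pick a pool, hand the task over.
    fn run(&self, task: impl FnOnce() + Send + 'static, executor_type: ExecutorType) {
        self.executor(executor_type).spawn(task);
    }
}

fn main() {
    let exec = Executor {
        query_exec: Pool::new("query"),
        reorg_exec: Pool::new("reorg"),
    };
    // A slow compaction parks only the reorg worker; the query pool stays free.
    exec.run(|| thread::sleep(Duration::from_millis(100)), ExecutorType::Reorg);
    exec.run(
        || println!("query ran on {:?}", thread::current().name()),
        ExecutorType::Query,
    );
    thread::sleep(Duration::from_millis(200));
}

Because each `ExecutorType` maps to its own pool, the sleeping reorg task cannot delay the query task; that isolation is the motivation stated in the commit title.
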
@@ -239,7 +239,7 @@ mod test {
     use internal_types::schema::sort::SortOptions;
 
     use crate::{
-        exec::Executor,
+        exec::{Executor, ExecutorType},
         test::{raw_data, TestChunk},
     };
 
@@ -318,7 +318,10 @@ mod test {
             .expect("created compact plan");
 
         let executor = Executor::new(1);
-        let physical_plan = executor.new_context().prepare_plan(&compact_plan).unwrap();
+        let physical_plan = executor
+            .new_context(ExecutorType::Reorg)
+            .prepare_plan(&compact_plan)
+            .unwrap();
         assert_eq!(
             physical_plan.output_partitioning().partition_count(),
             1,

@@ -368,7 +371,10 @@ mod test {
             .expect("created compact plan");
 
         let executor = Executor::new(1);
-        let physical_plan = executor.new_context().prepare_plan(&split_plan).unwrap();
+        let physical_plan = executor
+            .new_context(ExecutorType::Reorg)
+            .prepare_plan(&split_plan)
+            .unwrap();
 
         assert_eq!(
             physical_plan.output_partitioning().partition_count(),
@@ -2,7 +2,7 @@ use std::sync::Arc;
 
 use snafu::{ResultExt, Snafu};
 
-use crate::exec::{context::DEFAULT_CATALOG, Executor};
+use crate::exec::{context::DEFAULT_CATALOG, Executor, ExecutorType};
 use datafusion::{
     catalog::catalog::CatalogProvider, error::DataFusionError, physical_plan::ExecutionPlan,
 };

@@ -76,15 +76,17 @@ impl SqlQueryPlanner {
     }
 
     /// Plan a SQL query against the data in `database`, and return a
-    /// DataFusion physical execution plan. The plan can then be
-    /// executed using `executor` in a streaming fashion.
+    /// DataFusion physical execution plan that runs on the query executor.
+    ///
+    /// When the plan is executed, it will run on the query executor
+    /// in a streaming fashion.
     pub fn query<D: CatalogProvider + 'static>(
         &self,
         database: Arc<D>,
         query: &str,
         executor: &Executor,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let mut ctx = executor.new_context();
+        let mut ctx = executor.new_context(ExecutorType::Query);
         ctx.inner_mut().register_catalog(DEFAULT_CATALOG, database);
         ctx.prepare_sql(query).context(Preparing)
     }
@@ -2,7 +2,10 @@ use arrow::datatypes::DataType;
 use arrow_util::assert_batches_eq;
 use datafusion::logical_plan::{col, lit};
 use query::{
-    exec::fieldlist::{Field, FieldList},
+    exec::{
+        fieldlist::{Field, FieldList},
+        ExecutorType,
+    },
     frontend::influxrpc::InfluxRpcPlanner,
     predicate::PredicateBuilder,
 };

@@ -140,7 +143,7 @@ async fn test_field_name_plan() {
     // expected (specifically that the column ordering is correct)
     let results = db
         .executor()
-        .run_logical_plan(plan)
+        .run_logical_plan(plan, ExecutorType::Query)
         .await
         .expect("ok running plan");
 
@@ -5,6 +5,7 @@ use arrow::util::pretty::pretty_format_batches;
 use async_trait::async_trait;
 use datafusion::prelude::*;
 use query::{
+    exec::ExecutorType,
     frontend::influxrpc::InfluxRpcPlanner,
     group_by::Aggregate,
     predicate::{Predicate, PredicateBuilder},

@@ -46,7 +47,7 @@ macro_rules! run_read_group_test_case {
         for plan in plans.into_iter() {
             let batches = db
                 .executor()
-                .run_logical_plan(plan.plan)
+                .run_logical_plan(plan.plan, ExecutorType::Query)
                 .await
                 .expect("ok running plan");
 
@@ -7,6 +7,7 @@ use arrow::util::pretty::pretty_format_batches;
 use async_trait::async_trait;
 use datafusion::prelude::*;
 use query::{
+    exec::ExecutorType,
     frontend::influxrpc::InfluxRpcPlanner,
     group_by::{Aggregate, WindowDuration},
     predicate::{Predicate, PredicateBuilder},

@@ -40,7 +41,7 @@ macro_rules! run_read_window_aggregate_test_case {
         for plan in plans.into_iter() {
             let batches = db
                 .executor()
-                .run_logical_plan(plan.plan)
+                .run_logical_plan(plan.plan, ExecutorType::Query)
                 .await
                 .expect("ok running plan");
 
@@ -4,7 +4,7 @@ use std::sync::Arc;
 use arrow_util::assert_batches_sorted_eq;
 use datafusion::logical_plan::{col, lit};
 use query::{
-    exec::stringset::StringSet,
+    exec::{stringset::StringSet, ExecutorType},
     frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
     predicate::PredicateBuilder,
     QueryChunk,

@@ -74,7 +74,10 @@ async fn chunk_pruning_sql() {
     let physical_plan = SqlQueryPlanner::default()
         .query(db, query, &executor)
         .unwrap();
-    let batches = executor.collect(physical_plan).await.unwrap();
+    let batches = executor
+        .collect(physical_plan, ExecutorType::Query)
+        .await
+        .unwrap();
 
     assert_batches_sorted_eq!(&expected, &batches);
 
@@ -4,7 +4,7 @@ mod parse;
 mod setup;
 
 use arrow::record_batch::RecordBatch;
-use query::frontend::sql::SqlQueryPlanner;
+use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner};
 use snafu::{ResultExt, Snafu};
 use std::{
     io::BufWriter,

@@ -250,8 +250,10 @@ impl<W: Write> Runner<W> {
             .query(db, &sql, executor.as_ref())
             .expect("built plan successfully");
 
-        let results: Vec<RecordBatch> =
-            executor.collect(physical_plan).await.expect("Running plan");
+        let results: Vec<RecordBatch> = executor
+            .collect(physical_plan, ExecutorType::Query)
+            .await
+            .expect("Running plan");
 
         let current_results = arrow::util::pretty::pretty_format_batches(&results)
             .unwrap()
@@ -6,7 +6,7 @@
 use super::scenarios::*;
 use arrow::record_batch::RecordBatch;
 use arrow_util::assert_batches_sorted_eq;
-use query::frontend::sql::SqlQueryPlanner;
+use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner};
 use std::sync::Arc;
 
 /// runs table_names(predicate) and compares it to the expected

@@ -30,8 +30,10 @@ macro_rules! run_sql_test_case {
             .query(db, &sql, executor.as_ref())
             .expect("built plan successfully");
 
-        let results: Vec<RecordBatch> =
-            executor.collect(physical_plan).await.expect("Running plan");
+        let results: Vec<RecordBatch> = executor
+            .collect(physical_plan, ExecutorType::Query)
+            .await
+            .expect("Running plan");
 
         assert_batches_sorted_eq!($EXPECTED_LINES, &results);
     }
@@ -858,7 +858,7 @@ mod tests {
         metadata::IoxParquetMetaData,
         test_utils::{load_parquet_from_store_for_path, read_data_from_parquet_data},
     };
-    use query::{frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase};
+    use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryChunk, QueryDatabase};
 
     use crate::{
         db::{

@@ -2240,7 +2240,10 @@ mod tests {
 
         let physical_plan = planner.query(db, query, &executor).unwrap();
 
-        executor.collect(physical_plan).await.unwrap()
+        executor
+            .collect(physical_plan, ExecutorType::Query)
+            .await
+            .unwrap()
     }
 
     fn mutable_chunk_ids(db: &Db, partition_key: &str) -> Vec<u32> {
@@ -7,6 +7,7 @@ use futures::StreamExt;
 
 use data_types::job::Job;
 use lifecycle::LifecycleWriteGuard;
+use query::exec::ExecutorType;
 use query::frontend::reorg::ReorgPlanner;
 use query::QueryChunkMeta;
 use read_buffer::{ChunkMetrics, RBChunk};

@@ -67,7 +68,7 @@ pub(crate) fn compact_chunks(
         ChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()),
     );
 
-    let ctx = db.exec.new_context();
+    let ctx = db.exec.new_context(ExecutorType::Reorg);
 
     let fut = async move {
         let key = compute_sort_key(query_chunks.iter().map(|x| x.summary()));
@@ -5,7 +5,7 @@ use data_types::job::Job;
 use futures::StreamExt;
 
 use observability_deps::tracing::{debug, info};
-use query::{frontend::reorg::ReorgPlanner, QueryChunkMeta};
+use query::{exec::ExecutorType, frontend::reorg::ReorgPlanner, QueryChunkMeta};
 use read_buffer::{ChunkMetrics as ReadBufferChunkMetrics, RBChunk};
 use std::{future::Future, sync::Arc};
 use tracker::{TaskTracker, TrackedFuture, TrackedFutureExt};

@@ -53,7 +53,7 @@ pub fn move_chunk_to_read_buffer(
         ReadBufferChunkMetrics::new(&metrics, db.catalog.metrics().memory().read_buffer()),
     );
 
-    let ctx = db.exec.new_context();
+    let ctx = db.exec.new_context(ExecutorType::Reorg);
 
     let fut = async move {
         info!(chunk=%addr, "chunk marked MOVING, loading tables into read buffer");
@@ -1110,6 +1110,7 @@ mod tests {
         time::{Duration, Instant},
     };
 
+    use arrow::record_batch::RecordBatch;
     use async_trait::async_trait;
     use bytes::Bytes;
     use futures::TryStreamExt;

@@ -1126,7 +1127,7 @@ mod tests {
     use influxdb_line_protocol::parse_lines;
     use metrics::MetricRegistry;
     use object_store::{memory::InMemory, path::ObjectStorePath};
-    use query::{frontend::sql::SqlQueryPlanner, QueryDatabase};
+    use query::{exec::ExecutorType, frontend::sql::SqlQueryPlanner, QueryDatabase};
 
     use super::*;
     use std::sync::atomic::{AtomicBool, Ordering};
@@ -1398,14 +1399,8 @@ mod tests {
 
         let db_name = DatabaseName::new("foo").unwrap();
         let db = server.db(&db_name).unwrap();
+        let batches = run_query(db, "select * from cpu").await;
 
-        let planner = SqlQueryPlanner::default();
-        let executor = server.executor();
-        let physical_plan = planner
-            .query(db, "select * from cpu", executor.as_ref())
-            .unwrap();
-
-        let batches = executor.collect(physical_plan).await.unwrap();
         let expected = vec![
             "+-----+-------------------------------+",
             "| bar | time                          |",

@@ -1449,13 +1444,7 @@ mod tests {
             .await
             .expect("write entry");
 
-        let planner = SqlQueryPlanner::default();
-        let executor = server.executor();
-        let physical_plan = planner
-            .query(db, "select * from cpu", executor.as_ref())
-            .unwrap();
-
-        let batches = executor.collect(physical_plan).await.unwrap();
+        let batches = run_query(db, "select * from cpu").await;
         let expected = vec![
             "+-----+-------------------------------+",
             "| bar | time                          |",
@@ -2176,4 +2165,17 @@ mod tests {
         let err = create_simple_database(&server, db_name).await.unwrap_err();
         assert!(matches!(err, Error::CannotCreatePreservedCatalog { .. }));
     }
+
+    // run a sql query against the database, returning the results as record batches
+    async fn run_query(db: Arc<Db>, query: &str) -> Vec<RecordBatch> {
+        let planner = SqlQueryPlanner::default();
+        let executor = db.executor();
+
+        let physical_plan = planner.query(db, query, &executor).unwrap();
+
+        executor
+            .collect(physical_plan, ExecutorType::Query)
+            .await
+            .unwrap()
+    }
 }
@@ -81,11 +81,12 @@ pub struct Config {
     #[structopt(long = "--data-dir", env = "INFLUXDB_IOX_DB_DIR")]
     pub database_directory: Option<PathBuf>,
 
-    /// The number of threads to use for the query worker pool.
+    /// The number of threads to use for all worker pools.
     ///
-    /// IOx uses `--num-threads` threads for handling API requests and
-    /// will use a dedicated thread pool with `--num-worker-threads`
-    /// for running queries.
+    /// IOx uses a pool with `--num-threads` threads *each* for
+    /// 1. Handling API requests
+    /// 2. Running queries.
+    /// 3. Reorganizing data (e.g. compacting chunks)
    ///
    /// If not specified, defaults to the number of cores on the system
    #[structopt(long = "--num-worker-threads", env = "INFLUXDB_IOX_NUM_WORKER_THREADS")]
@@ -18,7 +18,7 @@ use data_types::{
 };
 use influxdb_iox_client::format::QueryOutputFormat;
 use influxdb_line_protocol::parse_lines;
-use query::QueryDatabase;
+use query::{exec::ExecutorType, QueryDatabase};
 use server::{ConnectionManager, Server as AppServer};
 
 // External crates

@@ -631,7 +631,7 @@ async fn query<M: ConnectionManager + Send + Sync + Debug + 'static>(
     // TODO: stream read results out rather than rendering the
     // whole thing in mem
     let batches = executor
-        .collect(physical_plan)
+        .collect(physical_plan, ExecutorType::Query)
         .await
         .map_err(|e| Box::new(e) as _)
         .context(Query { db_name })?;
@@ -1340,6 +1340,9 @@ mod tests {
             .await
             .unwrap();
 
-        executor.collect(physical_plan).await.unwrap()
+        executor
+            .collect(physical_plan, ExecutorType::Query)
+            .await
+            .unwrap()
     }
 }

@@ -3,7 +3,7 @@ use std::sync::Arc;
 
 use datafusion::{catalog::catalog::CatalogProvider, physical_plan::ExecutionPlan};
 use query::{
-    exec::Executor,
+    exec::{Executor, ExecutorType},
     frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
     group_by::{Aggregate, WindowDuration},
     plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan},
@@ -61,11 +61,14 @@ impl Planner {
         let query = query.into();
 
         self.exec
-            .run(async move {
-                planner
-                    .query(database, &query, q_executor.as_ref())
-                    .context(Sql { query })
-            })
+            .run(
+                async move {
+                    planner
+                        .query(database, &query, q_executor.as_ref())
+                        .context(Sql { query })
+                },
+                ExecutorType::Query,
+            )
             .await
             .context(InternalExecutionWhilePlanning)?
     }

@@ -83,11 +86,14 @@ impl Planner {
         let planner = InfluxRpcPlanner::new();
 
         self.exec
-            .run(async move {
-                planner
-                    .table_names(database.as_ref(), predicate)
-                    .context(InfluxRpc)
-            })
+            .run(
+                async move {
+                    planner
+                        .table_names(database.as_ref(), predicate)
+                        .context(InfluxRpc)
+                },
+                ExecutorType::Query,
+            )
             .await
             .context(InternalExecutionWhilePlanning)?
     }
@@ -101,11 +107,14 @@ impl Planner {
         let planner = InfluxRpcPlanner::new();
 
         self.exec
-            .run(async move {
-                planner
-                    .tag_keys(database.as_ref(), predicate)
-                    .context(InfluxRpc)
-            })
+            .run(
+                async move {
+                    planner
+                        .tag_keys(database.as_ref(), predicate)
+                        .context(InfluxRpc)
+                },
+                ExecutorType::Query,
+            )
             .await
             .context(InternalExecutionWhilePlanning)?
     }

@@ -125,11 +134,14 @@ impl Planner {
         let planner = InfluxRpcPlanner::new();
 
         self.exec
-            .run(async move {
-                planner
-                    .tag_values(database.as_ref(), &tag_name, predicate)
-                    .context(InfluxRpc)
-            })
+            .run(
+                async move {
+                    planner
+                        .tag_values(database.as_ref(), &tag_name, predicate)
+                        .context(InfluxRpc)
+                },
+                ExecutorType::Query,
+            )
             .await
             .context(InternalExecutionWhilePlanning)?
     }
@@ -147,11 +159,14 @@ impl Planner {
         let planner = InfluxRpcPlanner::new();
 
         self.exec
-            .run(async move {
-                planner
-                    .field_columns(database.as_ref(), predicate)
-                    .context(InfluxRpc)
-            })
+            .run(
+                async move {
+                    planner
+                        .field_columns(database.as_ref(), predicate)
+                        .context(InfluxRpc)
+                },
+                ExecutorType::Query,
+            )
             .await
             .context(InternalExecutionWhilePlanning)?
     }

@@ -169,11 +184,14 @@ impl Planner {
         let planner = InfluxRpcPlanner::new();
 
         self.exec
-            .run(async move {
-                planner
-                    .read_filter(database.as_ref(), predicate)
-                    .context(InfluxRpc)
-            })
+            .run(
+                async move {
+                    planner
+                        .read_filter(database.as_ref(), predicate)
+                        .context(InfluxRpc)
+                },
+                ExecutorType::Query,
+            )
             .await
             .context(InternalExecutionWhilePlanning)?
     }
@@ -193,11 +211,14 @@ impl Planner {
         let planner = InfluxRpcPlanner::new();
 
         self.exec
-            .run(async move {
-                planner
-                    .read_group(database.as_ref(), predicate, agg, &group_columns)
-                    .context(InfluxRpc)
-            })
+            .run(
+                async move {
+                    planner
+                        .read_group(database.as_ref(), predicate, agg, &group_columns)
+                        .context(InfluxRpc)
+                },
+                ExecutorType::Query,
+            )
             .await
             .context(InternalExecutionWhilePlanning)?
     }

@@ -218,11 +239,14 @@ impl Planner {
         let planner = InfluxRpcPlanner::new();
 
         self.exec
-            .run(async move {
-                planner
-                    .read_window_aggregate(database.as_ref(), predicate, agg, every, offset)
-                    .context(InfluxRpc)
-            })
+            .run(
+                async move {
+                    planner
+                        .read_window_aggregate(database.as_ref(), predicate, agg, every, offset)
+                        .context(InfluxRpc)
+                },
+                ExecutorType::Query,
+            )
             .await
             .context(InternalExecutionWhilePlanning)?
     }
@@ -3,6 +3,7 @@ use std::{pin::Pin, sync::Arc};
 
 use futures::Stream;
 use observability_deps::tracing::{info, warn};
+use query::exec::ExecutorType;
 use serde::Deserialize;
 use snafu::{OptionExt, ResultExt, Snafu};
 use tonic::{Interceptor, Request, Response, Streaming};

@@ -181,8 +182,7 @@ where
 
     // execute the query
     let results = executor
-        .new_context()
-        .collect(Arc::clone(&physical_plan))
+        .collect(Arc::clone(&physical_plan), ExecutorType::Query)
         .await
         .map_err(|e| Box::new(e) as _)
        .context(Query {