Merge branch 'main' into crepererum/rework_db_init_state_machine
commit
84f2391edd
|
@ -2,7 +2,6 @@
|
||||||
//! plans. This is currently implemented using DataFusion, and this
|
//! plans. This is currently implemented using DataFusion, and this
|
||||||
//! interface abstracts away many of the details
|
//! interface abstracts away many of the details
|
||||||
pub(crate) mod context;
|
pub(crate) mod context;
|
||||||
mod counters;
|
|
||||||
pub mod field;
|
pub mod field;
|
||||||
pub mod fieldlist;
|
pub mod fieldlist;
|
||||||
mod schema_pivot;
|
mod schema_pivot;
|
||||||
|
@ -17,7 +16,6 @@ use futures::{future, Future};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use arrow::record_batch::RecordBatch;
|
use arrow::record_batch::RecordBatch;
|
||||||
use counters::ExecutionCounters;
|
|
||||||
use datafusion::{
|
use datafusion::{
|
||||||
self,
|
self,
|
||||||
logical_plan::{Expr, LogicalPlan},
|
logical_plan::{Expr, LogicalPlan},
|
||||||
|
@ -104,7 +102,6 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
|
||||||
/// native structures.
|
/// native structures.
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Executor {
|
pub struct Executor {
|
||||||
counters: Arc<ExecutionCounters>,
|
|
||||||
exec: DedicatedExecutor,
|
exec: DedicatedExecutor,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,10 +111,7 @@ impl Executor {
|
||||||
pub fn new(num_threads: usize) -> Self {
|
pub fn new(num_threads: usize) -> Self {
|
||||||
let exec = DedicatedExecutor::new("IOx Executor Thread", num_threads);
|
let exec = DedicatedExecutor::new("IOx Executor Thread", num_threads);
|
||||||
|
|
||||||
Self {
|
Self { exec }
|
||||||
exec,
|
|
||||||
counters: Arc::new(ExecutionCounters::default()),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Executes this plan and returns the resulting set of strings
|
/// Executes this plan and returns the resulting set of strings
|
||||||
|
@ -256,7 +250,7 @@ impl Executor {
|
||||||
|
|
||||||
/// Create a new execution context, suitable for executing a new query
|
/// Create a new execution context, suitable for executing a new query
|
||||||
pub fn new_context(&self) -> IOxExecutionContext {
|
pub fn new_context(&self) -> IOxExecutionContext {
|
||||||
IOxExecutionContext::new(self.exec.clone(), Arc::clone(&self.counters))
|
IOxExecutionContext::new(self.exec.clone())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// plans and runs the plans in parallel and collects the results
|
/// plans and runs the plans in parallel and collects the results
|
||||||
|
|
|
@ -26,7 +26,8 @@ use observability_deps::tracing::{debug, trace};
|
||||||
// Reuse DataFusion error and Result types for this module
|
// Reuse DataFusion error and Result types for this module
|
||||||
pub use datafusion::error::{DataFusionError as Error, Result};
|
pub use datafusion::error::{DataFusionError as Error, Result};
|
||||||
|
|
||||||
use super::{counters::ExecutionCounters, split::StreamSplitNode, task::DedicatedExecutor};
|
use super::split::StreamSplitNode;
|
||||||
|
use super::task::DedicatedExecutor;
|
||||||
|
|
||||||
// The default catalog name - this impacts what SQL queries use if not specified
|
// The default catalog name - this impacts what SQL queries use if not specified
|
||||||
pub const DEFAULT_CATALOG: &str = "public";
|
pub const DEFAULT_CATALOG: &str = "public";
|
||||||
|
@ -95,8 +96,7 @@ impl ExtensionPlanner for IOxExtensionPlanner {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// This is an execution context for planning in IOx. It wraps a
|
/// This is an execution context for planning in IOx. It wraps a
|
||||||
/// DataFusion execution context and includes statistical counters and
|
/// DataFusion execution context with the information needed for planning.
|
||||||
/// a dedicated thread pool.
|
|
||||||
///
|
///
|
||||||
/// Methods on this struct should be preferred to using the raw
|
/// Methods on this struct should be preferred to using the raw
|
||||||
/// DataFusion functions (such as `collect`) directly.
|
/// DataFusion functions (such as `collect`) directly.
|
||||||
|
@ -105,7 +105,6 @@ impl ExtensionPlanner for IOxExtensionPlanner {
|
||||||
/// types such as Memory and providing visibility into what plans are
|
/// types such as Memory and providing visibility into what plans are
|
||||||
/// running
|
/// running
|
||||||
pub struct IOxExecutionContext {
|
pub struct IOxExecutionContext {
|
||||||
counters: Arc<ExecutionCounters>,
|
|
||||||
inner: ExecutionContext,
|
inner: ExecutionContext,
|
||||||
|
|
||||||
/// Dedicated executor for query execution.
|
/// Dedicated executor for query execution.
|
||||||
|
@ -120,7 +119,6 @@ pub struct IOxExecutionContext {
|
||||||
impl fmt::Debug for IOxExecutionContext {
|
impl fmt::Debug for IOxExecutionContext {
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
f.debug_struct("IOxExecutionContext")
|
f.debug_struct("IOxExecutionContext")
|
||||||
.field("counters", &self.counters)
|
|
||||||
.field("inner", &"<DataFusion ExecutionContext>")
|
.field("inner", &"<DataFusion ExecutionContext>")
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
|
@ -131,7 +129,7 @@ impl IOxExecutionContext {
|
||||||
///
|
///
|
||||||
/// The config is created with a default catalog and schema, but this
|
/// The config is created with a default catalog and schema, but this
|
||||||
/// can be overridden at a later date
|
/// can be overridden at a later date
|
||||||
pub fn new(exec: DedicatedExecutor, counters: Arc<ExecutionCounters>) -> Self {
|
pub fn new(exec: DedicatedExecutor) -> Self {
|
||||||
const BATCH_SIZE: usize = 1000;
|
const BATCH_SIZE: usize = 1000;
|
||||||
|
|
||||||
// TBD: Should we be reusing an execution context across all executions?
|
// TBD: Should we be reusing an execution context across all executions?
|
||||||
|
@ -144,11 +142,7 @@ impl IOxExecutionContext {
|
||||||
|
|
||||||
let inner = ExecutionContext::with_config(config);
|
let inner = ExecutionContext::with_config(config);
|
||||||
|
|
||||||
Self {
|
Self { inner, exec }
|
||||||
counters,
|
|
||||||
inner,
|
|
||||||
exec,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// returns a reference to the inner datafusion execution context
|
/// returns a reference to the inner datafusion execution context
|
||||||
|
@ -185,8 +179,6 @@ impl IOxExecutionContext {
|
||||||
/// Executes the logical plan using DataFusion on a separate
|
/// Executes the logical plan using DataFusion on a separate
|
||||||
/// thread pool and produces RecordBatches
|
/// thread pool and produces RecordBatches
|
||||||
pub async fn collect(&self, physical_plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
|
pub async fn collect(&self, physical_plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
|
||||||
self.counters.inc_plans_run();
|
|
||||||
|
|
||||||
debug!(
|
debug!(
|
||||||
"Running plan, physical:\n{}",
|
"Running plan, physical:\n{}",
|
||||||
displayable(physical_plan.as_ref()).indent()
|
displayable(physical_plan.as_ref()).indent()
|
||||||
|
|
|
@ -1,13 +0,0 @@
|
||||||
use std::{sync::atomic::AtomicU64, sync::atomic::Ordering};
|
|
||||||
|
|
||||||
// Various statistics for execution
|
|
||||||
#[derive(Debug, Default)]
|
|
||||||
pub struct ExecutionCounters {
|
|
||||||
pub plans_run: AtomicU64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ExecutionCounters {
|
|
||||||
pub fn inc_plans_run(&self) {
|
|
||||||
self.plans_run.fetch_add(1, Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue