chore: Update datafusion (#4071)

* chore: update to datafusion 5936edc2a94d5fb20702a41eab2b80695961b9dc

* chore: Update apis to match datafusion changes
Andrew Lamb 2022-03-22 09:17:41 -04:00 committed by GitHub
parent b098828c97
commit b83b000590
35 changed files with 258 additions and 228 deletions
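
Most of the hunks below follow two upstream API changes at the new revision: DataFusion's `ExecutionContext`/`ExecutionConfig` become `SessionContext`/`SessionConfig` (IOx renames `IOxExecutionContext`/`IOxExecutionConfig` to match), and `ExecutionPlan::execute` now takes an `Arc<TaskContext>` instead of an `Arc<RuntimeEnv>`. A minimal sketch of the new calling convention, reconstructed from the test-helper hunks below; the helper name is illustrative, not part of the commit:

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
use datafusion::prelude::SessionContext;

/// Run one partition of a physical plan the post-upgrade way.
async fn execute_partition(
    plan: Arc<dyn ExecutionPlan>,
    partition: usize,
) -> Result<SendableRecordBatchStream> {
    // A TaskContext derived from a SessionContext replaces the old
    // Arc<RuntimeEnv> argument to ExecutionPlan::execute.
    let session_ctx = SessionContext::new();
    let task_ctx = Arc::new(TaskContext::from(&session_ctx));
    plan.execute(partition, task_ctx).await
}
```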

Cargo.lock

@ -1093,7 +1093,7 @@ dependencies = [
[[package]]
name = "datafusion"
version = "7.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ca952bd33402816dbb1550debb9b8cac3b13e8f2#ca952bd33402816dbb1550debb9b8cac3b13e8f2"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5936edc2a94d5fb20702a41eab2b80695961b9dc#5936edc2a94d5fb20702a41eab2b80695961b9dc"
dependencies = [
"ahash",
"arrow",
@ -1118,12 +1118,13 @@ dependencies = [
"tempfile",
"tokio",
"tokio-stream",
"uuid",
]
[[package]]
name = "datafusion-common"
version = "7.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ca952bd33402816dbb1550debb9b8cac3b13e8f2#ca952bd33402816dbb1550debb9b8cac3b13e8f2"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5936edc2a94d5fb20702a41eab2b80695961b9dc#5936edc2a94d5fb20702a41eab2b80695961b9dc"
dependencies = [
"arrow",
"ordered-float 2.10.0",
@ -1134,7 +1135,7 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "7.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ca952bd33402816dbb1550debb9b8cac3b13e8f2#ca952bd33402816dbb1550debb9b8cac3b13e8f2"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5936edc2a94d5fb20702a41eab2b80695961b9dc#5936edc2a94d5fb20702a41eab2b80695961b9dc"
dependencies = [
"ahash",
"arrow",
@ -1145,7 +1146,7 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "7.0.0"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=ca952bd33402816dbb1550debb9b8cac3b13e8f2#ca952bd33402816dbb1550debb9b8cac3b13e8f2"
source = "git+https://github.com/apache/arrow-datafusion.git?rev=5936edc2a94d5fb20702a41eab2b80695961b9dc#5936edc2a94d5fb20702a41eab2b80695961b9dc"
dependencies = [
"ahash",
"arrow",


@ -11,7 +11,7 @@ use observability_deps::tracing::trace;
use parquet_file::{chunk::ParquetChunk, metadata::IoxMetadata};
use predicate::{Predicate, PredicateMatch};
use query::{
exec::{stringset::StringSet, IOxExecutionContext},
exec::{stringset::StringSet, IOxSessionContext},
QueryChunk, QueryChunkError, QueryChunkMeta,
};
use schema::{merge::SchemaMerger, selection::Selection, sort::SortKey, Schema};
@ -147,7 +147,7 @@ impl QueryChunk for QueryableParquetChunk {
/// this Chunk. Returns `None` otherwise
fn column_names(
&self,
_ctx: IOxExecutionContext,
_ctx: IOxSessionContext,
_predicate: &Predicate,
_columns: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError> {
@ -161,7 +161,7 @@ impl QueryChunk for QueryableParquetChunk {
/// The requested columns must all have String type.
fn column_values(
&self,
_ctx: IOxExecutionContext,
_ctx: IOxSessionContext,
_column_name: &str,
_predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError> {
@ -183,7 +183,7 @@ impl QueryChunk for QueryableParquetChunk {
/// streams from several different `QueryChunk`s.
fn read_filter(
&self,
mut ctx: IOxExecutionContext,
mut ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError> {


@ -9,5 +9,5 @@ description = "Re-exports datafusion at a specific version"
# Rename to workaround doctest bug
# Turn off optional datafusion features (e.g. don't get support for crypto functions or avro)
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="ca952bd33402816dbb1550debb9b8cac3b13e8f2", default-features = false, package = "datafusion" }
upstream = { git = "https://github.com/apache/arrow-datafusion.git", rev="5936edc2a94d5fb20702a41eab2b80695961b9dc", default-features = false, package = "datafusion" }
workspace-hack = { path = "../workspace-hack"}


@ -4,10 +4,11 @@
use std::sync::Arc;
use std::task::{Context, Poll};
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::context::TaskContext;
use datafusion::physical_plan::common::SizedRecordBatchStream;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics};
use datafusion::physical_plan::{collect, ExecutionPlan};
use datafusion::prelude::SessionContext;
use datafusion::{
arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch},
logical_plan::{binary_expr, col, lit, Expr, Operator},
@ -224,18 +225,19 @@ pub fn stream_from_schema(schema: SchemaRef) -> SendableRecordBatchStream {
Box::pin(stream)
}
/// Execute the [ExecutionPlan] with a default [RuntimeEnv] and
/// Execute the [ExecutionPlan] with a default [SessionContext] and
/// collect the results in memory.
///
/// # Panics
/// If an error occurs
pub async fn test_collect(plan: Arc<dyn ExecutionPlan>) -> Vec<RecordBatch> {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
collect(plan, runtime).await.unwrap()
let session_ctx = SessionContext::new();
let task_ctx = Arc::new(TaskContext::from(&session_ctx));
collect(plan, task_ctx).await.unwrap()
}
/// Execute the specified partition of the [ExecutionPlan] with a
/// default [RuntimeEnv] returning the resulting stream.
/// default [SessionContext] returning the resulting stream.
///
/// # Panics
/// If an error occurs
@ -243,12 +245,13 @@ pub async fn test_execute_partition(
plan: Arc<dyn ExecutionPlan>,
partition: usize,
) -> SendableRecordBatchStream {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
plan.execute(partition, runtime).await.unwrap()
let session_ctx = SessionContext::new();
let task_ctx = Arc::new(TaskContext::from(&session_ctx));
plan.execute(partition, task_ctx).await.unwrap()
}
/// Execute the specified partition of the [ExecutionPlan] with a
/// default [RuntimeEnv] and collect the results in memory.
/// default [SessionContext] and collect the results in memory.
///
/// # Panics
/// If an error occurs


@ -14,7 +14,7 @@ use metric::{Attributes, DurationCounter, Metric, U64Counter};
use observability_deps::tracing::debug;
use parking_lot::Mutex;
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
use query::{exec::IOxExecutionContext, QueryChunk};
use query::{exec::IOxSessionContext, QueryChunk};
use query::{
provider::{ChunkPruner, ProviderBuilder},
pruning::{prune_chunks, PruningObserver},
@ -296,7 +296,7 @@ impl QueryDatabase for QueryCatalogAccess {
fn record_query(
&self,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
query_type: &str,
query_text: QueryText,
) -> QueryCompletedToken {
@ -346,6 +346,15 @@ impl CatalogProvider for QueryCatalogAccess {
_ => None,
}
}
fn register_schema(
&self,
_name: &str,
_schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
// https://github.com/apache/arrow-datafusion/issues/2051
unimplemented!("Schemas can not be registered in IOx");
}
}
/// Implement the DataFusion schema provider API

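DataFusion's `CatalogProvider` trait now has a `register_schema` method (tracked in apache/arrow-datafusion#2051), so every IOx catalog provider in this commit stubs it out with `unimplemented!`, as in the hunk above. A minimal sketch of a provider carrying that stub, assuming the trait's other required methods keep their pre-existing shape:

```rust
use std::any::Any;
use std::sync::Arc;

use datafusion::catalog::catalog::CatalogProvider;
use datafusion::catalog::schema::SchemaProvider;

/// Hypothetical catalog used only to illustrate the new trait method.
struct ReadOnlyCatalog;

impl CatalogProvider for ReadOnlyCatalog {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema_names(&self) -> Vec<String> {
        vec![]
    }

    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
        None
    }

    // New in this DataFusion revision; IOx refuses runtime registration.
    fn register_schema(
        &self,
        _name: &str,
        _schema: Arc<dyn SchemaProvider>,
    ) -> Option<Arc<dyn SchemaProvider>> {
        unimplemented!("Schemas can not be registered in IOx")
    }
}
```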

@ -16,7 +16,7 @@ use observability_deps::tracing::debug;
use parquet_file::chunk::ParquetChunk;
use partition_metadata::TableSummary;
use predicate::{Predicate, PredicateMatch};
use query::exec::IOxExecutionContext;
use query::exec::IOxSessionContext;
use query::QueryChunkError;
use query::{exec::stringset::StringSet, QueryChunk, QueryChunkMeta};
use read_buffer::RBChunk;
@ -372,7 +372,7 @@ impl QueryChunk for DbChunk {
fn read_filter(
&self,
mut ctx: IOxExecutionContext,
mut ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError> {
@ -460,7 +460,7 @@ impl QueryChunk for DbChunk {
fn column_names(
&self,
mut ctx: IOxExecutionContext,
mut ctx: IOxSessionContext,
predicate: &Predicate,
columns: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError> {
@ -513,7 +513,7 @@ impl QueryChunk for DbChunk {
fn column_values(
&self,
mut ctx: IOxExecutionContext,
mut ctx: IOxSessionContext,
column_name: &str,
predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError> {
@ -628,7 +628,7 @@ mod tests {
let t1 = time.inc(Duration::from_secs(1));
snapshot
.read_filter(
IOxExecutionContext::default(),
IOxSessionContext::default(),
&Default::default(),
Selection::All,
)
@ -638,7 +638,7 @@ mod tests {
let t2 = time.inc(Duration::from_secs(1));
let column_names = snapshot
.column_names(
IOxExecutionContext::default(),
IOxSessionContext::default(),
&Default::default(),
Selection::All,
)
@ -648,7 +648,7 @@ mod tests {
let t3 = time.inc(Duration::from_secs(1));
let column_values = snapshot
.column_values(IOxExecutionContext::default(), "tag", &Default::default())
.column_values(IOxSessionContext::default(), "tag", &Default::default())
.unwrap()
.is_some();
let m5 = chunk.access_recorder().get_metrics();


@ -46,7 +46,7 @@ use persistence_windows::{checkpoint::ReplayPlan, persistence_windows::Persisten
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
use query::QueryChunk;
use query::{
exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext},
exec::{ExecutionContextProvider, Executor, ExecutorType, IOxSessionContext},
QueryCompletedToken, QueryDatabase, QueryText,
};
use rand_distr::{Distribution, Poisson};
@ -1235,7 +1235,7 @@ impl QueryDatabase for Db {
fn record_query(
&self,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
query_type: &str,
query_text: QueryText,
) -> QueryCompletedToken {
@ -1259,7 +1259,7 @@ impl QueryDatabaseMeta for Db {
}
impl ExecutionContextProvider for Db {
fn new_query_context(&self, span_ctx: Option<SpanContext>) -> IOxExecutionContext {
fn new_query_context(&self, span_ctx: Option<SpanContext>) -> IOxSessionContext {
self.exec
.new_execution_config(ExecutorType::Query)
.with_default_catalog(Arc::clone(&self.catalog_access) as _)
@ -1283,6 +1283,15 @@ impl CatalogProvider for Db {
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.catalog_access.schema(name)
}
fn register_schema(
&self,
_name: &str,
_schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
// https://github.com/apache/arrow-datafusion/issues/2051
unimplemented!("Schemas can not be registered in IOx");
}
}
pub(crate) fn checkpoint_data_from_catalog(catalog: &Catalog) -> CheckpointData {
@ -1991,7 +2000,7 @@ mod tests {
async fn collect_read_filter(chunk: &DbChunk) -> Vec<RecordBatch> {
chunk
.read_filter(
IOxExecutionContext::default(),
IOxSessionContext::default(),
&Default::default(),
Selection::All,
)


@ -27,7 +27,7 @@ use persistence_windows::{
checkpoint::{DatabaseCheckpoint, PartitionCheckpoint, PersistCheckpointBuilder},
persistence_windows::FlushHandle,
};
use query::{exec::IOxExecutionContext, QueryChunk};
use query::{exec::IOxSessionContext, QueryChunk};
use schema::selection::Selection;
use snafu::ResultExt;
use std::time::Duration;
@ -97,10 +97,10 @@ pub(super) fn write_chunk_to_object_store(
collect_checkpoints(flush_handle.checkpoint(), &db.catalog);
// Get RecordBatchStream of data from the read buffer chunk
// TODO: If we want to trace this we need to wire an IOxExecutionContext through.
// TODO: If we want to trace this we need to wire an IOxSessionContext through.
let stream = db_chunk
.read_filter(
IOxExecutionContext::default(),
IOxSessionContext::default(),
&Default::default(),
Selection::All,
)


@ -2,7 +2,7 @@
//! needed by DataFusion
use arrow::{datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch};
use datafusion::physical_plan::RecordBatchStream;
use query::exec::IOxExecutionContext;
use query::exec::IOxSessionContext;
use read_buffer::ReadFilterResults;
use std::{
@ -14,15 +14,11 @@ use std::{
pub struct ReadFilterResultsStream {
read_results: ReadFilterResults,
schema: SchemaRef,
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
}
impl ReadFilterResultsStream {
pub fn new(
ctx: IOxExecutionContext,
read_results: ReadFilterResults,
schema: SchemaRef,
) -> Self {
pub fn new(ctx: IOxSessionContext, read_results: ReadFilterResults, schema: SchemaRef) -> Self {
Self {
ctx,
read_results,


@ -10,7 +10,7 @@
use super::{catalog::Catalog, query_log::QueryLog};
use arrow::{datatypes::SchemaRef, error::Result, record_batch::RecordBatch};
use async_trait::async_trait;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::context::{TaskContext, TaskProperties};
use datafusion::physical_plan::expressions::PhysicalSortExpr;
use datafusion::physical_plan::{
Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics,
@ -229,11 +229,19 @@ impl<T: IoxSystemTable + 'static> ExecutionPlan for SystemTableExecutionPlan<T>
async fn execute(
&self,
_partition: usize,
runtime: Arc<RuntimeEnv>,
context: Arc<TaskContext>,
) -> DataFusionResult<SendableRecordBatchStream> {
let batch_size = {
if let TaskProperties::SessionConfig(config) = &context.properties {
config.batch_size
} else {
todo!("Need to always have properties")
}
};
Ok(Box::pin(SystemTableStream {
projected_schema: Arc::clone(&self.projected_schema),
batches: self.table.scan(runtime.batch_size)?,
batches: self.table.scan(batch_size)?,
projection: self.projection.clone(),
}))
}

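With the `RuntimeEnv` argument gone, the system-table scan above reads its batch size from the `TaskContext` properties instead. A sketch of that lookup, using the field and variant names exactly as they appear in the hunk rather than independently verified against the DataFusion docs:

```rust
use std::sync::Arc;

use datafusion::execution::context::{TaskContext, TaskProperties};

/// Return the session's configured batch size, if the task carries one.
fn configured_batch_size(context: &Arc<TaskContext>) -> Option<usize> {
    match &context.properties {
        TaskProperties::SessionConfig(config) => Some(config.batch_size),
        _ => None,
    }
}
```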

@ -12,7 +12,7 @@ use arrow::{
};
use datafusion::{
datasource::MemTable,
prelude::{ExecutionConfig, ExecutionContext},
prelude::{SessionConfig, SessionContext},
};
use observability_deps::tracing::{debug, info};
@ -44,14 +44,14 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
/// pre-loaded with consolidated system table views.
pub struct Observer {
/// DataFusion execution context for executing queries
context: ExecutionContext,
context: SessionContext,
}
impl Observer {
/// Attempt to create a new observer instance, loading from the remote server
pub async fn try_new(connection: Connection) -> Result<Self> {
let mut context =
ExecutionContext::with_config(ExecutionConfig::new().with_information_schema(true));
SessionContext::with_config(SessionConfig::new().with_information_schema(true));
load_remote_system_tables(&mut context, connection).await?;
@ -98,7 +98,7 @@ ORDER BY estimated_mb desc;
/// remote server into a local copy that also has an extra
/// `database_name` column for the database
async fn load_remote_system_tables(
context: &mut ExecutionContext,
context: &mut SessionContext,
connection: Connection,
) -> Result<()> {
// all prefixed with "system."
@ -230,7 +230,7 @@ impl AggregatedTableBuilder {
}
/// register a table provider for this system table
fn build(self, ctx: &mut ExecutionContext) {
fn build(self, ctx: &mut SessionContext) {
let Self { tables } = self;
for (table_name, table_builder) in tables {
@ -282,7 +282,7 @@ impl VirtualTableBuilder {
}
/// register a table provider for this system table
fn build(self, ctx: &mut ExecutionContext) {
fn build(self, ctx: &mut SessionContext) {
let Self {
table_name,
batches,

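For the observer the change is mechanical: `ExecutionContext::with_config(ExecutionConfig::new()...)` becomes `SessionContext::with_config(SessionConfig::new()...)`. A self-contained sketch of the renamed API, with a throwaway in-memory table standing in for the remote system tables (the table name and data are illustrative only):

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int64Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::datasource::MemTable;
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

fn observer_like_context() -> Result<SessionContext> {
    // Same options the Observer uses: information_schema enabled.
    let mut ctx =
        SessionContext::with_config(SessionConfig::new().with_information_schema(true));

    // Stand-in for a consolidated system table.
    let schema = Arc::new(Schema::new(vec![Field::new("n", DataType::Int64, false)]));
    let batch = RecordBatch::try_new(
        Arc::clone(&schema),
        vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
    )?;
    let provider = MemTable::try_new(schema, vec![vec![batch]])?;
    let _ = ctx.register_table("chunks", Arc::new(provider))?;

    Ok(ctx)
}
```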

@ -3,7 +3,7 @@ use std::sync::Arc;
use datafusion::physical_plan::ExecutionPlan;
use query::{
exec::IOxExecutionContext,
exec::IOxSessionContext,
frontend::{influxrpc::InfluxRpcPlanner, sql::SqlQueryPlanner},
group_by::{Aggregate, WindowDuration},
plan::{fieldlist::FieldListPlan, seriesset::SeriesSetPlans, stringset::StringSetPlan},
@ -21,12 +21,12 @@ use predicate::rpc_predicate::InfluxRpcPredicate;
/// pool.
pub struct Planner {
/// Executors (whose threadpool to use)
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
}
impl Planner {
/// Create a new planner that will plan queries using the provided context
pub fn new(ctx: &IOxExecutionContext) -> Self {
pub fn new(ctx: &IOxSessionContext) -> Self {
Self {
ctx: ctx.child_ctx("Planner"),
}


@ -25,7 +25,7 @@ use tonic::{Request, Response, Streaming};
use data_types::{DatabaseName, DatabaseNameError};
use observability_deps::tracing::{info, warn};
use query::exec::{ExecutionContextProvider, IOxExecutionContext};
use query::exec::{ExecutionContextProvider, IOxSessionContext};
use crate::planner::Planner;
@ -265,7 +265,7 @@ struct GetStream {
impl GetStream {
async fn new(
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
physical_plan: Arc<dyn ExecutionPlan>,
database_name: String,
mut query_completed_token: QueryCompletedToken,


@ -24,7 +24,7 @@ use generated_types::{
TimestampRange,
};
use observability_deps::tracing::{error, info, trace};
use query::exec::IOxExecutionContext;
use query::exec::IOxSessionContext;
use query::{
exec::{
fieldlist::FieldList, seriesset::converter::Error as SeriesSetError,
@ -836,7 +836,7 @@ async fn measurement_name_impl<D>(
db_name: DatabaseName<'static>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
) -> Result<StringValuesResponse>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
@ -882,7 +882,7 @@ async fn tag_keys_impl<D>(
measurement: Option<String>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
) -> Result<StringValuesResponse>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
@ -927,7 +927,7 @@ async fn tag_values_impl<D>(
measurement: Option<String>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
) -> Result<StringValuesResponse>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
@ -974,7 +974,7 @@ async fn tag_values_grouped_by_measurement_and_tag_key_impl<D>(
db: Arc<D>,
db_name: DatabaseName<'static>,
req: TagValuesGroupedByMeasurementAndTagKeyRequest,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
) -> Result<Vec<TagValuesResponse>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
@ -1048,7 +1048,7 @@ async fn read_filter_impl<D>(
db: Arc<D>,
db_name: DatabaseName<'static>,
req: ReadFilterRequest,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
) -> Result<Vec<ReadResponse>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
@ -1097,7 +1097,7 @@ async fn query_group_impl<D>(
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
gby_agg: GroupByAndAggregate,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
) -> Result<Vec<ReadResponse>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
@ -1156,7 +1156,7 @@ async fn field_names_impl<D>(
measurement: Option<String>,
range: Option<TimestampRange>,
rpc_predicate: Option<Predicate>,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
) -> Result<FieldList>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
@ -1196,7 +1196,7 @@ async fn materialise_measurement_names<D>(
db: Arc<D>,
db_name: DatabaseName<'static>,
measurement_exprs: Vec<LiteralOrRegex>,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
) -> Result<BTreeSet<String>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,
@ -1275,7 +1275,7 @@ async fn materialise_tag_keys<D>(
db_name: DatabaseName<'static>,
measurement_name: String,
tag_key_predicate: tag_key_predicate::Value,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
) -> Result<BTreeSet<String>, Error>
where
D: QueryDatabase + ExecutionContextProvider + 'static,


@ -22,7 +22,7 @@ use datafusion::{
use observability_deps::tracing::{debug, trace};
use predicate::{Predicate, PredicateMatch};
use query::{
exec::{stringset::StringSet, IOxExecutionContext},
exec::{stringset::StringSet, IOxSessionContext},
util::{df_physical_expr_from_schema_and_expr, MissingColumnsToNull},
QueryChunk, QueryChunkError, QueryChunkMeta,
};
@ -170,7 +170,7 @@ impl QueryChunk for QueryableBatch {
/// this Chunk. Returns `None` otherwise
fn column_names(
&self,
_ctx: IOxExecutionContext,
_ctx: IOxSessionContext,
_predicate: &Predicate,
_columns: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError> {
@ -184,7 +184,7 @@ impl QueryChunk for QueryableBatch {
/// The requested columns must all have String type.
fn column_values(
&self,
_ctx: IOxExecutionContext,
_ctx: IOxSessionContext,
_column_name: &str,
_predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError> {
@ -206,7 +206,7 @@ impl QueryChunk for QueryableBatch {
/// streams from several different `QueryChunk`s.
fn read_filter(
&self,
mut ctx: IOxExecutionContext,
mut ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError> {
@ -404,7 +404,7 @@ mod tests {
let batch = make_queryable_batch("test_table", 1, batches);
let stream = batch
.read_filter(
IOxExecutionContext::default(),
IOxSessionContext::default(),
&Predicate::default(),
Selection::All,
) // return all columns
@ -431,7 +431,7 @@ mod tests {
let batch = make_queryable_batch("test_table", 1, batches);
let stream = batch
.read_filter(
IOxExecutionContext::default(),
IOxSessionContext::default(),
&Predicate::default(),
Selection::Some(&["time", "field_int"]), // return 2 out of 3 columns
)
@ -461,7 +461,7 @@ mod tests {
let pred = PredicateBuilder::default().add_expr(expr).build();
let stream = batch
.read_filter(IOxExecutionContext::default(), &pred, Selection::All)
.read_filter(IOxSessionContext::default(), &pred, Selection::All)
.unwrap();
let batches = datafusion::physical_plan::common::collect(stream)
.await
@ -486,7 +486,7 @@ mod tests {
let pred = PredicateBuilder::default().add_expr(expr).build();
let stream = batch
.read_filter(IOxExecutionContext::default(), &pred, Selection::All)
.read_filter(IOxSessionContext::default(), &pred, Selection::All)
.unwrap();
let batches = datafusion::physical_plan::common::collect(stream)
.await
@ -506,7 +506,7 @@ mod tests {
let pred = PredicateBuilder::default().add_expr(expr).build();
let stream = batch
.read_filter(IOxExecutionContext::default(), &pred, Selection::All)
.read_filter(IOxSessionContext::default(), &pred, Selection::All)
.unwrap();
let batches = datafusion::physical_plan::common::collect(stream)
.await
@ -531,7 +531,7 @@ mod tests {
let batch = make_queryable_batch("test_table", 1, batches);
let stream = batch
.read_filter(
IOxExecutionContext::default(),
IOxSessionContext::default(),
&Predicate::default(),
Selection::Some(&["foo"]), // column not exist
)
@ -550,7 +550,7 @@ mod tests {
let batch = make_queryable_batch("test_table", 1, batches);
let stream = batch
.read_filter(
IOxExecutionContext::default(),
IOxSessionContext::default(),
&Predicate::default(),
Selection::All,
) // return all columns
@ -596,7 +596,7 @@ mod tests {
let pred = PredicateBuilder::default().add_expr(expr).build();
let stream = batch
.read_filter(IOxExecutionContext::default(), &pred, selection)
.read_filter(IOxSessionContext::default(), &pred, selection)
.unwrap();
let batches = datafusion::physical_plan::common::collect(stream)
.await
@ -620,7 +620,7 @@ mod tests {
let batch = make_queryable_batch("test_table", 1, batches);
let stream = batch
.read_filter(
IOxExecutionContext::default(),
IOxSessionContext::default(),
&Predicate::default(),
Selection::Some(&["foo", "bar"]), // column not exist
)


@ -162,7 +162,7 @@ mod test {
datasource::MemTable,
error::DataFusionError,
logical_plan::{col, Expr},
prelude::ExecutionContext,
prelude::SessionContext,
};
use std::sync::Arc;
@ -284,7 +284,7 @@ mod test {
.unwrap();
let provider = MemTable::try_new(Arc::clone(&schema), vec![vec![rb]]).unwrap();
let mut ctx = ExecutionContext::new();
let mut ctx = SessionContext::new();
ctx.register_table("t", Arc::new(provider)).unwrap();
let df = ctx.table("t").unwrap();


@ -293,7 +293,7 @@ mod tests {
use arrow_util::assert_batches_eq;
use futures::StreamExt;
use iox_tests::util::TestCatalog;
use query::{exec::IOxExecutionContext, QueryChunk};
use query::{exec::IOxSessionContext, QueryChunk};
use schema::selection::Selection;
#[tokio::test]
@ -352,7 +352,7 @@ mod tests {
async fn collect_read_filter(chunk: &QuerierChunk) -> Vec<RecordBatch> {
chunk
.read_filter(
IOxExecutionContext::default(),
IOxSessionContext::default(),
&Default::default(),
Selection::All,
)


@ -78,7 +78,7 @@ impl QueryChunk for QuerierChunk {
fn column_names(
&self,
_ctx: query::exec::IOxExecutionContext,
_ctx: query::exec::IOxSessionContext,
predicate: &predicate::Predicate,
columns: schema::selection::Selection<'_>,
) -> Result<Option<query::exec::stringset::StringSet>, QueryChunkError> {
@ -95,7 +95,7 @@ impl QueryChunk for QuerierChunk {
fn column_values(
&self,
_ctx: query::exec::IOxExecutionContext,
_ctx: query::exec::IOxSessionContext,
_column_name: &str,
_predicate: &predicate::Predicate,
) -> Result<Option<query::exec::stringset::StringSet>, QueryChunkError> {
@ -110,7 +110,7 @@ impl QueryChunk for QuerierChunk {
fn read_filter(
&self,
mut ctx: query::exec::IOxExecutionContext,
mut ctx: query::exec::IOxSessionContext,
predicate: &predicate::Predicate,
selection: schema::selection::Selection<'_>,
) -> Result<datafusion::physical_plan::SendableRecordBatchStream, QueryChunkError> {


@ -7,7 +7,7 @@ use datafusion::{
};
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate};
use query::{
exec::{ExecutionContextProvider, ExecutorType, IOxExecutionContext},
exec::{ExecutionContextProvider, ExecutorType, IOxSessionContext},
QueryChunk, QueryCompletedToken, QueryDatabase, QueryText, DEFAULT_SCHEMA,
};
use schema::Schema;
@ -64,7 +64,7 @@ impl QueryDatabase for QuerierNamespace {
fn record_query(
&self,
_ctx: &IOxExecutionContext,
_ctx: &IOxSessionContext,
_query_type: &str,
_query_text: QueryText,
) -> QueryCompletedToken {
@ -112,6 +112,15 @@ impl CatalogProvider for QuerierCatalogProvider {
_ => None,
}
}
fn register_schema(
&self,
_name: &str,
_schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
// https://github.com/apache/arrow-datafusion/issues/2051
unimplemented!("Schemas can not be registered in IOx");
}
}
impl CatalogProvider for QuerierNamespace {
@ -126,6 +135,15 @@ impl CatalogProvider for QuerierNamespace {
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
QuerierCatalogProvider::from_namespace(self).schema(name)
}
fn register_schema(
&self,
_name: &str,
_schema: Arc<dyn SchemaProvider>,
) -> Option<Arc<dyn SchemaProvider>> {
// https://github.com/apache/arrow-datafusion/issues/2051
unimplemented!("Schemas can not be registered in IOx");
}
}
/// Provider for user-provided tables in [`DEFAULT_SCHEMA`].
@ -155,7 +173,7 @@ impl SchemaProvider for UserSchemaProvider {
}
impl ExecutionContextProvider for QuerierNamespace {
fn new_query_context(&self, span_ctx: Option<SpanContext>) -> IOxExecutionContext {
fn new_query_context(&self, span_ctx: Option<SpanContext>) -> IOxSessionContext {
self.exec
.new_execution_config(ExecutorType::Query)
.with_default_catalog(Arc::new(QuerierCatalogProvider::from_namespace(self)) as _)


@ -17,12 +17,11 @@ use std::sync::Arc;
use datafusion::{
self,
execution::{runtime_env::RuntimeEnv, DiskManager, MemoryManager},
execution::runtime_env::{RuntimeConfig, RuntimeEnv},
logical_plan::{normalize_col, plan::Extension, Expr, LogicalPlan},
prelude::ExecutionConfig,
};
pub use context::{IOxExecutionConfig, IOxExecutionContext};
pub use context::{IOxSessionConfig, IOxSessionContext};
use schema_pivot::SchemaPivotNode;
use self::{non_null_checker::NonNullCheckerNode, split::StreamSplitNode};
@ -54,11 +53,9 @@ pub struct Executor {
/// The default configuration options with which to create contexts
config: ExecutorConfig,
/// The DataFusion [MemoryManager] used for all queries run in this executor
memory_manager: Arc<MemoryManager>,
/// The DataFusion DiskManager used for all queries run in this executor
disk_manager: Arc<DiskManager>,
/// The DataFusion [RuntimeEnv] (including memory manager and disk
/// manager) used for all executions
runtime: Arc<RuntimeEnv>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@ -83,33 +80,30 @@ impl Executor {
let query_exec = DedicatedExecutor::new("IOx Query Executor Thread", config.num_threads);
let reorg_exec = DedicatedExecutor::new("IOx Reorg Executor Thread", config.num_threads);
let runtime =
RuntimeEnv::new(ExecutionConfig::default().runtime).expect("creating runtime");
let runtime_config = RuntimeConfig::new();
let runtime = Arc::new(RuntimeEnv::new(runtime_config).expect("creating runtime"));
Self {
query_exec,
reorg_exec,
config,
memory_manager: runtime.memory_manager,
disk_manager: runtime.disk_manager,
runtime,
}
}
/// Return a new execution config, suitable for executing a new query or system task.
///
/// Note that this context (and all its clones) will be shut down once `Executor` is dropped.
pub fn new_execution_config(&self, executor_type: ExecutorType) -> IOxExecutionConfig {
pub fn new_execution_config(&self, executor_type: ExecutorType) -> IOxSessionConfig {
let exec = self.executor(executor_type).clone();
IOxExecutionConfig::new(exec)
IOxSessionConfig::new(exec, Arc::clone(&self.runtime))
.with_target_partitions(self.config.target_query_partitions)
.with_memory_manager(Arc::clone(&self.memory_manager))
.with_disk_manager(Arc::clone(&self.disk_manager))
}
/// Create a new execution context, suitable for executing a new query or system task
///
/// Note that this context (and all its clones) will be shut down once `Executor` is dropped.
pub fn new_context(&self, executor_type: ExecutorType) -> IOxExecutionContext {
pub fn new_context(&self, executor_type: ExecutorType) -> IOxSessionContext {
self.new_execution_config(executor_type).build()
}
@ -235,10 +229,10 @@ pub fn make_stream_split(input: LogicalPlan, split_expr: Expr) -> LogicalPlan {
LogicalPlan::Extension(Extension { node })
}
/// A type that can provide `IOxExecutionContext` for query
/// A type that can provide `IOxSessionContext` for query
pub trait ExecutionContextProvider {
/// Returns a new execution context suitable for running queries
fn new_query_context(&self, span_ctx: Option<trace::ctx::SpanContext>) -> IOxExecutionContext;
fn new_query_context(&self, span_ctx: Option<trace::ctx::SpanContext>) -> IOxSessionContext;
}
#[cfg(test)]

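With `with_existing_memory_manager` and `with_existing_disk_manager` removed from the config builder, the executor above now owns a single `RuntimeEnv` and clones the `Arc` into every session config it hands out. A trimmed-down sketch of that ownership (struct and method names are illustrative):

```rust
use std::sync::Arc;

use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};

/// Trimmed-down stand-in for the Executor's shared runtime handling.
struct SharedRuntime {
    runtime: Arc<RuntimeEnv>,
}

impl SharedRuntime {
    fn new() -> Self {
        // The memory and disk managers now live inside the RuntimeEnv
        // instead of being stored on the executor individually.
        let runtime =
            Arc::new(RuntimeEnv::new(RuntimeConfig::new()).expect("creating runtime"));
        Self { runtime }
    }

    /// Every new session shares the same runtime (and therefore the same
    /// memory and disk managers), mirroring IOxSessionConfig::new above.
    fn runtime_for_session(&self) -> Arc<RuntimeEnv> {
        Arc::clone(&self.runtime)
    }
}
```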

@ -9,8 +9,10 @@ use arrow::record_batch::RecordBatch;
use datafusion::{
catalog::catalog::CatalogProvider,
execution::context::{ExecutionContextState, QueryPlanner},
execution::{DiskManager, MemoryManager},
execution::{
context::{QueryPlanner, SessionState, TaskContext},
runtime_env::RuntimeEnv,
},
logical_plan::{LogicalPlan, UserDefinedLogicalNode},
physical_plan::{
coalesce_partitions::CoalescePartitionsExec,
@ -70,7 +72,7 @@ impl QueryPlanner for IOxQueryPlanner {
async fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
session_state: &SessionState,
) -> Result<Arc<dyn ExecutionPlan>> {
// Teach the default physical planner how to plan SchemaPivot
// and StreamSplit nodes.
@ -78,7 +80,7 @@ impl QueryPlanner for IOxQueryPlanner {
DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(IOxExtensionPlanner {})]);
// Delegate most work of physical planning to the default physical planner
physical_planner
.create_physical_plan(logical_plan, ctx_state)
.create_physical_plan(logical_plan, session_state)
.await
}
}
@ -94,7 +96,7 @@ impl ExtensionPlanner for IOxExtensionPlanner {
node: &dyn UserDefinedLogicalNode,
logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
ctx_state: &ExecutionContextState,
session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
let any = node.as_any();
let plan = if let Some(schema_pivot) = any.downcast_ref::<SchemaPivotNode>() {
@ -126,7 +128,7 @@ impl ExtensionPlanner for IOxExtensionPlanner {
stream_split.split_expr(),
logical_inputs[0].schema(),
&physical_inputs[0].schema(),
ctx_state,
session_state,
)?;
Some(Arc::new(StreamSplitExec::new(
@ -144,12 +146,15 @@ impl ExtensionPlanner for IOxExtensionPlanner {
///
/// Created from an Executor
#[derive(Clone)]
pub struct IOxExecutionConfig {
pub struct IOxSessionConfig {
/// Executor to run on
exec: DedicatedExecutor,
/// DataFusion configuration
execution_config: ExecutionConfig,
/// DataFusion session configuration
session_config: SessionConfig,
/// Shared DataFusion runtime
runtime: Arc<RuntimeEnv>,
/// Default catalog
default_catalog: Option<Arc<dyn CatalogProvider>>,
@ -158,26 +163,26 @@ pub struct IOxExecutionConfig {
span_ctx: Option<SpanContext>,
}
impl fmt::Debug for IOxExecutionConfig {
impl fmt::Debug for IOxSessionConfig {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "IOxExecutionConfig ...")
write!(f, "IOxSessionConfig ...")
}
}
const BATCH_SIZE: usize = 1000;
impl IOxExecutionConfig {
pub(super) fn new(exec: DedicatedExecutor) -> Self {
let execution_config = ExecutionConfig::new()
impl IOxSessionConfig {
pub(super) fn new(exec: DedicatedExecutor, runtime: Arc<RuntimeEnv>) -> Self {
let session_config = SessionConfig::new()
.with_batch_size(BATCH_SIZE)
.create_default_catalog_and_schema(true)
.with_information_schema(true)
.with_default_catalog_and_schema(DEFAULT_CATALOG, DEFAULT_SCHEMA)
.with_query_planner(Arc::new(IOxQueryPlanner {}));
.with_default_catalog_and_schema(DEFAULT_CATALOG, DEFAULT_SCHEMA);
Self {
exec,
execution_config,
session_config,
runtime,
default_catalog: None,
span_ctx: None,
}
@ -185,28 +190,12 @@ impl IOxExecutionConfig {
/// Set execution concurrency
pub fn with_target_partitions(mut self, target_partitions: usize) -> Self {
self.execution_config = self
.execution_config
self.session_config = self
.session_config
.with_target_partitions(target_partitions);
self
}
/// Set the [MemoryManager]
pub fn with_memory_manager(mut self, memory_manager: Arc<MemoryManager>) -> Self {
self.execution_config = self
.execution_config
.with_existing_memory_manager(memory_manager);
self
}
/// Set the [DiskManager]
pub fn with_disk_manager(mut self, disk_manager: Arc<DiskManager>) -> Self {
self.execution_config = self
.execution_config
.with_existing_disk_manager(disk_manager);
self
}
/// Set the default catalog provider
pub fn with_default_catalog(self, catalog: Arc<dyn CatalogProvider>) -> Self {
Self {
@ -221,8 +210,11 @@ impl IOxExecutionConfig {
}
/// Create an ExecutionContext suitable for executing DataFusion plans
pub fn build(self) -> IOxExecutionContext {
let inner = ExecutionContext::with_config(self.execution_config);
pub fn build(self) -> IOxSessionContext {
let state = SessionState::with_config(self.session_config, self.runtime)
.with_query_planner(Arc::new(IOxQueryPlanner {}));
let inner = SessionContext::with_state(state);
if let Some(default_catalog) = self.default_catalog {
inner.register_catalog(DEFAULT_CATALOG, default_catalog);
@ -230,7 +222,7 @@ impl IOxExecutionConfig {
let maybe_span = self.span_ctx.map(|ctx| ctx.child("Query Execution"));
IOxExecutionContext {
IOxSessionContext {
inner,
exec: Some(self.exec),
recorder: SpanRecorder::new(maybe_span),
@ -248,11 +240,11 @@ impl IOxExecutionConfig {
/// types such as Memory and providing visibility into what plans are
/// running
///
/// An IOxExecutionContext is created directly from an Executor, or from
/// an IOxExecutionConfig created by an Executor
/// An IOxSessionContext is created directly from an Executor, or from
/// an IOxSessionConfig created by an Executor
#[derive(Default)]
pub struct IOxExecutionContext {
inner: ExecutionContext,
pub struct IOxSessionContext {
inner: SessionContext,
/// Optional dedicated executor for query execution.
///
@ -266,17 +258,17 @@ pub struct IOxExecutionContext {
recorder: SpanRecorder,
}
impl fmt::Debug for IOxExecutionContext {
impl fmt::Debug for IOxSessionContext {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("IOxExecutionContext")
f.debug_struct("IOxSessionContext")
.field("inner", &"<DataFusion ExecutionContext>")
.finish()
}
}
impl IOxExecutionContext {
impl IOxSessionContext {
/// returns a reference to the inner datafusion execution context
pub fn inner(&self) -> &ExecutionContext {
pub fn inner(&self) -> &SessionContext {
&self.inner
}
@ -363,10 +355,10 @@ impl IOxExecutionContext {
.span()
.map(|span| span.child("execute_stream_partitioned"));
let runtime = self.inner.runtime_env();
let task_context = Arc::new(TaskContext::from(self.inner()));
self.run(async move {
let stream = physical_plan.execute(partition, runtime).await?;
let stream = physical_plan.execute(partition, task_context).await?;
let stream = TracedStream::new(stream, span, physical_plan);
Ok(Box::pin(stream) as _)
})
@ -563,7 +555,7 @@ impl IOxExecutionContext {
}
}
/// Returns a IOxExecutionContext with a SpanRecorder that is a child of the current
/// Returns a IOxSessionContext with a SpanRecorder that is a child of the current
pub fn child_ctx(&self, name: &'static str) -> Self {
Self {
inner: self.inner.clone(),

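`IOxSessionConfig::build` above shows the new recipe for attaching a custom `QueryPlanner`: it is set on a `SessionState` (together with the shared runtime), and the state is then wrapped into a `SessionContext`. A sketch with the IOx-specific planner left out:

```rust
use std::sync::Arc;

use datafusion::execution::context::SessionState;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};

fn build_session(config: SessionConfig) -> SessionContext {
    let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::new()).expect("creating runtime"));

    // In the hunk above this is followed by
    // `.with_query_planner(Arc::new(IOxQueryPlanner {}))` before the state
    // is turned into a context; the planner type is IOx-specific and omitted.
    let state = SessionState::with_config(config, runtime);
    SessionContext::with_state(state)
}
```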

@ -52,7 +52,7 @@ use arrow::{
};
use datafusion::{
error::{DataFusionError as Error, Result},
execution::runtime_env::RuntimeEnv,
execution::context::TaskContext,
logical_plan::{DFSchemaRef, Expr, LogicalPlan, ToDFSchema, UserDefinedLogicalNode},
physical_plan::{
expressions::PhysicalSortExpr,
@ -249,7 +249,7 @@ impl ExecutionPlan for NonNullCheckerExec {
async fn execute(
&self,
partition: usize,
runtime: Arc<RuntimeEnv>,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if self.output_partitioning().partition_count() <= partition {
return Err(Error::Internal(format!(
@ -259,7 +259,7 @@ impl ExecutionPlan for NonNullCheckerExec {
}
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let input_stream = self.input.execute(partition, runtime).await?;
let input_stream = self.input.execute(partition, context).await?;
let (tx, rx) = mpsc::channel(1);


@ -315,7 +315,7 @@ mod tests {
use async_trait::async_trait;
use chrono::TimeZone;
use datafusion::{
execution::runtime_env::RuntimeEnv,
execution::context::TaskContext,
physical_plan::{
expressions::PhysicalSortExpr,
metrics::{Count, Time, Timestamp},
@ -651,7 +651,7 @@ mod tests {
async fn execute(
&self,
_partition: usize,
_runtime: Arc<RuntimeEnv>,
_context: Arc<TaskContext>,
) -> datafusion::error::Result<datafusion::physical_plan::SendableRecordBatchStream>
{
unimplemented!()


@ -34,7 +34,7 @@ use arrow::{
};
use datafusion::{
error::{DataFusionError as Error, Result},
execution::runtime_env::RuntimeEnv,
execution::context::TaskContext,
logical_plan::{DFSchemaRef, Expr, LogicalPlan, ToDFSchema, UserDefinedLogicalNode},
physical_plan::{
common::SizedRecordBatchStream,
@ -218,7 +218,7 @@ impl ExecutionPlan for SchemaPivotExec {
async fn execute(
&self,
partition: usize,
runtime: Arc<RuntimeEnv>,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if self.output_partitioning().partition_count() <= partition {
return Err(Error::Internal(format!(
@ -228,7 +228,7 @@ impl ExecutionPlan for SchemaPivotExec {
}
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let mut input_reader = self.input.execute(partition, runtime).await?;
let mut input_reader = self.input.execute(partition, context).await?;
// Algorithm: for each column we haven't seen a value for yet,
// check each input row;


@ -18,7 +18,7 @@ use arrow::{
};
use datafusion::{
error::{DataFusionError, Result},
execution::runtime_env::RuntimeEnv,
execution::context::TaskContext,
logical_plan::{DFSchemaRef, Expr, LogicalPlan, UserDefinedLogicalNode},
physical_plan::{
expressions::PhysicalSortExpr,
@ -197,10 +197,10 @@ impl ExecutionPlan for StreamSplitExec {
async fn execute(
&self,
partition: usize,
runtime: Arc<RuntimeEnv>,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
trace!(partition, "SplitExec::execute");
self.start_if_needed(runtime).await?;
self.start_if_needed(context).await?;
let mut state = self.state.lock().await;
match &mut (*state) {
@ -240,7 +240,7 @@ impl ExecutionPlan for StreamSplitExec {
impl StreamSplitExec {
/// if in State::New, sets up the output running and sets self.state --> `Running`
async fn start_if_needed(&self, runtime: Arc<RuntimeEnv>) -> Result<()> {
async fn start_if_needed(&self, context: Arc<TaskContext>) -> Result<()> {
let mut state = self.state.lock().await;
if matches!(*state, State::Running { .. }) {
return Ok(());
@ -257,7 +257,7 @@ impl StreamSplitExec {
trace!("Setting up SplitStreamExec state");
let input_stream = self.input.execute(0, runtime).await?;
let input_stream = self.input.execute(0, context).await?;
let (tx0, rx0) = tokio::sync::mpsc::channel(2);
let (tx1, rx1) = tokio::sync::mpsc::channel(2);
let split_expr = Arc::clone(&self.split_expr);


@ -24,7 +24,7 @@ use schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME};
use snafu::{ensure, OptionExt, ResultExt, Snafu};
use crate::{
exec::{field::FieldColumns, make_non_null_checker, make_schema_pivot, IOxExecutionContext},
exec::{field::FieldColumns, make_non_null_checker, make_schema_pivot, IOxSessionContext},
func::{
selectors::{selector_first, selector_last, selector_max, selector_min, SelectorOutput},
window::make_window_bound_expr,
@ -202,18 +202,18 @@ pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Default, Debug)]
pub struct InfluxRpcPlanner {
/// Optional executor currently only used to provide span context for tracing.
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
}
impl InfluxRpcPlanner {
/// Create a new instance of the RPC planner
pub fn new() -> Self {
Self {
ctx: IOxExecutionContext::default(),
ctx: IOxSessionContext::default(),
}
}
pub fn with_execution_context(self, ctx: IOxExecutionContext) -> Self {
pub fn with_execution_context(self, ctx: IOxSessionContext) -> Self {
Self { ctx }
}
@ -895,7 +895,7 @@ impl InfluxRpcPlanner {
/// ```
fn tag_keys_plan(
&self,
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,
@ -967,7 +967,7 @@ impl InfluxRpcPlanner {
/// ```
fn field_columns_plan(
&self,
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,
@ -1079,7 +1079,7 @@ impl InfluxRpcPlanner {
/// Scan
fn read_filter_plan(
&self,
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: impl AsRef<str>,
schema: Arc<Schema>,
predicate: &Predicate,
@ -1192,7 +1192,7 @@ impl InfluxRpcPlanner {
/// Scan
fn read_group_plan(
&self,
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,
@ -1307,7 +1307,7 @@ impl InfluxRpcPlanner {
#[allow(clippy::too_many_arguments)]
fn read_window_aggregate_plan(
&self,
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: impl Into<String>,
schema: Arc<Schema>,
predicate: &Predicate,
@ -1392,7 +1392,7 @@ impl InfluxRpcPlanner {
/// ```
fn scan_and_filter(
&self,
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: &str,
schema: Arc<Schema>,
predicate: &Predicate,


@ -1,6 +1,6 @@
use std::sync::Arc;
use crate::exec::context::IOxExecutionContext;
use crate::exec::context::IOxSessionContext;
use datafusion::{error::Result, physical_plan::ExecutionPlan};
/// This struct can create plans for running SQL queries against databases
@ -17,7 +17,7 @@ impl SqlQueryPlanner {
pub async fn query(
&self,
query: &str,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
) -> Result<Arc<dyn ExecutionPlan>> {
ctx.prepare_sql(query).await
}


@ -691,7 +691,7 @@ mod test {
inputs: Vec<RecordBatch>,
) -> Vec<String> {
let provider = MemTable::try_new(Arc::clone(&schema), vec![inputs]).unwrap();
let mut ctx = ExecutionContext::new();
let mut ctx = SessionContext::new();
ctx.register_table("t", Arc::new(provider)).unwrap();
let df = ctx.table("t").unwrap();


@ -15,7 +15,7 @@ use data_types::{
partition_metadata::{InfluxDbType, TableSummary},
};
use datafusion::physical_plan::SendableRecordBatchStream;
use exec::{stringset::StringSet, IOxExecutionContext};
use exec::{stringset::StringSet, IOxSessionContext};
use observability_deps::tracing::{debug, trace};
use predicate::{rpc_predicate::QueryDatabaseMeta, Predicate, PredicateMatch};
use schema::selection::Selection;
@ -145,7 +145,7 @@ pub trait QueryDatabase: QueryDatabaseMeta + Debug + Send + Sync {
/// Record that particular type of query was run / planned
fn record_query(
&self,
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
query_type: &str,
query_text: QueryText,
) -> QueryCompletedToken;
@ -192,7 +192,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
/// this Chunk. Returns `None` otherwise
fn column_names(
&self,
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
predicate: &Predicate,
columns: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError>;
@ -204,7 +204,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
/// The requested columns must all have String type.
fn column_values(
&self,
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
column_name: &str,
predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError>;
@ -224,7 +224,7 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
/// streams from several different `QueryChunk`s.
fn read_filter(
&self,
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError>;


@ -23,7 +23,7 @@ use schema::{merge::SchemaMerger, sort::SortKey, InfluxColumnType, Schema};
use crate::{
chunks_have_stats, compute_sort_key_for_chunks,
exec::IOxExecutionContext,
exec::IOxSessionContext,
util::{arrow_sort_key_exprs, df_physical_expr},
QueryChunk,
};
@ -109,7 +109,7 @@ pub struct ProviderBuilder {
sort_key: Option<SortKey>,
// execution context used for tracing
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
}
impl ProviderBuilder {
@ -120,11 +120,11 @@ impl ProviderBuilder {
chunk_pruner: None,
chunks: Vec::new(),
sort_key: None,
ctx: IOxExecutionContext::default(),
ctx: IOxSessionContext::default(),
}
}
pub fn with_execution_context(self, ctx: IOxExecutionContext) -> Self {
pub fn with_execution_context(self, ctx: IOxSessionContext) -> Self {
Self { ctx, ..self }
}
@ -204,7 +204,7 @@ pub struct ChunkTableProvider {
sort_key: Option<SortKey>,
// execution context
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
}
impl ChunkTableProvider {
@ -312,7 +312,7 @@ pub(crate) struct Deduplicater {
pub no_duplicates_chunks: Vec<Arc<dyn QueryChunk>>,
// execution context
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
}
impl Deduplicater {
@ -321,11 +321,11 @@ impl Deduplicater {
overlapped_chunks_set: vec![],
in_chunk_duplicates_chunks: vec![],
no_duplicates_chunks: vec![],
ctx: IOxExecutionContext::default(),
ctx: IOxSessionContext::default(),
}
}
pub(crate) fn with_execution_context(self, ctx: IOxExecutionContext) -> Self {
pub(crate) fn with_execution_context(self, ctx: IOxSessionContext) -> Self {
Self { ctx, ..self }
}
@ -587,7 +587,7 @@ impl Deduplicater {
/// └─────────────────┘ └─────────────────┘
///```
fn build_deduplicate_plan_for_overlapped_chunks(
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunks: Vec<Arc<dyn QueryChunk>>, // These chunks are identified overlapped
@ -676,7 +676,7 @@ impl Deduplicater {
/// └─────────────────┘
///```
fn build_deduplicate_plan_for_chunk_with_duplicates(
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunk: Arc<dyn QueryChunk>, // This chunk is identified having duplicates
@ -804,7 +804,7 @@ impl Deduplicater {
/// └─────────────────┘
///```
fn build_sort_plan_for_read_filter(
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunk: Arc<dyn QueryChunk>, // This chunk is identified having duplicates
@ -949,7 +949,7 @@ impl Deduplicater {
/// Return the simplest IOx scan plan of a given chunk which is IOxReadFilterNode
// And some optional operators on top such as applying delete predicates or sort the chunk
fn build_plan_for_non_duplicates_chunk(
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunk: Arc<dyn QueryChunk>, // This chunk is identified having no duplicates
@ -998,7 +998,7 @@ impl Deduplicater {
/// └─────────────────┘ └─────────────────┘
///```
fn build_plans_for_non_duplicates_chunks(
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: Arc<str>,
output_schema: Arc<Schema>,
chunks: Vec<Arc<dyn QueryChunk>>, // These chunks is identified having no duplicates
@ -1177,7 +1177,7 @@ mod test {
// IOx scan operator
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
IOxExecutionContext::default(),
IOxSessionContext::default(),
Arc::from("t"),
chunk.schema(),
vec![Arc::clone(&chunk)],
@ -1251,7 +1251,7 @@ mod test {
// IOx scan operator
let input: Arc<dyn ExecutionPlan> = Arc::new(IOxReadFilterNode::new(
IOxExecutionContext::default(),
IOxSessionContext::default(),
Arc::from("t"),
chunk.schema(),
vec![Arc::clone(&chunk)],
@ -1327,7 +1327,7 @@ mod test {
let schema = chunk.schema();
let sort_plan = Deduplicater::build_sort_plan_for_read_filter(
IOxExecutionContext::default(),
IOxSessionContext::default(),
Arc::from("t"),
schema,
Arc::clone(&chunk),
@ -1402,7 +1402,7 @@ mod test {
let output_sort_key = SortKey::from_columns(vec!["tag1", "tag2", "time"]);
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
IOxExecutionContext::default(), //TODO(edd): address this.
IOxSessionContext::default(), //TODO(edd): address this.
Arc::from("t"),
schema,
chunks,
@ -1482,7 +1482,7 @@ mod test {
let output_sort_key = SortKey::from_columns(vec!["tag1", "tag2", "time"]);
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
IOxExecutionContext::default(), //TODO(edd): address this.
IOxSessionContext::default(), //TODO(edd): address this.
Arc::from("t"),
Arc::new(schema),
chunks,
@ -1576,7 +1576,7 @@ mod test {
let output_sort_key = SortKey::from_columns(vec!["tag2", "tag1", "time"]);
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
IOxExecutionContext::default(), //TODO(edd): address this.
IOxSessionContext::default(), //TODO(edd): address this.
Arc::from("t"),
Arc::new(schema),
chunks,
@ -1680,7 +1680,7 @@ mod test {
let output_sort_key = SortKey::from_columns(vec!["tag2", "tag1", "time"]);
let sort_plan = Deduplicater::build_deduplicate_plan_for_overlapped_chunks(
IOxExecutionContext::default(), //TODO(edd): address this.
IOxSessionContext::default(), //TODO(edd): address this.
Arc::from("t"),
Arc::new(schema),
chunks,


@ -14,7 +14,7 @@ use datafusion_util::AdapterStream;
use self::algo::RecordBatchDeduplicator;
use datafusion::{
error::{DataFusionError, Result},
execution::runtime_env::RuntimeEnv,
execution::context::TaskContext,
physical_plan::{
expressions::PhysicalSortExpr,
metrics::{
@ -186,7 +186,7 @@ impl ExecutionPlan for DeduplicateExec {
async fn execute(
&self,
partition: usize,
runtime: Arc<RuntimeEnv>,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
if partition != 0 {
return Err(DataFusionError::Internal(
@ -195,7 +195,7 @@ impl ExecutionPlan for DeduplicateExec {
}
let deduplicate_metrics = DeduplicateMetrics::new(&self.metrics, partition);
let input_stream = self.input.execute(0, runtime).await?;
let input_stream = self.input.execute(0, context).await?;
// the deduplication is performed in a separate task which is
// then sent via a channel to the output
@ -1169,7 +1169,7 @@ mod test {
async fn execute(
&self,
partition: usize,
_runtime: Arc<RuntimeEnv>,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
assert_eq!(partition, 0);


@ -6,7 +6,7 @@ use arrow::datatypes::SchemaRef;
use data_types::partition_metadata::TableSummary;
use datafusion::{
error::DataFusionError,
execution::runtime_env::RuntimeEnv,
execution::context::TaskContext,
physical_plan::{
expressions::PhysicalSortExpr,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet},
@ -16,7 +16,7 @@ use datafusion::{
use schema::selection::Selection;
use schema::Schema;
use crate::{exec::IOxExecutionContext, QueryChunk};
use crate::{exec::IOxSessionContext, QueryChunk};
use predicate::Predicate;
use async_trait::async_trait;
@ -36,7 +36,7 @@ pub(crate) struct IOxReadFilterNode {
metrics: ExecutionPlanMetricsSet,
// execution context used for tracing
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
}
impl IOxReadFilterNode {
@ -44,7 +44,7 @@ impl IOxReadFilterNode {
/// output according to schema, while applying `predicate` and
/// returns
pub fn new(
ctx: IOxExecutionContext,
ctx: IOxSessionContext,
table_name: Arc<str>,
iox_schema: Arc<Schema>,
chunks: Vec<Arc<dyn QueryChunk>>,
@ -96,7 +96,7 @@ impl ExecutionPlan for IOxReadFilterNode {
// For some reason when I used an automatically derived `Clone` implementation
// the compiler didn't recognize the trait implementation
let new_self = Self {
ctx: IOxExecutionContext::default(), // FIXME: we can't clone context because we shouldn't clone span recorder bits
ctx: IOxSessionContext::default(), // FIXME: we can't clone context because we shouldn't clone span recorder bits
table_name: Arc::clone(&self.table_name),
iox_schema: Arc::clone(&self.iox_schema),
chunks,
@ -110,7 +110,7 @@ impl ExecutionPlan for IOxReadFilterNode {
async fn execute(
&self,
partition: usize,
_runtime: Arc<RuntimeEnv>,
_context: Arc<TaskContext>,
) -> datafusion::error::Result<SendableRecordBatchStream> {
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
let timer = baseline_metrics.elapsed_compute().timer();


@ -3,7 +3,7 @@
//!
//! AKA it is a Mock
use crate::exec::{ExecutionContextProvider, Executor, ExecutorType, IOxExecutionContext};
use crate::exec::{ExecutionContextProvider, Executor, ExecutorType, IOxSessionContext};
use crate::{
exec::stringset::{StringSet, StringSetRef},
Predicate, PredicateMatch, QueryChunk, QueryChunkMeta, QueryDatabase,
@ -117,7 +117,7 @@ impl QueryDatabase for TestDatabase {
fn record_query(
&self,
_ctx: &IOxExecutionContext,
_ctx: &IOxSessionContext,
_query_type: &str,
_query_text: QueryText,
) -> QueryCompletedToken {
@ -161,7 +161,7 @@ impl QueryDatabaseMeta for TestDatabase {
}
impl ExecutionContextProvider for TestDatabase {
fn new_query_context(&self, span_ctx: Option<SpanContext>) -> IOxExecutionContext {
fn new_query_context(&self, span_ctx: Option<SpanContext>) -> IOxSessionContext {
// Note: unlike Db this does not register a catalog provider
self.executor
.new_execution_config(ExecutorType::Query)
@ -885,7 +885,7 @@ impl QueryChunk for TestChunk {
fn read_filter(
&self,
_ctx: IOxExecutionContext,
_ctx: IOxSessionContext,
predicate: &Predicate,
_selection: Selection<'_>,
) -> Result<SendableRecordBatchStream, QueryChunkError> {
@ -921,7 +921,7 @@ impl QueryChunk for TestChunk {
fn column_values(
&self,
_ctx: IOxExecutionContext,
_ctx: IOxSessionContext,
_column_name: &str,
_predicate: &Predicate,
) -> Result<Option<StringSet>, QueryChunkError> {
@ -931,7 +931,7 @@ impl QueryChunk for TestChunk {
fn column_names(
&self,
_ctx: IOxExecutionContext,
_ctx: IOxSessionContext,
predicate: &Predicate,
selection: Selection<'_>,
) -> Result<Option<StringSet>, QueryChunkError> {
@ -983,7 +983,7 @@ pub async fn raw_data(chunks: &[Arc<dyn QueryChunk>]) -> Vec<RecordBatch> {
let pred = Predicate::default();
let selection = Selection::All;
let mut stream = c
.read_filter(IOxExecutionContext::default(), &pred, selection)
.read_filter(IOxSessionContext::default(), &pred, selection)
.expect("Error in read_filter");
while let Some(b) = stream.next().await {
let b = b.expect("Error in stream");


@ -2,7 +2,7 @@ use arrow_util::assert_batches_sorted_eq;
use db::{test_helpers::write_lp, utils::TestDb};
use object_store::{DynObjectStore, ObjectStoreImpl, ObjectStoreIntegration};
use query::{
exec::{ExecutionContextProvider, IOxExecutionContext},
exec::{ExecutionContextProvider, IOxSessionContext},
frontend::sql::SqlQueryPlanner,
QueryChunk,
};
@ -115,7 +115,7 @@ async fn test_query_cancellation_slow_store() {
}
/// Wait up to 10s for correct task count.
async fn wait_for_tasks(ctx: &IOxExecutionContext, n: usize) {
async fn wait_for_tasks(ctx: &IOxSessionContext, n: usize) {
tokio::time::timeout(Duration::from_secs(10), async {
loop {
if dbg!(ctx.tasks()) == n {


@ -1,5 +1,5 @@
use datafusion::error::DataFusionError;
use query::exec::IOxExecutionContext;
use query::exec::IOxSessionContext;
use query::plan::seriesset::SeriesSetPlans;
/// Run a series set plan to completion and produce a Vec<String> representation
@ -9,7 +9,7 @@ use query::plan::seriesset::SeriesSetPlans;
/// Panics if there is an error executing a plan, or if unexpected series set
/// items are returned.
#[cfg(test)]
pub async fn run_series_set_plan(ctx: &IOxExecutionContext, plans: SeriesSetPlans) -> Vec<String> {
pub async fn run_series_set_plan(ctx: &IOxSessionContext, plans: SeriesSetPlans) -> Vec<String> {
run_series_set_plan_maybe_error(ctx, plans)
.await
.expect("running plans")
@ -18,7 +18,7 @@ pub async fn run_series_set_plan(ctx: &IOxExecutionContext, plans: SeriesSetPlan
/// Run a series set plan to completion and produce a Result<Vec<String>> representation
#[cfg(test)]
pub async fn run_series_set_plan_maybe_error(
ctx: &IOxExecutionContext,
ctx: &IOxSessionContext,
plans: SeriesSetPlans,
) -> Result<Vec<String>, DataFusionError> {
Ok(ctx