refactor: Use scan_and_filter in ReorgPlanner (#4822)

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
pull/24376/head
Andrew Lamb 2022-06-10 13:31:25 -04:00 committed by GitHub
parent 99f1f0a10c
commit 9fdbfb05e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 73 additions and 45 deletions

View File

@ -33,12 +33,7 @@ pub struct Config {
table: String,
/// The columns to request
#[clap(
long = "--columns",
default_value = "",
multiple_values = true,
use_value_delimiter = true
)]
#[clap(long = "--columns", multiple_values = true, use_value_delimiter = true)]
columns: Vec<String>,
/// Predicate in base64 protobuf encoded form.

View File

@ -5,6 +5,7 @@ use arrow::{error::ArrowError, record_batch::RecordBatch};
use arrow_util::util::merge_record_batches;
use datafusion::{
error::DataFusionError,
logical_plan::LogicalPlanBuilder,
physical_plan::{
common::SizedRecordBatchStream,
metrics::{ExecutionPlanMetricsSet, MemTrackingMetrics},
@ -26,11 +27,14 @@ use std::{collections::BTreeMap, sync::Arc};
#[derive(Debug, Snafu)]
#[allow(missing_copy_implementations, missing_docs)]
pub enum Error {
#[snafu(display("Error building logical plan for querying Ingester data to send to Querier"))]
LogicalPlan {
#[snafu(display("Error in ReorgPlanner reorg for querying Ingester data to send to Querier"))]
ReorgPlanner {
source: iox_query::frontend::reorg::Error,
},
#[snafu(display("Error building logical plan for querying Ingester data to send to Querier"))]
LogicalPlan { source: DataFusionError },
#[snafu(display(
"Error building physical plan for querying Ingester data to send to Querier: {}",
source
@ -277,7 +281,7 @@ async fn run_query(
/// Query a given Queryable Batch, applying selection and filters as appropriate
/// Return stream of record batches
pub async fn query(
pub(crate) async fn query(
executor: &Executor,
data: Arc<QueryableBatch>,
predicate: Predicate,
@ -286,19 +290,6 @@ pub async fn query(
// Build logical plan for filtering data
// Note that this query will also apply the delete predicates that go with the QueryableBatch
let indices = match selection {
Selection::All => None,
Selection::Some(columns) => {
let schema = data.schema();
Some(
columns
.iter()
.flat_map(|&column_name| schema.find_index_of(column_name))
.collect(),
)
}
};
let mut expr = vec![];
if let Some(filter_expr) = predicate.filter_expr() {
expr.push(filter_expr);
@ -308,8 +299,29 @@ pub async fn query(
// we may want to add more types into the ExecutorType to have better log and resource managment
let ctx = executor.new_context(ExecutorType::Query);
let logical_plan = ReorgPlanner::new()
.scan_single_chunk_plan_with_filter(data.schema(), data, indices, expr)
.context(LogicalPlanSnafu {})?;
.scan_single_chunk_plan_with_filter(data.schema(), data, &predicate)
.context(ReorgPlannerSnafu {})?;
// Now, restrict to all columns that are relevant
let logical_plan = match selection {
Selection::All => logical_plan,
Selection::Some(cols) => {
// filter out columns that are not in the schema
let schema = Arc::clone(logical_plan.schema());
let cols = cols.iter().filter_map(|col_name| {
schema
.index_of_column_by_name(None, col_name)
.ok()
.map(|_| datafusion::prelude::col(col_name))
});
LogicalPlanBuilder::from(logical_plan)
.project(cols)
.context(LogicalPlanSnafu)?
.build()
.context(LogicalPlanSnafu)?
}
};
// Build physical plan
let physical_plan = ctx

View File

@ -3,17 +3,19 @@
use std::sync::Arc;
use datafusion::logical_plan::{
col, lit_timestamp_nano, provider_as_source, Expr, LogicalPlan, LogicalPlanBuilder,
col, lit_timestamp_nano, provider_as_source, LogicalPlan, LogicalPlanBuilder,
};
use observability_deps::tracing::debug;
use observability_deps::tracing::{debug, trace};
use predicate::Predicate;
use schema::{sort::SortKey, Schema, TIME_COLUMN_NAME};
use crate::{
exec::make_stream_split,
exec::{make_stream_split, IOxSessionContext},
frontend::common::{scan_and_filter, TableScanAndFilter},
provider::{ChunkTableProvider, ProviderBuilder},
QueryChunk,
};
use snafu::{ResultExt, Snafu};
use snafu::{OptionExt, ResultExt, Snafu};
#[derive(Debug, Snafu)]
pub enum Error {
@ -25,6 +27,9 @@ pub enum Error {
source: datafusion::error::DataFusionError,
},
#[snafu(display("Reorg planner had no data for: {}", table_name))]
NoRows { table_name: String },
#[snafu(display(
"Reorg planner got error adding chunk for table {}: {}",
table_name,
@ -34,6 +39,16 @@ pub enum Error {
table_name: String,
source: crate::provider::Error,
},
#[snafu(display(
"Reorg planner got error adding creating scan for {}: {}",
table_name,
source
))]
CreatingScan {
table_name: String,
source: super::common::Error,
},
}
pub type Result<T, E = Error> = std::result::Result<T, E>;
@ -55,7 +70,7 @@ impl ReorgPlanner {
schema: Arc<Schema>,
chunk: Arc<dyn QueryChunk>,
) -> Result<LogicalPlan> {
self.scan_single_chunk_plan_with_filter(schema, chunk, None, vec![])
self.scan_single_chunk_plan_with_filter(schema, chunk, &Predicate::default())
}
/// Creates an execution plan for a scan and filter data of a single chunk
@ -63,28 +78,34 @@ impl ReorgPlanner {
&self,
schema: Arc<Schema>,
chunk: Arc<dyn QueryChunk>,
projection: Option<Vec<usize>>,
filters: Vec<Expr>,
predicate: &Predicate,
) -> Result<LogicalPlan> {
let table_name = chunk.table_name();
// Prepare the plan for the table
let mut builder = ProviderBuilder::new(table_name, schema);
let table_name = chunk.table_name().to_string();
debug!(%table_name, ?predicate, "Creating single chunk scan plan");
trace!(?schema, chunk_schema=?chunk.schema(), "Schemas");
// There are no predicates in these plans, so no need to prune them
builder = builder.add_no_op_pruner();
builder = builder.add_chunk(Arc::clone(&chunk));
let ctx = IOxSessionContext::default();
let provider = builder
.build()
.context(CreatingProviderSnafu { table_name })?;
let scan_and_filter = scan_and_filter(
ctx.child_ctx("scan_and_filter planning"),
&table_name,
schema,
predicate,
vec![chunk],
)
.context(CreatingScanSnafu {
table_name: &table_name,
})?;
let source = provider_as_source(Arc::new(provider));
let TableScanAndFilter {
plan_builder,
schema: _,
} = scan_and_filter.context(NoRowsSnafu {
table_name: &table_name,
})?;
// Logical plan to scan given columns and apply predicates
let plan = LogicalPlanBuilder::scan_with_filters(table_name, source, projection, filters)
.context(BuildingPlanSnafu)?
.build()
.context(BuildingPlanSnafu)?;
let plan = plan_builder.build().context(BuildingPlanSnafu)?;
debug!(%table_name, plan=%plan.display_indent_schema(),
"created single chunk scan plan");