refactor: consolidate pruning code (#7815)
Let's have a single chunk pruning implementation in our code, not two. Also removes a bit of cruft from `QueryChunk`, since it is technically no longer responsible for pruning (this part has been pushed into the querier for early pruning, with bits in `iox_query_influxrpc` for some RPC shenanigans).
parent bf847584d4
commit db9fe92981
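For orientation before the diff: the single remaining pruner, `prune_chunks` in `iox_query::pruning`, returns a `Result` holding a boolean keep-mask with one entry per chunk, and callers zip that mask against their chunk list (visible below in the `let Ok(mask) = prune_chunks(...)` wrapper and the `.map(|res| res[0])` call). A minimal self-contained sketch of that mask-and-zip idiom — the `Chunk` struct and `prune` function here are illustrative stand-ins, not the real iox_query API:

// Illustrative stand-in for Arc<dyn QueryChunk>.
struct Chunk {
    id: u32,
    min_time: i64,
    max_time: i64,
}

// Illustrative stand-in for iox_query::pruning::prune_chunks: one bool per
// chunk, where `true` means "may match the predicate, keep it".
fn prune(chunks: &[Chunk], range: (i64, i64)) -> Result<Vec<bool>, String> {
    Ok(chunks
        .iter()
        .map(|c| c.max_time >= range.0 && c.min_time <= range.1)
        .collect())
}

fn main() {
    let chunks = vec![
        Chunk { id: 0, min_time: 0, max_time: 100 },
        Chunk { id: 1, min_time: 500, max_time: 600 },
    ];
    let mask = prune(&chunks, (50, 150)).unwrap();
    // Zip the keep-mask against the chunks, as the consolidated callers do.
    let kept: Vec<_> = chunks
        .into_iter()
        .zip(mask)
        .filter(|(_, keep)| *keep)
        .map(|(c, _)| c)
        .collect();
    assert_eq!(kept.len(), 1);
    assert_eq!(kept[0].id, 0);
}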
@@ -23,7 +23,7 @@ use hashbrown::HashMap;
 use observability_deps::tracing::{debug, trace};
 use once_cell::sync::Lazy;
 use parquet_file::storage::ParquetExecInput;
-use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate, PredicateMatch};
+use predicate::{rpc_predicate::QueryNamespaceMeta, Predicate};
 use schema::{
     sort::{SortKey, SortKeyBuilder},
     Projection, Schema, TIME_COLUMN_NAME,
@@ -236,25 +236,6 @@ pub trait QueryChunk: QueryChunkMeta + Debug + Send + Sync + 'static {
     /// key" within itself
     fn may_contain_pk_duplicates(&self) -> bool;
 
-    /// Returns the result of applying the `predicate` to the chunk
-    /// using an efficient, but inexact method, based on metadata.
-    ///
-    /// NOTE: This method is suitable for calling during planning, and
-    /// may return PredicateMatch::Unknown for certain types of
-    /// predicates.
-    fn apply_predicate_to_metadata(
-        &self,
-        ctx: &IOxSessionContext,
-        predicate: &Predicate,
-    ) -> Result<PredicateMatch, DataFusionError> {
-        let state = ctx.inner().state();
-        Ok(predicate.apply_to_table_summary(
-            state.execution_props(),
-            &self.summary(),
-            self.schema().as_arrow(),
-        ))
-    }
-
     /// Returns a set of Strings with column names from the specified
     /// table that have at least one row that matches `predicate`, if
     /// the predicate can be evaluated entirely on the metadata of
@@ -7,8 +7,9 @@ use crate::{
         stringset::{StringSet, StringSetRef},
         ExecutionContextProvider, Executor, ExecutorType, IOxSessionContext,
     },
-    Predicate, PredicateMatch, QueryChunk, QueryChunkData, QueryChunkMeta, QueryCompletedToken,
-    QueryNamespace, QueryText,
+    pruning::prune_chunks,
+    Predicate, QueryChunk, QueryChunkData, QueryChunkMeta, QueryCompletedToken, QueryNamespace,
+    QueryText,
 };
 use arrow::array::{BooleanArray, Float64Array};
 use arrow::datatypes::SchemaRef;
@@ -116,7 +117,7 @@ impl QueryNamespace for TestDatabase {
         table_name: &str,
         predicate: &Predicate,
         _projection: Option<&Vec<usize>>,
-        ctx: IOxSessionContext,
+        _ctx: IOxSessionContext,
     ) -> Result<Vec<Arc<dyn QueryChunk>>, DataFusionError> {
         // save last predicate
         *self.chunks_predicate.lock() = predicate.clone();
@@ -129,14 +130,14 @@ impl QueryNamespace for TestDatabase {
             .filter(|c| c.table_name == table_name)
             // only keep chunks if their statistics overlap
             .filter(|c| {
-                !matches!(
-                    predicate.apply_to_table_summary(
-                        ctx.inner().state().execution_props(),
-                        &c.table_summary,
-                        c.schema.as_arrow()
-                    ),
-                    PredicateMatch::Zero
-                )
+                prune_chunks(
+                    c.schema(),
+                    &[Arc::clone(*c) as Arc<dyn QueryChunk>],
+                    predicate,
+                )
+                .ok()
+                .map(|res| res[0])
+                .unwrap_or(true)
             })
             .map(|x| Arc::clone(x) as Arc<dyn QueryChunk>)
             .collect::<Vec<_>>())
@@ -322,18 +323,12 @@ pub struct TestChunk {
     /// Set the flag if this chunk might contain duplicates
    may_contain_pk_duplicates: bool,
 
-    /// A copy of the captured predicates passed
-    predicates: Mutex<Vec<Predicate>>,
-
     /// Data in this chunk.
     table_data: QueryChunkData,
 
     /// A saved error that is returned instead of actual results
     saved_error: Option<String>,
 
-    /// Return value for apply_predicate, if desired
-    predicate_match: Option<PredicateMatch>,
-
     /// Copy of delete predicates passed
     delete_predicates: Vec<Arc<DeletePredicate>>,
 
@@ -417,10 +412,8 @@ impl TestChunk {
             table_summary: TableSummary::default(),
             id: ChunkId::new_test(0),
             may_contain_pk_duplicates: Default::default(),
-            predicates: Default::default(),
             table_data: QueryChunkData::RecordBatches(vec![]),
             saved_error: Default::default(),
-            predicate_match: Default::default(),
             delete_predicates: Default::default(),
             order: ChunkOrder::MIN,
             sort_key: None,
@@ -507,12 +500,6 @@ impl TestChunk {
         self
     }
 
-    /// specify that any call to apply_predicate should return this value
-    pub fn with_predicate_match(mut self, predicate_match: PredicateMatch) -> Self {
-        self.predicate_match = Some(predicate_match);
-        self
-    }
-
     /// Checks the saved error, and returns it if any, otherwise returns OK
     fn check_error(&self) -> Result<(), DataFusionError> {
         if let Some(message) = self.saved_error.as_ref() {
@@ -765,11 +752,6 @@ impl TestChunk {
         self
     }
 
-    /// Get a copy of any predicate passed to the function
-    pub fn predicates(&self) -> Vec<Predicate> {
-        self.predicates.lock().clone()
-    }
-
     /// Prepares this chunk to return a specific record batch with one
     /// row of non null data.
     /// tag: MA
@@ -1167,6 +1149,8 @@ impl QueryChunk for TestChunk {
     }
 
     fn data(&self) -> QueryChunkData {
+        self.check_error().unwrap();
+
         self.table_data.clone()
     }
 
@@ -1174,30 +1158,14 @@ impl QueryChunk for TestChunk {
         "Test Chunk"
     }
 
-    fn apply_predicate_to_metadata(
-        &self,
-        _ctx: &IOxSessionContext,
-        predicate: &Predicate,
-    ) -> Result<PredicateMatch, DataFusionError> {
-        self.check_error()?;
-
-        // save the predicate
-        self.predicates.lock().push(predicate.clone());
-
-        // check if there is a saved result to return
-        if let Some(&predicate_match) = self.predicate_match.as_ref() {
-            return Ok(predicate_match);
-        }
-
-        Ok(PredicateMatch::Unknown)
-    }
-
     fn column_values(
         &self,
         _ctx: IOxSessionContext,
         _column_name: &str,
         _predicate: &Predicate,
     ) -> Result<Option<StringSet>, DataFusionError> {
         self.check_error()?;
 
         // Model not being able to get column values from metadata
         Ok(None)
     }
@@ -1205,14 +1173,11 @@ impl QueryChunk for TestChunk {
     fn column_names(
         &self,
         _ctx: IOxSessionContext,
-        predicate: &Predicate,
+        _predicate: &Predicate,
         selection: Projection<'_>,
     ) -> Result<Option<StringSet>, DataFusionError> {
         self.check_error()?;
 
-        // save the predicate
-        self.predicates.lock().push(predicate.clone());
-
         // only return columns specified in selection
         let column_names = match selection {
             Projection::All => self.all_column_names(),
@@ -1233,6 +1198,8 @@
 
 impl QueryChunkMeta for TestChunk {
     fn summary(&self) -> Arc<TableSummary> {
+        self.check_error().unwrap();
+
         Arc::new(self.table_summary.clone())
     }
 
@@ -27,13 +27,13 @@ use iox_query::{
     },
     QueryChunk, QueryNamespace,
 };
-use observability_deps::tracing::{debug, trace, warn};
+use observability_deps::tracing::{debug, warn};
 use predicate::{
     rpc_predicate::{
         InfluxRpcPredicate, FIELD_COLUMN_NAME, GROUP_KEY_SPECIAL_START, GROUP_KEY_SPECIAL_STOP,
         MEASUREMENT_COLUMN_NAME,
     },
-    Predicate, PredicateMatch,
+    Predicate,
 };
 use query_functions::{
     group_by::{Aggregate, WindowDuration},
@@ -246,55 +246,19 @@ impl InfluxRpcPlanner {
         // Special case predicates that span the entire valid timestamp range
         let rpc_predicate = rpc_predicate.clear_timestamp_if_max_range();
 
-        let metadata_ctx = ctx.child_ctx("apply_predicate_to_metadata");
-        let metadata_ctx = &metadata_ctx; // needed to use inside the move closure
-
         let table_predicates = rpc_predicate
             .table_predicates(namespace.as_meta())
             .context(CreatingPredicatesSnafu)?;
         let tables: Vec<_> =
             table_chunk_stream(Arc::clone(&namespace), false, &table_predicates, &ctx)
-                .try_filter_map(|(table_name, predicate, chunks)| async move {
-                    // Identify which chunks can answer from its metadata and then record its table,
-                    // and which chunks needs full plan and group them into their table
-                    let mut chunks_full = vec![];
+                .try_filter_map(
+                    |(table_name, table_schema, table_predicate, chunks)| async move {
+                        let chunks_full = prune_chunks(&table_schema, chunks, table_predicate);
 
-                    for chunk in cheap_chunk_first(chunks) {
-                        trace!(chunk_id=%chunk.id(), %table_name, "Considering table");
-
-                        // If the chunk has delete predicates, we need to scan (do full plan) the data to eliminate
-                        // deleted data before we can determine if its table participates in the requested predicate.
-                        if chunk.has_delete_predicates() {
-                            chunks_full.push(chunk);
-                        } else {
-                            // Try and apply the predicate using only metadata
-                            let pred_result = chunk
-                                .apply_predicate_to_metadata(metadata_ctx, predicate)
-                                .context(CheckingChunkPredicateSnafu {
-                                    chunk_id: chunk.id(),
-                                })?;
-
-                            match pred_result {
-                                PredicateMatch::AtLeastOneNonNullField => {
-                                    trace!("Metadata predicate: table matches");
-                                    // Meta data of the table covers predicates of the request
-                                    return Ok(Some((table_name, None)));
-                                }
-                                PredicateMatch::Unknown => {
-                                    trace!("Metadata predicate: unknown match");
-                                    // We cannot match the predicate to get answer from meta data, let do full plan
-                                    chunks_full.push(chunk);
-                                }
-                                PredicateMatch::Zero => {
-                                    trace!("Metadata predicate: zero rows match");
-                                } // this chunk's table does not participate in the request
-                            }
-                        }
-                    }
-
-                    Ok((!chunks_full.is_empty())
-                        .then_some((table_name, Some((predicate, chunks_full)))))
-                })
+                        Ok((!chunks_full.is_empty())
+                            .then_some((table_name, Some((table_predicate, chunks_full)))))
+                    },
+                )
                 .try_collect()
                 .await?;
 
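The stream shape above leans on `futures::TryStreamExt::try_filter_map`: after pruning, a table is dropped from the stream entirely when no chunks survive, via `(!chunks_full.is_empty()).then_some(...)`. A rough self-contained sketch of that drop-empty-tables pattern — the table data and error type here are made up for illustration:

use futures::{executor::block_on, stream, TryStreamExt};

fn main() {
    // (table name, chunk ids that survived pruning) -- illustrative data only.
    let tables = vec![
        Ok::<_, String>(("h2o", vec![1, 2])),
        Ok(("o2", vec![])), // fully pruned away
    ];

    let kept: Vec<(&str, Vec<i32>)> = block_on(
        stream::iter(tables)
            // Drop a table entirely when no chunks survive pruning.
            .try_filter_map(|(name, chunks)| async move {
                Ok::<_, String>((!chunks.is_empty()).then_some((name, chunks)))
            })
            .try_collect(),
    )
    .unwrap();

    assert_eq!(kept, vec![("h2o", vec![1, 2])]);
}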
@@ -375,7 +339,7 @@ impl InfluxRpcPlanner {
             &table_predicates_need_chunks,
             &ctx,
         )
-        .and_then(|(table_name, predicate, chunks)| {
+        .and_then(|(table_name, table_schema, predicate, chunks)| {
             let mut ctx = ctx.child_ctx("table");
             ctx.set_metadata("table", table_name.to_string());
 
@@ -383,18 +347,8 @@ impl InfluxRpcPlanner {
             let mut chunks_full = vec![];
             let mut known_columns = BTreeSet::new();
 
+            let chunks = prune_chunks(&table_schema, chunks, predicate);
             for chunk in cheap_chunk_first(chunks) {
-                // Try and apply the predicate using only metadata
-                let pred_result = chunk.apply_predicate_to_metadata(&ctx, predicate).context(
-                    CheckingChunkPredicateSnafu {
-                        chunk_id: chunk.id(),
-                    },
-                )?;
-
-                if matches!(pred_result, PredicateMatch::Zero) {
-                    continue;
-                }
-
                 // get only tag columns from metadata
                 let schema = chunk.schema();
 
@@ -522,31 +476,18 @@ impl InfluxRpcPlanner {
             table_predicates_filtered.push((table_name, predicate));
         }
 
-        let metadata_ctx = ctx.child_ctx("apply_predicate_to_metadata");
-        let metadata_ctx = &metadata_ctx; // needed to use inside the move closure
-
         let tables: Vec<_> = table_chunk_stream(
             Arc::clone(&namespace),
             false,
             &table_predicates_filtered,
             &ctx,
         )
-        .and_then(|(table_name, predicate, chunks)| async move {
+        .and_then(|(table_name, table_schema, predicate, chunks)| async move {
             let mut chunks_full = vec![];
             let mut known_values = BTreeSet::new();
 
+            let chunks = prune_chunks(&table_schema, chunks, predicate);
             for chunk in cheap_chunk_first(chunks) {
-                // Try and apply the predicate using only metadata
-                let pred_result = chunk
-                    .apply_predicate_to_metadata(metadata_ctx, predicate)
-                    .context(CheckingChunkPredicateSnafu {
-                        chunk_id: chunk.id(),
-                    })?;
-
-                if matches!(pred_result, PredicateMatch::Zero) {
-                    continue;
-                }
-
                 // use schema to validate column type
                 let schema = chunk.schema();
 
@@ -1363,23 +1304,37 @@ fn table_chunk_stream<'a>(
     need_fields: bool,
     table_predicates: &'a [(Arc<str>, Predicate)],
     ctx: &'a IOxSessionContext,
-) -> impl Stream<Item = Result<(&'a Arc<str>, &'a Predicate, Vec<Arc<dyn QueryChunk>>)>> + 'a {
+) -> impl Stream<
+    Item = Result<(
+        &'a Arc<str>,
+        Arc<Schema>,
+        &'a Predicate,
+        Vec<Arc<dyn QueryChunk>>,
+    )>,
+> + 'a {
+    let namespace2 = Arc::clone(&namespace);
     futures::stream::iter(table_predicates)
-        .map(move |(table_name, predicate)| {
+        .filter_map(move |(table_name, predicate)| {
+            let namespace = Arc::clone(&namespace);
+
+            async move {
+                let Some(table_schema) = namespace.table_schema(table_name) else {
+                    return None;
+                };
+                let table_schema = Arc::new(table_schema);
+                Some((table_name, table_schema, predicate))
+            }
+        })
+        .map(move |(table_name, table_schema, predicate)| {
             let mut ctx = ctx.child_ctx("table");
             ctx.set_metadata("table", table_name.to_string());
 
-            let namespace = Arc::clone(&namespace);
-
-            let table_schema = namespace.table_schema(table_name);
-            let projection = match table_schema {
-                Some(table_schema) => {
-                    columns_in_predicates(need_fields, &table_schema, table_name, predicate)
-                }
-                None => None,
-            };
+            let namespace = Arc::clone(&namespace2);
 
             async move {
+                let projection =
+                    columns_in_predicates(need_fields, &table_schema, table_name, predicate);
+
                 let chunks = namespace
                     .chunks(
                         table_name,
@@ -1392,7 +1347,7 @@ fn table_chunk_stream<'a>(
                         table_name: table_name.as_ref(),
                     })?;
 
-                Ok((table_name, predicate, chunks))
+                Ok((table_name, Arc::clone(&table_schema), predicate, chunks))
             }
         })
         .buffered(CONCURRENT_TABLE_JOBS)
@@ -1497,12 +1452,9 @@ where
         + Sync,
     P: Send,
 {
-    let metadata_ctx = ctx.child_ctx("apply_predicate_to_metadata");
-    let metadata_ctx = &metadata_ctx; // needed to use inside the move closure
-
     table_chunk_stream(Arc::clone(&namespace), true, table_predicates, &ctx)
-        .and_then(|(table_name, predicate, chunks)| async move {
-            let chunks = prune_chunks_metadata(metadata_ctx, chunks, predicate)?;
+        .and_then(|(table_name, table_schema, predicate, chunks)| async move {
+            let chunks = prune_chunks(&table_schema, chunks, predicate);
             Ok((table_name, predicate, chunks))
         })
        // rustc seems to heavily confused about the filter step here, esp. it dislikes `.try_filter` and even
@@ -1532,33 +1484,6 @@ where
         .await
 }
 
-/// Prunes the provided list of chunks using [`QueryChunk::apply_predicate_to_metadata`]
-///
-/// TODO: Should this logic live with the rest of the chunk pruning logic?
-fn prune_chunks_metadata(
-    ctx: &IOxSessionContext,
-    chunks: Vec<Arc<dyn QueryChunk>>,
-    predicate: &Predicate,
-) -> Result<Vec<Arc<dyn QueryChunk>>> {
-    let mut filtered = Vec::with_capacity(chunks.len());
-    for chunk in chunks {
-        // Try and apply the predicate using only metadata
-        let pred_result = chunk.apply_predicate_to_metadata(ctx, predicate).context(
-            CheckingChunkPredicateSnafu {
-                chunk_id: chunk.id(),
-            },
-        )?;
-
-        trace!(?pred_result, chunk_id=?chunk.id(), "applied predicate to metadata");
-
-        if !matches!(pred_result, PredicateMatch::Zero) {
-            filtered.push(chunk)
-        }
-    }
-
-    Ok(filtered)
-}
-
 /// Return a `Vec` of `Exprs` such that it starts with `prefix` cols and
 /// then has all columns in `schema` that are not already in the prefix.
 fn project_exprs_in_schema(prefix: &[&str], schema: &DFSchemaRef) -> Vec<Expr> {
|
|||
chunks
|
||||
}
|
||||
|
||||
fn prune_chunks(
|
||||
table_schema: &Schema,
|
||||
chunks: Vec<Arc<dyn QueryChunk>>,
|
||||
predicate: &Predicate,
|
||||
) -> Vec<Arc<dyn QueryChunk>> {
|
||||
use iox_query::pruning::prune_chunks;
|
||||
|
||||
let Ok(mask) = prune_chunks(table_schema, &chunks, predicate) else {
|
||||
return chunks;
|
||||
};
|
||||
|
||||
chunks
|
||||
.into_iter()
|
||||
.zip(mask)
|
||||
.filter(|(c, m)| *m && !c.has_delete_predicates())
|
||||
.map(|(c, _m)| c)
|
||||
.collect()
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use datafusion::{
|
||||
|
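Worth noting about the wrapper just above: the `let Ok(mask) = ... else { return chunks; }` arm makes pruning fail open. Since pruning is only an optimization, an error must degrade to keeping every chunk; dropping chunks on error would silently lose rows. A tiny self-contained illustration of that fail-open shape (the names here are made up):

fn filter_or_keep_all(items: Vec<i32>, mask: Result<Vec<bool>, ()>) -> Vec<i32> {
    // If the mask cannot be computed, fall back to returning the full input:
    // a false "keep" only costs work later; a false "drop" loses data.
    let Ok(mask) = mask else {
        return items;
    };
    items
        .into_iter()
        .zip(mask)
        .filter(|(_, m)| *m)
        .map(|(i, _)| i)
        .collect()
}

fn main() {
    assert_eq!(filter_or_keep_all(vec![1, 2, 3], Err(())), vec![1, 2, 3]);
    assert_eq!(
        filter_or_keep_all(vec![1, 2, 3], Ok(vec![true, false, true])),
        vec![1, 3]
    );
}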
@@ -2028,10 +1972,10 @@ mod tests {
         assert!(!result.is_empty());
         assert_eq!(result.len(), 1);
         assert_eq!(result[0].0.as_ref(), "h2o"); // table name
-        assert_eq!(result[0].2.len(), 1); // returned chunks
+        assert_eq!(result[0].3.len(), 1); // returned chunks
 
         // chunk schema includes all 5 columns of the table because we asked it return all fileds (and implicit PK) even though the predicate is on `foo` only
-        let chunk = &result[0].2[0];
+        let chunk = &result[0].3[0];
         let chunk_schema = (*chunk.schema()).clone();
         assert_eq!(chunk_schema.len(), 5);
         let chunk_schema = chunk_schema.sort_fields_by_name();
@@ -2056,10 +2000,10 @@ mod tests {
         assert!(!result.is_empty());
         assert_eq!(result.len(), 1);
         assert_eq!(result[0].0.as_ref(), "h2o"); // table name
-        assert_eq!(result[0].2.len(), 1); // returned chunks
+        assert_eq!(result[0].3.len(), 1); // returned chunks
 
         // chunk schema includes still includes everything (the test table implementation does NOT project chunks)
-        let chunk = &result[0].2[0];
+        let chunk = &result[0].3[0];
         let chunk_schema = (*chunk.schema()).clone();
         assert_eq!(chunk_schema.len(), 5);
         let chunk_schema = chunk_schema.sort_fields_by_name();
@@ -2102,10 +2046,10 @@ mod tests {
         assert!(!result.is_empty());
         assert_eq!(result.len(), 1);
         assert_eq!(result[0].0.as_ref(), "h2o"); // table name
-        assert_eq!(result[0].2.len(), 1); // returned chunks
+        assert_eq!(result[0].3.len(), 1); // returned chunks
 
         // chunk schema includes all 5 columns of the table because the preidcate is empty
-        let chunk = &result[0].2[0];
+        let chunk = &result[0].3[0];
         let chunk_schema = (*chunk.schema()).clone();
         assert_eq!(chunk_schema.len(), 5);
         let chunk_schema = chunk_schema.sort_fields_by_name();
@@ -2129,10 +2073,10 @@ mod tests {
         assert!(!result.is_empty());
         assert_eq!(result.len(), 1);
         assert_eq!(result[0].0.as_ref(), "h2o"); // table name
-        assert_eq!(result[0].2.len(), 1); // returned chunks
+        assert_eq!(result[0].3.len(), 1); // returned chunks
 
         // chunk schema includes all 5 columns of the table because the preidcate is empty
-        let chunk = &result[0].2[0];
+        let chunk = &result[0].3[0];
         let chunk_schema = (*chunk.schema()).clone();
         assert_eq!(chunk_schema.len(), 5);
     }
@@ -2168,10 +2112,10 @@ mod tests {
         assert!(!result.is_empty());
         assert_eq!(result.len(), 1);
         assert_eq!(result[0].0.as_ref(), "h2o"); // table name
-        assert_eq!(result[0].2.len(), 1); // returned chunks
+        assert_eq!(result[0].3.len(), 1); // returned chunks
 
         // Since no data, we do not do pushdown in the test chunk.
-        let chunk = &result[0].2[0];
+        let chunk = &result[0].3[0];
         let chunk_schema = (*chunk.schema()).clone();
         assert_eq!(chunk_schema.len(), 5);
         let chunk_schema = chunk_schema.sort_fields_by_name();
@@ -2217,10 +2161,10 @@ mod tests {
         assert!(!result.is_empty());
         assert_eq!(result.len(), 1);
         assert_eq!(result[0].0.as_ref(), "h2o"); // table name
-        assert_eq!(result[0].2.len(), 1); // returned chunks
+        assert_eq!(result[0].3.len(), 1); // returned chunks
 
         // chunk schema includes everything (test table does NOT perform any projection)
-        let chunk = &result[0].2[0];
+        let chunk = &result[0].3[0];
         let chunk_schema = (*chunk.schema()).clone();
         assert_eq!(chunk_schema.len(), 5);
         let chunk_schema = chunk_schema.sort_fields_by_name();
@@ -2250,10 +2194,10 @@ mod tests {
         assert!(!result.is_empty());
         assert_eq!(result.len(), 1);
         assert_eq!(result[0].0.as_ref(), "h2o"); // table name
-        assert_eq!(result[0].2.len(), 1); // returned chunks
+        assert_eq!(result[0].3.len(), 1); // returned chunks
 
         // chunk schema includes everything (test table does NOT perform any projection)
-        let chunk = &result[0].2[0];
+        let chunk = &result[0].3[0];
         let chunk_schema = (*chunk.schema()).clone();
         assert_eq!(chunk_schema.len(), 5);
         let chunk_schema = chunk_schema.sort_fields_by_name();
@@ -2296,10 +2240,10 @@ mod tests {
         assert!(!result.is_empty());
         assert_eq!(result.len(), 1);
         assert_eq!(result[0].0.as_ref(), "h2o"); // table name
-        assert_eq!(result[0].2.len(), 1); // returned chunks
+        assert_eq!(result[0].3.len(), 1); // returned chunks
 
-        // chunk schema includes all 5 columns since we hit the unknown columnd
-        let chunk = &result[0].2[0];
+        // chunk schema includes all 5 columns since we hit the unknown column
+        let chunk = &result[0].3[0];
         let chunk_schema = (*chunk.schema()).clone();
         assert_eq!(chunk_schema.len(), 5);
         let chunk_schema = chunk_schema.sort_fields_by_name();
@@ -15,11 +15,8 @@ pub mod delete_expr;
 pub mod delete_predicate;
 pub mod rpc_predicate;
 
-use arrow::{
-    array::{
-        BooleanArray, Float64Array, Int64Array, StringArray, TimestampNanosecondArray, UInt64Array,
-    },
-    datatypes::SchemaRef,
-};
+use arrow::array::{
+    BooleanArray, Float64Array, Int64Array, StringArray, TimestampNanosecondArray, UInt64Array,
+};
 use data_types::{InfluxDbType, TableSummary, TimestampRange};
 use datafusion::{
@@ -27,11 +24,10 @@ use datafusion::{
     error::DataFusionError,
     logical_expr::{binary_expr, utils::expr_to_columns, BinaryExpr},
     optimizer::utils::split_conjunction,
-    physical_expr::execution_props::ExecutionProps,
-    physical_optimizer::pruning::PruningStatistics,
     prelude::{col, lit_timestamp_nano, Expr},
 };
-use datafusion_util::{create_pruning_predicate, make_range_expr, nullable_schema, AsExpr};
+use datafusion_util::{make_range_expr, AsExpr};
 use observability_deps::tracing::debug;
 use rpc_predicate::VALUE_COLUMN_NAME;
 use schema::TIME_COLUMN_NAME;
@@ -243,61 +239,6 @@ impl Predicate {
 
         self
     }
 
-    /// Apply predicate to given table summary and avoid having to
-    /// look at actual data.
-    pub fn apply_to_table_summary(
-        &self,
-        props: &ExecutionProps,
-        table_summary: &TableSummary,
-        schema: SchemaRef,
-    ) -> PredicateMatch {
-        let summary = SummaryWrapper {
-            summary: table_summary,
-        };
-
-        // If we don't have statistics for a particular column, its
-        // value will be null, so we need to ensure the schema we used
-        // in pruning predicates allows for null.
-        let schema = nullable_schema(schema);
-
-        if let Some(expr) = self.filter_expr() {
-            match create_pruning_predicate(props, &expr, &schema) {
-                Ok(pp) => {
-                    match pp.prune(&summary) {
-                        Ok(matched) => {
-                            assert_eq!(matched.len(), 1);
-                            if matched[0] {
-                                // might match
-                                return PredicateMatch::Unknown;
-                            } else {
-                                // does not match => since expressions are `AND`ed, we know that we will have zero matches
-                                return PredicateMatch::Zero;
-                            }
-                        }
-                        Err(e) => {
-                            debug!(
-                                %e,
-                                %expr,
-                                "cannot prune summary with PruningPredicate",
-                            );
-                            return PredicateMatch::Unknown;
-                        }
-                    }
-                }
-                Err(e) => {
-                    debug!(
-                        %e,
-                        %expr,
-                        "cannot create PruningPredicate from expression",
-                    );
-                    return PredicateMatch::Unknown;
-                }
-            }
-        }
-
-        PredicateMatch::Unknown
-    }
 }
 
 struct SummaryWrapper<'a> {
@@ -400,21 +341,6 @@ impl fmt::Display for Predicate {
     }
 }
 
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-/// The result of evaluating a predicate on a set of rows
-pub enum PredicateMatch {
-    /// There is at least one row that matches the predicate that has
-    /// at least one non null value in each field of the predicate
-    AtLeastOneNonNullField,
-
-    /// There are exactly zero rows that match the predicate
-    Zero,
-
-    /// There *may* be rows that match, OR there *may* be no rows that
-    /// match
-    Unknown,
-}
-
 impl Predicate {
     /// Sets the timestamp range
     pub fn with_range(mut self, start: i64, end: i64) -> Self {
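With the enum gone, it may help to note how the old tri-state folds into the new boolean keep-mask: the removed callers only ever skipped a chunk on a definite `Zero` verdict, so both `AtLeastOneNonNullField` and `Unknown` map to `true` (keep). A small illustrative sketch of that mapping, not code from the PR:

// The removed tri-state, reproduced here for illustration only.
enum PredicateMatch {
    AtLeastOneNonNullField, // definitely has matching rows
    Zero,                   // definitely has no matching rows
    Unknown,                // metadata alone cannot tell
}

// Fold the tri-state into the boolean keep-mask entry used by prune_chunks.
fn keep(m: &PredicateMatch) -> bool {
    // Only a definite "zero rows" verdict may drop a chunk; both "matches"
    // and "unknown" must be kept and resolved by scanning the data.
    !matches!(m, PredicateMatch::Zero)
}

fn main() {
    assert!(keep(&PredicateMatch::AtLeastOneNonNullField));
    assert!(keep(&PredicateMatch::Unknown));
    assert!(!keep(&PredicateMatch::Zero));
}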
@@ -674,10 +600,9 @@ impl TreeNodeVisitor for RowBasedVisitor {
 mod tests {
     use super::*;
     use arrow::datatypes::DataType as ArrowDataType;
-    use data_types::{ColumnSummary, InfluxDbType, StatValues, MAX_NANO_TIME, MIN_NANO_TIME};
+    use data_types::{MAX_NANO_TIME, MIN_NANO_TIME};
     use datafusion::prelude::{col, cube, lit};
     use schema::builder::SchemaBuilder;
     use test_helpers::maybe_start_logging;
 
     #[test]
     fn test_default_predicate_is_empty() {
@@ -787,166 +712,6 @@ mod tests {
         assert_eq!(p.with_clear_timestamp_if_max_range(), expected);
     }
 
-    #[test]
-    fn test_apply_to_table_summary() {
-        maybe_start_logging();
-        let props = ExecutionProps::new();
-
-        let p = Predicate::new()
-            .with_range(100, 200)
-            .with_expr(col("foo").eq(lit(42i64)))
-            .with_expr(col("bar").eq(lit(42i64)))
-            .with_expr(col(TIME_COLUMN_NAME).gt(lit_timestamp_nano(120i64)));
-
-        let schema = SchemaBuilder::new()
-            .field("foo", ArrowDataType::Int64)
-            .unwrap()
-            .field("bar", ArrowDataType::Int64)
-            .unwrap()
-            .timestamp()
-            .build()
-            .unwrap();
-
-        let summary = TableSummary {
-            columns: vec![ColumnSummary {
-                name: "foo".to_owned(),
-                influxdb_type: InfluxDbType::Field,
-                stats: data_types::Statistics::I64(StatValues {
-                    min: Some(10),
-                    max: Some(20),
-                    null_count: Some(0),
-                    total_count: 1_000,
-                    distinct_count: None,
-                }),
-            }],
-        };
-        assert_eq!(
-            p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
-            PredicateMatch::Zero,
-        );
-
-        let summary = TableSummary {
-            columns: vec![ColumnSummary {
-                name: "foo".to_owned(),
-                influxdb_type: InfluxDbType::Field,
-                stats: data_types::Statistics::I64(StatValues {
-                    min: Some(10),
-                    max: Some(50),
-                    null_count: Some(0),
-                    total_count: 1_000,
-                    distinct_count: None,
-                }),
-            }],
-        };
-        assert_eq!(
-            p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
-            PredicateMatch::Unknown,
-        );
-
-        let summary = TableSummary {
-            columns: vec![ColumnSummary {
-                name: TIME_COLUMN_NAME.to_owned(),
-                influxdb_type: InfluxDbType::Timestamp,
-                stats: data_types::Statistics::I64(StatValues {
-                    min: Some(115),
-                    max: Some(115),
-                    null_count: Some(0),
-                    total_count: 1_000,
-                    distinct_count: None,
-                }),
-            }],
-        };
-        assert_eq!(
-            p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
-            PredicateMatch::Zero,
-        );
-
-        let summary = TableSummary {
-            columns: vec![ColumnSummary {
-                name: TIME_COLUMN_NAME.to_owned(),
-                influxdb_type: InfluxDbType::Timestamp,
-                stats: data_types::Statistics::I64(StatValues {
-                    min: Some(300),
-                    max: Some(300),
-                    null_count: Some(0),
-                    total_count: 1_000,
-                    distinct_count: None,
-                }),
-            }],
-        };
-        assert_eq!(
-            p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
-            PredicateMatch::Zero,
-        );
-
-        let summary = TableSummary {
-            columns: vec![ColumnSummary {
-                name: TIME_COLUMN_NAME.to_owned(),
-                influxdb_type: InfluxDbType::Timestamp,
-                stats: data_types::Statistics::I64(StatValues {
-                    min: Some(150),
-                    max: Some(300),
-                    null_count: Some(0),
-                    total_count: 1_000,
-                    distinct_count: None,
-                }),
-            }],
-        };
-        assert_eq!(
-            p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
-            PredicateMatch::Unknown,
-        )
-    }
-
-    /// Test that pruning works even when some expressions within the predicate cannot be evaluated by DataFusion
-    #[test]
-    fn test_apply_to_table_summary_partially_unsupported() {
-        maybe_start_logging();
-        let props = ExecutionProps::new();
-
-        let p = Predicate::new()
-            .with_range(100, 200)
-            .with_expr(col("foo").eq(lit(42i64)).not()); // NOT expressions are currently mostly unsupported by DataFusion
-
-        let schema = SchemaBuilder::new()
-            .field("foo", ArrowDataType::Int64)
-            .unwrap()
-            .timestamp()
-            .build()
-            .unwrap();
-
-        let summary = TableSummary {
-            columns: vec![
-                ColumnSummary {
-                    name: TIME_COLUMN_NAME.to_owned(),
-                    influxdb_type: InfluxDbType::Timestamp,
-                    stats: data_types::Statistics::I64(StatValues {
-                        min: Some(10),
-                        max: Some(20),
-                        null_count: Some(0),
-                        total_count: 1_000,
-                        distinct_count: None,
-                    }),
-                },
-                ColumnSummary {
-                    name: "foo".to_owned(),
-                    influxdb_type: InfluxDbType::Field,
-                    stats: data_types::Statistics::I64(StatValues {
-                        min: Some(10),
-                        max: Some(20),
-                        null_count: Some(0),
-                        total_count: 1_000,
-                        distinct_count: None,
-                    }),
-                },
-            ],
-        };
-        assert_eq!(
-            p.apply_to_table_summary(&props, &summary, schema.as_arrow()),
-            PredicateMatch::Zero,
-        );
-    }
-
     #[test]
     fn test_push_through_dedup() {
         let schema = SchemaBuilder::default()
@@ -1718,9 +1718,6 @@ where
 #[cfg(test)]
 mod tests {
     use super::*;
-    use data_types::ChunkId;
-    use datafusion::prelude::{col, Expr};
-    use datafusion_util::lit_dict;
     use futures::Future;
     use generated_types::{
         google::rpc::Status as GrpcStatus, i_ox_testing_client::IOxTestingClient,
@@ -1734,7 +1731,6 @@ mod tests {
     use iox_query::test::TestChunk;
     use metric::{Attributes, Metric, U64Counter, U64Gauge};
     use panic_logging::SendPanicsToTracing;
-    use predicate::{Predicate, PredicateMatch};
     use service_common::test_util::TestDatabaseStore;
     use std::{
         any::Any,
@@ -1820,11 +1816,15 @@ mod tests {
 
         let chunk0 = TestChunk::new("h2o")
             .with_id(0)
-            .with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
+            .with_tag_column("state")
+            .with_time_column_with_stats(Some(1000), Some(1000))
+            .with_one_row_of_data();
 
         let chunk1 = TestChunk::new("o2")
             .with_id(1)
-            .with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
+            .with_tag_column("state")
+            .with_time_column_with_stats(Some(1000), Some(1000))
+            .with_one_row_of_data();
 
         fixture
             .test_storage
@@ -1852,8 +1852,8 @@ mod tests {
 
         // --- Timestamp range
         let range = TimestampRange {
-            start: 150,
-            end: 200,
+            start: 900,
+            end: 1100,
         };
         let request = MeasurementNamesRequest {
             source,
@@ -1869,25 +1869,12 @@ mod tests {
         let expected_measurements = to_string_vec(&["h2o", "o2"]);
         assert_eq!(actual_measurements, expected_measurements);
 
-        // also ensure the plumbing is hooked correctly and that the predicate made it
-        // down to the chunk
-        let expected_predicate = Predicate::default().with_range(150, 200);
-
-        fixture
-            .expect_predicates(
-                db_info.db_name(),
-                "my_partition_key",
-                0,
-                &expected_predicate,
-            )
-            .await;
-
         // --- general predicate
         let request = MeasurementNamesRequest {
             source: Some(StorageClient::read_source(&db_info, 1)),
             range: Some(TimestampRange {
-                start: 150,
-                end: 200,
+                start: 900,
+                end: 1100,
             }),
             predicate: Some(make_state_eq_ma_predicate()),
         };
@@ -1947,21 +1934,6 @@ mod tests {
 
         assert_eq!(actual_tag_keys, expected_tag_keys,);
 
-        // also ensure the plumbing is hooked correctly and that the predicate made it
-        // down to the chunk
-        let expected_predicate = Predicate::default()
-            .with_range(150, 200)
-            .with_expr(make_state_ma_expr());
-
-        fixture
-            .expect_predicates(
-                db_info.db_name(),
-                "my_partition_key",
-                0,
-                &expected_predicate,
-            )
-            .await;
-
         grpc_request_metric_has_count(&fixture, "TagKeys", "ok", 1);
     }
 
@@ -2052,21 +2024,6 @@ mod tests {
         "unexpected tag keys while getting column names"
         );
 
-        // also ensure the plumbing is hooked correctly and that the predicate made it
-        // down to the chunk
-        let expected_predicate = Predicate::default()
-            .with_range(150, 200)
-            .with_expr(make_state_ma_expr());
-
-        fixture
-            .expect_predicates(
-                db_info.db_name(),
-                "my_partition_key",
-                0,
-                &expected_predicate,
-            )
-            .await;
-
         grpc_request_metric_has_count(&fixture, "MeasurementTagKeys", "ok", 1);
     }
 
@@ -2169,8 +2126,10 @@ mod tests {
             tag_key: [0].into(),
         };
 
-        let chunk =
-            TestChunk::new("h2o").with_predicate_match(PredicateMatch::AtLeastOneNonNullField);
+        let chunk = TestChunk::new("h2o")
+            .with_tag_column("tag")
+            .with_time_column_with_stats(Some(1100), Some(1200))
+            .with_one_row_of_data();
 
         fixture
             .test_storage
@@ -2741,24 +2700,6 @@ mod tests {
             .await
             .unwrap();
 
-        // also ensure the plumbing is hooked correctly and that the predicate made it
-        // down to the chunk and it was normalized to namevalue
-        let expected_predicate = Predicate::default()
-            .with_range(0, 10000)
-            // should NOT have CASE nonsense for handling empty strings as
-            // that should bave been optimized by the time it gets to
-            // the chunk
-            .with_expr(col("state").eq(lit_dict("MA")));
-
-        fixture
-            .expect_predicates(
-                db_info.db_name(),
-                "my_partition_key",
-                0,
-                &expected_predicate,
-            )
-            .await;
-
         // TODO: encode the actual output in the test case or something
         assert_eq!(
             frames.len(),
@@ -2803,24 +2744,6 @@ mod tests {
             .read_filter(request.clone())
             .await
             .unwrap();
-
-        // also ensure the plumbing is hooked correctly and that the predicate made it
-        // down to the chunk and it was normalized to namevalue
-        let expected_predicate = Predicate::default()
-            .with_range(0, 10000)
-            // comparison to empty string conversion results in a messier translation
-            // to handle backwards compatibility semantics
-            // #state IS NULL OR #state = Utf8("")
-            .with_expr(col("state").is_null().or(col("state").eq(lit_dict(""))));
-
-        fixture
-            .expect_predicates(
-                db_info.db_name(),
-                "my_partition_key",
-                0,
-                &expected_predicate,
-            )
-            .await;
     }
 
     #[tokio::test]
@@ -3787,13 +3710,6 @@ mod tests {
         generated_types::Predicate { root: Some(root) }
     }
 
-    /// return an DataFusion Expr predicate like
-    ///
-    /// state="MA"
-    fn make_state_ma_expr() -> Expr {
-        col("state").eq(lit_dict("MA"))
-    }
-
     /// Convert to a Vec<String> to facilitate comparison with results of client
     fn to_string_vec(v: &[&str]) -> Vec<String> {
         v.iter().map(|s| s.to_string()).collect()
@@ -3882,29 +3798,6 @@ mod tests {
             join_handle,
         })
     }
 
-    /// Gathers predicates applied to the specified chunks and
-    /// asserts that `expected_predicate` is within it
-    async fn expect_predicates(
-        &self,
-        db_name: &str,
-        partition_key: &str,
-        chunk_id: u128,
-        expected_predicate: &predicate::Predicate,
-    ) {
-        let actual_predicates = self
-            .test_storage
-            .db_or_create(db_name)
-            .await
-            .get_chunk(partition_key, ChunkId::new_test(chunk_id))
-            .unwrap()
-            .predicates();
-
-        assert!(
-            actual_predicates.contains(expected_predicate),
-            "\nActual: {actual_predicates:?}\nExpected: {expected_predicate:?}"
-        );
-    }
 }
 
 impl Drop for Fixture {