Merge branch 'main' into cn/chunk-addr-smaller
commit 559a7e0221

@@ -201,7 +201,7 @@ impl ChunkSummary {
/// ID of a chunk.
///
/// This ID is unique within a single partition.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ChunkId(Uuid);

impl ChunkId {

@@ -225,9 +225,21 @@ impl ChunkId {
    }
}

impl std::fmt::Debug for ChunkId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        <Self as std::fmt::Display>::fmt(self, f)
    }
}

impl std::fmt::Display for ChunkId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        if (self.0.get_variant() == Some(uuid::Variant::RFC4122))
            && (self.0.get_version() == Some(uuid::Version::Random))
        {
            f.debug_tuple("ChunkId").field(&self.0).finish()
        } else {
            f.debug_tuple("ChunkId").field(&self.0.as_u128()).finish()
        }
    }
}
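Note: a minimal sketch of how the two branches above render, assuming (as the tests below suggest) that `ChunkId::new` wraps a random v4 UUID (RFC4122 variant, `Random` version) and `ChunkId::new_test` builds the inner value via `Uuid::from_u128`, which is neither, so it falls through to the integer branch:

```rust
// Sketch only: `new` takes the UUID branch, `new_test` the integer branch.
let random = ChunkId::new();
let test = ChunkId::new_test(42);
assert_eq!(format!("{}", test), "ChunkId(42)");
println!("{}", random); // e.g. ChunkId(67e55044-10b1-426f-9247-bb680e5fe0c8)
```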
@@ -295,3 +307,41 @@ impl std::fmt::Display for ChunkOrder {
        f.debug_tuple("ChunkOrder").field(&self.0.get()).finish()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_chunk_id_new() {
        // `ChunkId::new()` creates a new random ID
        assert_ne!(ChunkId::new(), ChunkId::new());
    }

    #[test]
    fn test_chunk_id_new_test() {
        // `ChunkId::new_test(...)` creates a deterministic ID
        assert_eq!(ChunkId::new_test(1), ChunkId::new_test(1));
        assert_ne!(ChunkId::new_test(1), ChunkId::new_test(2));
    }

    #[test]
    fn test_chunk_id_debug_and_display() {
        // Random chunk IDs use the UUID format
        let id_random = ChunkId::new();
        let inner: Uuid = id_random.get();
        assert_eq!(
            format!("{:?}", id_random),
            format!("ChunkId({})", inner.to_string())
        );
        assert_eq!(
            format!("{}", id_random),
            format!("ChunkId({})", inner.to_string())
        );

        // Deterministic IDs use the integer format
        let id_test = ChunkId::new_test(42);
        assert_eq!(format!("{:?}", id_test), "ChunkId(42)");
        assert_eq!(format!("{}", id_test), "ChunkId(42)");
    }
}

@@ -467,7 +467,7 @@ File {
    table_name: "table1",
    partition_key: "part1",
    chunk_id: ChunkId(
        00000000-0000-0000-0000-000000000000,
        0,
    ),
    partition_checkpoint: PartitionCheckpoint {
        table_name: "table1",

@@ -7,13 +7,14 @@ use std::{
use arrow::datatypes::{DataType, Field};
use data_types::chunk_metadata::ChunkId;
use datafusion::{
    error::DataFusionError,
    logical_plan::{Expr, LogicalPlan, LogicalPlanBuilder},
    error::{DataFusionError, Result as DatafusionResult},
    logical_plan::{lit, Column, Expr, ExprRewriter, LogicalPlan, LogicalPlanBuilder},
    optimizer::utils::expr_to_columns,
    prelude::col,
};
use datafusion_util::AsExpr;

use hashbrown::HashSet;
use hashbrown::{HashMap, HashSet};
use internal_types::{
    schema::{InfluxColumnType, Schema, TIME_COLUMN_NAME},
    selection::Selection,

@@ -35,10 +36,19 @@ use crate::{
        stringset::{Error as StringSetError, StringSetPlan, StringSetPlanBuilder},
    },
    provider::ProviderBuilder,
    util::schema_has_all_expr_columns,
    QueryChunk, QueryChunkMeta, QueryDatabase,
};

/// Any column references to this name are rewritten to be
/// the actual table name by the Influx gRPC planner.
///
/// This is required to support predicates like
/// `_measurement = "foo" OR tag1 = "bar"`
///
/// The plan for each table will have the value of `_measurement`
/// filled in with a literal for the respective name of that table
pub const MEASUREMENT_COLUMN_NAME: &str = "_measurement";
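For context, a sketch of the kind of predicate this sentinel enables, built with DataFusion's `col`/`lit` helpers (both imported above); the column and value names are illustrative:

```rust
// Cannot be evaluated as-is, since no table has a `_measurement` column;
// the planner substitutes the concrete table name per table (see the
// normalizer added later in this diff).
let expr = col(MEASUREMENT_COLUMN_NAME)
    .eq(lit("foo"))
    .or(col("tag1").eq(lit("bar")));
```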

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("gRPC planner got error making table_name plan for chunk: {}", source))]

@@ -200,9 +210,13 @@ impl InfluxRpcPlanner {
        D: QueryDatabase + 'static,
    {
        let mut builder = StringSetPlanBuilder::new();
        let mut normalizer = PredicateNormalizer::new(predicate);

        for chunk in database.chunks(&predicate) {
        for chunk in database.chunks(normalizer.unnormalized()) {
            // Try and apply the predicate using only metadata
            let table_name = chunk.table_name();
            let predicate = normalizer.normalized(table_name);

            let pred_result = chunk
                .apply_predicate_to_metadata(&predicate)
                .map_err(|e| Box::new(e) as _)

@@ -216,15 +230,18 @@ impl InfluxRpcPlanner {
                PredicateMatch::Zero => builder,
                // can't evaluate predicate, need a new plan
                PredicateMatch::Unknown => {
                    // TODO: General purpose plans for
                    // table_names. For now, return an error
                    // TODO: General purpose plans for table_names.
                    // https://github.com/influxdata/influxdb_iox/issues/762
                    debug!(
                        chunk=%chunk.id().get(),
                        ?predicate,
                        table_name=%chunk.table_name(),
                        %table_name,
                        "can not evaluate predicate"
                    );
                    return UnsupportedPredicateForTableNames { predicate }.fail();
                    return UnsupportedPredicateForTableNames {
                        predicate: predicate.as_ref().clone(),
                    }
                    .fail();
                }
            };
        }

@@ -254,9 +271,13 @@ impl InfluxRpcPlanner {
        // for that table but that we couldn't evaluate the predicate
        // entirely using the metadata
        let mut need_full_plans = BTreeMap::new();
        let mut normalizer = PredicateNormalizer::new(predicate);

        let mut known_columns = BTreeSet::new();
        for chunk in database.chunks(&predicate) {
        for chunk in database.chunks(normalizer.unnormalized()) {
            let table_name = chunk.table_name();
            let predicate = normalizer.normalized(table_name);

            // Try and apply the predicate using only metadata
            let pred_result = chunk
                .apply_predicate_to_metadata(&predicate)

@@ -268,7 +289,6 @@ impl InfluxRpcPlanner {
            if matches!(pred_result, PredicateMatch::Zero) {
                continue;
            }
            let table_name = chunk.table_name();

            // get only tag columns from metadata
            let schema = chunk.schema();

@@ -327,7 +347,7 @@ impl InfluxRpcPlanner {
            let schema = database.table_schema(&table_name).context(TableRemoved {
                table_name: &table_name,
            })?;
            let plan = self.tag_keys_plan(&table_name, schema, &predicate, chunks)?;
            let plan = self.tag_keys_plan(&table_name, schema, &mut normalizer, chunks)?;

            if let Some(plan) = plan {
                builder = builder.append(plan)

@@ -369,8 +389,12 @@ impl InfluxRpcPlanner {
        // entirely using the metadata
        let mut need_full_plans = BTreeMap::new();

        let mut normalizer = PredicateNormalizer::new(predicate);
        let mut known_values = BTreeSet::new();
        for chunk in database.chunks(&predicate) {
        for chunk in database.chunks(normalizer.unnormalized()) {
            let table_name = chunk.table_name();
            let predicate = normalizer.normalized(table_name);

            // Try and apply the predicate using only metadata
            let pred_result = chunk
                .apply_predicate_to_metadata(&predicate)

@@ -382,7 +406,6 @@ impl InfluxRpcPlanner {
            if matches!(pred_result, PredicateMatch::Zero) {
                continue;
            }
            let table_name = chunk.table_name();

            // use schema to validate column type
            let schema = chunk.schema();

@@ -425,7 +448,7 @@ impl InfluxRpcPlanner {
                    table_name,
                    names=?names,
                    chunk_id=%chunk.id().get(),
                    "column values found from metadata",
                    "tag values found from metadata",
                );
                known_values.append(&mut names);
            }

@@ -433,7 +456,7 @@ impl InfluxRpcPlanner {
                debug!(
                    table_name,
                    chunk_id=%chunk.id().get(),
                    "need full plan to find column values"
                    "need full plan to find tag values"
                );
                // can't get columns only from metadata, need
                // a general purpose plan

@@ -456,7 +479,8 @@ impl InfluxRpcPlanner {
            let schema = database.table_schema(&table_name).context(TableRemoved {
                table_name: &table_name,
            })?;
            let scan_and_filter = self.scan_and_filter(&table_name, schema, &predicate, chunks)?;
            let scan_and_filter =
                self.scan_and_filter(&table_name, schema, &mut normalizer, chunks)?;

            // if we have any data to scan, make a plan!
            if let Some(TableScanAndFilter {

@@ -510,17 +534,20 @@ impl InfluxRpcPlanner {
        //
        // The executor then figures out which columns have non-null
        // values and stops the plan executing once it has them
        let mut normalizer = PredicateNormalizer::new(predicate);

        // map table -> Vec<Arc<Chunk>>
        let chunks = database.chunks(&predicate);
        let table_chunks = self.group_chunks_by_table(&predicate, chunks)?;
        let chunks = database.chunks(normalizer.unnormalized());
        let table_chunks = self.group_chunks_by_table(&mut normalizer, chunks)?;

        let mut field_list_plan = FieldListPlan::new();
        for (table_name, chunks) in table_chunks {
            let schema = database.table_schema(&table_name).context(TableRemoved {
                table_name: &table_name,
            })?;
            if let Some(plan) = self.field_columns_plan(&table_name, schema, &predicate, chunks)? {
            if let Some(plan) =
                self.field_columns_plan(&table_name, schema, &mut normalizer, chunks)?
            {
                field_list_plan = field_list_plan.append(plan);
            }
        }

@@ -554,10 +581,12 @@ impl InfluxRpcPlanner {
    {
        debug!(predicate=?predicate, "planning read_filter");

        let mut normalizer = PredicateNormalizer::new(predicate);

        // group tables by chunk, pruning if possible
        // key is table name, values are chunks
        let chunks = database.chunks(&predicate);
        let table_chunks = self.group_chunks_by_table(&predicate, chunks)?;
        let chunks = database.chunks(normalizer.unnormalized());
        let table_chunks = self.group_chunks_by_table(&mut normalizer, chunks)?;

        // now, build up plans for each table
        let mut ss_plans = Vec::with_capacity(table_chunks.len());

@@ -568,7 +597,7 @@ impl InfluxRpcPlanner {
            })?;

            let ss_plan =
                self.read_filter_plan(table_name, schema, prefix_columns, &predicate, chunks)?;
                self.read_filter_plan(table_name, schema, prefix_columns, &mut normalizer, chunks)?;
            // If we have to do real work, add it to the list of plans
            if let Some(ss_plan) = ss_plan {
                ss_plans.push(ss_plan);

@@ -594,9 +623,11 @@ impl InfluxRpcPlanner {
    {
        debug!(predicate=?predicate, agg=?agg, "planning read_group");

        let mut normalizer = PredicateNormalizer::new(predicate);

        // group tables by chunk, pruning if possible
        let chunks = database.chunks(&predicate);
        let table_chunks = self.group_chunks_by_table(&predicate, chunks)?;
        let chunks = database.chunks(normalizer.unnormalized());
        let table_chunks = self.group_chunks_by_table(&mut normalizer, chunks)?;
        let num_prefix_tag_group_columns = group_columns.len();

        // now, build up plans for each table

@@ -610,13 +641,13 @@ impl InfluxRpcPlanner {
                    table_name,
                    Arc::clone(&schema),
                    Some(group_columns),
                    &predicate,
                    &mut normalizer,
                    chunks,
                )?,
                _ => self.read_group_plan(
                    table_name,
                    schema,
                    &predicate,
                    &mut normalizer,
                    agg,
                    group_columns,
                    chunks,
@@ -654,9 +685,11 @@ impl InfluxRpcPlanner {
            "planning read_window_aggregate"
        );

        let mut normalizer = PredicateNormalizer::new(predicate);

        // group tables by chunk, pruning if possible
        let chunks = database.chunks(&predicate);
        let table_chunks = self.group_chunks_by_table(&predicate, chunks)?;
        let chunks = database.chunks(normalizer.unnormalized());
        let table_chunks = self.group_chunks_by_table(&mut normalizer, chunks)?;

        // now, build up plans for each table
        let mut ss_plans = Vec::with_capacity(table_chunks.len());

@@ -665,7 +698,13 @@ impl InfluxRpcPlanner {
                table_name: &table_name,
            })?;
            let ss_plan = self.read_window_aggregate_plan(
                table_name, schema, &predicate, agg, &every, &offset, chunks,
                table_name,
                schema,
                &mut normalizer,
                agg,
                &every,
                &offset,
                chunks,
            )?;
            // If we have to do real work, add it to the list of plans
            if let Some(ss_plan) = ss_plan {

@@ -679,7 +718,7 @@ impl InfluxRpcPlanner {
    /// Creates a map of table_name --> Chunks that have that table that *may* pass the predicate
    fn group_chunks_by_table<C>(
        &self,
        predicate: &Predicate,
        normalizer: &mut PredicateNormalizer,
        chunks: Vec<Arc<C>>,
    ) -> Result<BTreeMap<String, Vec<Arc<C>>>>
    where

@@ -687,9 +726,10 @@ impl InfluxRpcPlanner {
    {
        let mut table_chunks = BTreeMap::new();
        for chunk in chunks {
            let predicate = normalizer.normalized(chunk.table_name());
            // Try and apply the predicate using only metadata
            let pred_result = chunk
                .apply_predicate_to_metadata(predicate)
                .apply_predicate_to_metadata(&predicate)
                .map_err(|e| Box::new(e) as _)
                .context(CheckingChunkPredicate {
                    chunk_id: chunk.id(),

@@ -727,13 +767,13 @@ impl InfluxRpcPlanner {
        &self,
        table_name: &str,
        schema: Arc<Schema>,
        predicate: &Predicate,
        normalizer: &mut PredicateNormalizer,
        chunks: Vec<Arc<C>>,
    ) -> Result<Option<StringSetPlan>>
    where
        C: QueryChunk + 'static,
    {
        let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
        let scan_and_filter = self.scan_and_filter(table_name, schema, normalizer, chunks)?;

        let TableScanAndFilter {
            plan_builder,

@@ -790,13 +830,13 @@ impl InfluxRpcPlanner {
        &self,
        table_name: &str,
        schema: Arc<Schema>,
        predicate: &Predicate,
        normalizer: &mut PredicateNormalizer,
        chunks: Vec<Arc<C>>,
    ) -> Result<Option<LogicalPlan>>
    where
        C: QueryChunk + 'static,
    {
        let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
        let scan_and_filter = self.scan_and_filter(table_name, schema, normalizer, chunks)?;
        let TableScanAndFilter {
            plan_builder,
            schema,

@@ -842,14 +882,15 @@ impl InfluxRpcPlanner {
        table_name: impl AsRef<str>,
        schema: Arc<Schema>,
        prefix_columns: Option<&[impl AsRef<str>]>,
        predicate: &Predicate,
        normalizer: &mut PredicateNormalizer,
        chunks: Vec<Arc<C>>,
    ) -> Result<Option<SeriesSetPlan>>
    where
        C: QueryChunk + 'static,
    {
        let table_name = table_name.as_ref();
        let scan_and_filter = self.scan_and_filter(table_name, schema, predicate, chunks)?;
        let scan_and_filter = self.scan_and_filter(table_name, schema, normalizer, chunks)?;
        let predicate = normalizer.normalized(table_name);

        let TableScanAndFilter {
            plan_builder,

@@ -885,7 +926,7 @@ impl InfluxRpcPlanner {
        // Select away anything that isn't in the influx data model
        let tags_fields_and_timestamps: Vec<Expr> = schema
            .tags_iter()
            .chain(filtered_fields_iter(&schema, predicate))
            .chain(filtered_fields_iter(&schema, &predicate))
            .chain(schema.time_iter())
            .map(|field| field.name().as_expr())
            .collect();

@@ -901,7 +942,7 @@ impl InfluxRpcPlanner {
            .map(|field| Arc::from(field.name().as_str()))
            .collect();

        let field_columns = filtered_fields_iter(&schema, predicate)
        let field_columns = filtered_fields_iter(&schema, &predicate)
            .map(|field| Arc::from(field.name().as_str()))
            .collect();

@@ -962,7 +1003,7 @@ impl InfluxRpcPlanner {
        &self,
        table_name: impl Into<String>,
        schema: Arc<Schema>,
        predicate: &Predicate,
        normalizer: &mut PredicateNormalizer,
        agg: Aggregate,
        group_columns: &[impl AsRef<str>],
        chunks: Vec<Arc<C>>,

@@ -971,7 +1012,8 @@ impl InfluxRpcPlanner {
        C: QueryChunk + 'static,
    {
        let table_name = table_name.into();
        let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?;
        let scan_and_filter = self.scan_and_filter(&table_name, schema, normalizer, chunks)?;
        let predicate = normalizer.normalized(&table_name);

        let TableScanAndFilter {
            plan_builder,

@@ -1000,7 +1042,7 @@ impl InfluxRpcPlanner {
        let AggExprs {
            agg_exprs,
            field_columns,
        } = AggExprs::try_new(agg, &schema, predicate)?;
        } = AggExprs::try_new(agg, &schema, &predicate)?;

        let sort_exprs = group_exprs
            .iter()

@@ -1056,7 +1098,7 @@ impl InfluxRpcPlanner {
        &self,
        table_name: impl Into<String>,
        schema: Arc<Schema>,
        predicate: &Predicate,
        normalizer: &mut PredicateNormalizer,
        agg: Aggregate,
        every: &WindowDuration,
        offset: &WindowDuration,

@@ -1066,7 +1108,8 @@ impl InfluxRpcPlanner {
        C: QueryChunk + 'static,
    {
        let table_name = table_name.into();
        let scan_and_filter = self.scan_and_filter(&table_name, schema, predicate, chunks)?;
        let scan_and_filter = self.scan_and_filter(&table_name, schema, normalizer, chunks)?;
        let predicate = normalizer.normalized(&table_name);

        let TableScanAndFilter {
            plan_builder,

@@ -1087,7 +1130,7 @@ impl InfluxRpcPlanner {
            .collect::<Vec<_>>();

        // aggregate each field
        let agg_exprs = filtered_fields_iter(&schema, predicate)
        let agg_exprs = filtered_fields_iter(&schema, &predicate)
            .map(|field| make_agg_expr(agg, field.name()))
            .collect::<Result<Vec<_>>>()?;

@@ -1111,7 +1154,7 @@ impl InfluxRpcPlanner {
            .map(|field| Arc::from(field.name().as_str()))
            .collect();

        let field_columns = filtered_fields_iter(&schema, predicate)
        let field_columns = filtered_fields_iter(&schema, &predicate)
            .map(|field| Arc::from(field.name().as_str()))
            .collect();

@@ -1144,12 +1187,14 @@ impl InfluxRpcPlanner {
        &self,
        table_name: &str,
        schema: Arc<Schema>,
        predicate: &Predicate,
        normalizer: &mut PredicateNormalizer,
        chunks: Vec<Arc<C>>,
    ) -> Result<Option<TableScanAndFilter>>
    where
        C: QueryChunk + 'static,
    {
        let predicate = normalizer.normalized(table_name);

        // Scan all columns to begin with (DataFusion projection
        // push-down optimization will prune out unneeded columns later)
        let projection = None;

@@ -1459,8 +1504,123 @@ fn make_selector_expr(
        .alias(column_name))
}

/// Creates specialized / normalized predicates
#[derive(Debug)]
struct PredicateNormalizer {
    unnormalized: Predicate,
    normalized: HashMap<String, TableNormalizedPredicate>,
}

impl PredicateNormalizer {
    fn new(unnormalized: Predicate) -> Self {
        Self {
            unnormalized,
            normalized: Default::default(),
        }
    }

    /// Return a reference to the unnormalized predicate
    fn unnormalized(&self) -> &Predicate {
        &self.unnormalized
    }

    /// Return a reference to a predicate specialized for `table_name`
    fn normalized(&mut self, table_name: &str) -> Arc<Predicate> {
        if let Some(normalized_predicate) = self.normalized.get(table_name) {
            return normalized_predicate.inner();
        }

        let normalized_predicate =
            TableNormalizedPredicate::new(table_name, self.unnormalized.clone());

        self.normalized
            .entry(table_name.to_string())
            .or_insert(normalized_predicate)
            .inner()
    }
}
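The planner entry points above all use this type the same way; a condensed usage sketch (the `database` and `predicate` bindings are assumed from the surrounding methods):

```rust
// One normalizer per query: the untouched predicate drives chunk listing,
// while a per-table normalized copy (cached in the HashMap) is used for
// all table-specific work.
let mut normalizer = PredicateNormalizer::new(predicate);
for chunk in database.chunks(normalizer.unnormalized()) {
    let table_predicate = normalizer.normalized(chunk.table_name());
    // ... apply `table_predicate` to the chunk's metadata and scans ...
}
```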

/// Predicate that has been "specialized" / normalized for a
/// particular table. Specifically:
///
/// * all references to the [MEASUREMENT_COLUMN_NAME] column in any
///   `Exprs` are rewritten with the actual table name
///
/// For example if the original predicate was
/// ```text
/// _measurement = "some_table"
/// ```
///
/// When evaluated on table "cpu" then the predicate is rewritten to
/// ```text
/// "cpu" = "some_table"
/// ```
#[derive(Debug)]
struct TableNormalizedPredicate {
    inner: Arc<Predicate>,
}

impl TableNormalizedPredicate {
    fn new(table_name: &str, mut inner: Predicate) -> Self {
        inner.exprs = inner
            .exprs
            .into_iter()
            .map(|e| rewrite_measurement_references(table_name, e))
            .collect::<Vec<_>>();

        Self {
            inner: Arc::new(inner),
        }
    }

    fn inner(&self) -> Arc<Predicate> {
        Arc::clone(&self.inner)
    }
}

/// Rewrites all references to the [MEASUREMENT_COLUMN_NAME] column
/// with the actual table name
fn rewrite_measurement_references(table_name: &str, expr: Expr) -> Expr {
    let mut rewriter = MeasurementRewriter { table_name };
    expr.rewrite(&mut rewriter).expect("rewrite is infallible")
}

struct MeasurementRewriter<'a> {
    table_name: &'a str,
}

impl ExprRewriter for MeasurementRewriter<'_> {
    fn mutate(&mut self, expr: Expr) -> DatafusionResult<Expr> {
        Ok(match expr {
            // rewrite col("_measurement") --> "table_name"
            Expr::Column(Column { relation, name }) if name == MEASUREMENT_COLUMN_NAME => {
                // should not have a qualified foo._measurement
                // reference
                assert!(relation.is_none());
                lit(self.table_name)
            }
            // no rewrite needed
            _ => expr,
        })
    }
}
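An illustrative check of the rewrite (hypothetical, not part of the diff; it assumes `Expr` supports equality comparison, which DataFusion derives):

```rust
// `_measurement = "foo"` normalized for table "cpu" becomes the constant
// comparison `"cpu" = "foo"`, which can be folded to `false` and used to
// prune the whole table.
let expr = col(MEASUREMENT_COLUMN_NAME).eq(lit("foo"));
let rewritten = rewrite_measurement_references("cpu", expr);
assert_eq!(rewritten, lit("cpu").eq(lit("foo")));
```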

/// Returns true if all columns referred to in `expr` are present in
/// the schema, false otherwise
pub fn schema_has_all_expr_columns(schema: &Schema, expr: &Expr) -> bool {
    let mut predicate_columns = std::collections::HashSet::new();
    expr_to_columns(expr, &mut predicate_columns).unwrap();

    predicate_columns.into_iter().all(|col| {
        let col_name = col.name.as_str();
        col_name == MEASUREMENT_COLUMN_NAME || schema.find_index_of(col_name).is_some()
    })
}

#[cfg(test)]
mod tests {
    use internal_types::schema::builder::SchemaBuilder;

    use super::*;

    #[test]

@@ -1543,4 +1703,37 @@ mod tests {
            Err(e) => format!("{}", e),
        }
    }

    #[test]
    fn test_schema_has_all_exprs_() {
        let schema = SchemaBuilder::new().tag("t1").timestamp().build().unwrap();

        assert!(schema_has_all_expr_columns(
            &schema,
            &col("t1").eq(lit("foo"))
        ));
        assert!(!schema_has_all_expr_columns(
            &schema,
            &col("t2").eq(lit("foo"))
        ));
        assert!(schema_has_all_expr_columns(
            &schema,
            &col("t1").eq(col("time"))
        ));
        assert!(!schema_has_all_expr_columns(
            &schema,
            &col("t1").eq(col("time2"))
        ));
        assert!(!schema_has_all_expr_columns(
            &schema,
            &col("t1").eq(col("time")).and(col("t3").lt(col("time")))
        ));

        assert!(schema_has_all_expr_columns(
            &schema,
            &col("t1")
                .eq(col("time"))
                .and(col("_measurement").eq(lit("the_table")))
        ));
    }
}

@@ -1,20 +1,19 @@
//! This module contains DataFusion utility functions and helpers

use std::{collections::HashSet, convert::TryInto, sync::Arc};
use std::{convert::TryInto, sync::Arc};

use arrow::{compute::SortOptions, datatypes::Schema as ArrowSchema, record_batch::RecordBatch};

use datafusion::{
    error::DataFusionError,
    logical_plan::{DFSchema, Expr, LogicalPlan, LogicalPlanBuilder},
    optimizer::utils::expr_to_columns,
    physical_plan::{
        expressions::{col as physical_col, PhysicalSortExpr},
        planner::DefaultPhysicalPlanner,
        ExecutionPlan, PhysicalExpr,
    },
};
use internal_types::schema::{sort::SortKey, Schema};
use internal_types::schema::sort::SortKey;

/// Create a logical plan that produces the record batch
pub fn make_scan_plan(batch: RecordBatch) -> std::result::Result<LogicalPlan, DataFusionError> {

@@ -24,17 +23,6 @@ pub fn make_scan_plan(batch: RecordBatch) -> std::result::Result<LogicalPlan, Da
    LogicalPlanBuilder::scan_memory(partitions, schema, projection)?.build()
}
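A usage sketch for the helper above (hypothetical one-column batch; the `?` operator assumes a caller returning a compatible `Result`):

```rust
use arrow::array::Int64Array;
use arrow::datatypes::{DataType, Field};

// Wrap a single RecordBatch in a one-partition in-memory scan.
let schema = Arc::new(ArrowSchema::new(vec![Field::new("a", DataType::Int64, false)]));
let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![1, 2, 3]))])?;
let plan = make_scan_plan(batch)?;
```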

/// Returns true if all columns referred to in `expr` are present in the
/// schema, false otherwise
pub fn schema_has_all_expr_columns(schema: &Schema, expr: &Expr) -> bool {
    let mut predicate_columns = HashSet::new();
    expr_to_columns(expr, &mut predicate_columns).unwrap();

    predicate_columns
        .into_iter()
        .all(|col_name| schema.find_index_of(&col_name.name).is_some())
}

/// Returns the primary key (pk), expressed as Arrow sort expressions, used for data sorting
pub fn arrow_pk_sort_exprs(
    key_columns: Vec<&str>,

@@ -95,37 +83,3 @@ pub fn df_physical_expr(
        &ctx_state,
    )
}

#[cfg(test)]
mod tests {
    use datafusion::prelude::*;
    use internal_types::schema::builder::SchemaBuilder;

    use super::*;

    #[test]
    fn test_schema_has_all_exprs_() {
        let schema = SchemaBuilder::new().tag("t1").timestamp().build().unwrap();

        assert!(schema_has_all_expr_columns(
            &schema,
            &col("t1").eq(lit("foo"))
        ));
        assert!(!schema_has_all_expr_columns(
            &schema,
            &col("t2").eq(lit("foo"))
        ));
        assert!(schema_has_all_expr_columns(
            &schema,
            &col("t1").eq(col("time"))
        ));
        assert!(!schema_has_all_expr_columns(
            &schema,
            &col("t1").eq(col("time2"))
        ));
        assert!(!schema_has_all_expr_columns(
            &schema,
            &col("t1").eq(col("time")).and(col("t3").lt(col("time")))
        ));
    }
}

@@ -99,6 +99,36 @@ async fn test_field_columns_with_pred() {
    run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await;
}

#[tokio::test]
async fn test_field_columns_measurement_pred() {
    // get only fields from h2o using a _measurement predicate
    let predicate = PredicateBuilder::default()
        .add_expr(col("_measurement").eq(lit("h2o")))
        .build();

    let expected_fields = FieldList {
        fields: vec![
            Field {
                name: "moisture".into(),
                data_type: DataType::Float64,
                last_timestamp: 100000,
            },
            Field {
                name: "other_temp".into(),
                data_type: DataType::Float64,
                last_timestamp: 350,
            },
            Field {
                name: "temp".into(),
                data_type: DataType::Float64,
                last_timestamp: 100000,
            },
        ],
    };

    run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await;
}

#[tokio::test]
async fn test_field_columns_with_ts_pred() {
    let predicate = PredicateBuilder::default()

@@ -221,6 +221,36 @@ async fn test_read_filter_data_filter_fields() {
    run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}

#[tokio::test]
async fn test_read_filter_data_filter_measurement_pred() {
    // use an expr on table name to pick just the last row from o2
    let predicate = PredicateBuilder::default()
        .timestamp_range(200, 400)
        .add_expr(col("_measurement").eq(lit("o2")))
        .build();

    // Only expect other_temp in this location
    let expected_results = vec![
        "SeriesSet",
        "table_name: o2",
        "tags",
        "  (state, CA)",
        "field_indexes:",
        "  (value_index: 2, timestamp_index: 4)",
        "  (value_index: 3, timestamp_index: 4)",
        "start_row: 0",
        "num_rows: 1",
        "Batches:",
        "+------+-------+---------+------+--------------------------------+",
        "| city | state | reading | temp | time                           |",
        "+------+-------+---------+------+--------------------------------+",
        "|      | CA    |         | 79   | 1970-01-01T00:00:00.000000300Z |",
        "+------+-------+---------+------+--------------------------------+",
    ];

    run_read_filter_test_case(TwoMeasurementsManyFields {}, predicate, expected_results).await;
}

#[tokio::test]
async fn test_read_filter_data_pred_refers_to_non_existent_column() {
    let predicate = PredicateBuilder::default()

@@ -312,6 +312,62 @@ async fn test_grouped_series_set_plan_mean() {
    .await;
}

struct TwoMeasurementForAggs {}
#[async_trait]
impl DbSetup for TwoMeasurementForAggs {
    async fn make(&self) -> Vec<DbScenario> {
        let partition_key = "1970-01-01T00";

        let lp_lines1 = vec![
            "h2o,state=MA,city=Boston temp=70.4 100",
            "h2o,state=MA,city=Boston temp=72.4 250",
        ];
        let lp_lines2 = vec![
            "o2,state=CA,city=LA temp=90.0 200",
            "o2,state=CA,city=LA temp=90.0 350",
        ];

        make_two_chunk_scenarios(partition_key, &lp_lines1.join("\n"), &lp_lines2.join("\n")).await
    }
}

#[tokio::test]
async fn test_grouped_series_set_plan_count_measurement_pred() {
    let predicate = PredicateBuilder::default()
        // city = 'Boston' OR (_measurement = o2)
        .add_expr(
            col("city")
                .eq(lit("Boston"))
                .or(col("_measurement").eq(lit("o2"))),
        )
        .build();

    let agg = Aggregate::Count;
    let group_columns = vec!["state"];

    let expected_results = vec![
        "+-------+--------+------+--------------------------------+",
        "| state | city   | temp | time                           |",
        "+-------+--------+------+--------------------------------+",
        "| MA    | Boston | 2    | 1970-01-01T00:00:00.000000250Z |",
        "+-------+--------+------+--------------------------------+",
        "+-------+------+------+--------------------------------+",
        "| state | city | temp | time                           |",
        "+-------+------+------+--------------------------------+",
        "| CA    | LA   | 2    | 1970-01-01T00:00:00.000000350Z |",
        "+-------+------+------+--------------------------------+",
    ];

    run_read_group_test_case(
        TwoMeasurementForAggs {},
        predicate,
        agg,
        group_columns,
        expected_results,
    )
    .await;
}

struct MeasurementForSelectors {}
#[async_trait]
impl DbSetup for MeasurementForSelectors {
@@ -150,6 +150,46 @@ async fn test_read_window_aggregate_nanoseconds() {
    .await;
}

#[tokio::test]
async fn test_read_window_aggregate_nanoseconds_measurement_pred() {
    let predicate = PredicateBuilder::default()
        // city=Boston OR (_measurement != 'other' AND city = LA)
        .add_expr(
            col("city").eq(lit("Boston")).or(col("_measurement")
                .not_eq(lit("other"))
                .and(col("city").eq(lit("LA")))),
        )
        .timestamp_range(100, 450)
        .build();

    let agg = Aggregate::Mean;
    let every = WindowDuration::from_nanoseconds(200);
    let offset = WindowDuration::from_nanoseconds(0);

    let expected_results = vec![
        "+--------+-------+--------------------------------+------+",
        "| city   | state | time                           | temp |",
        "+--------+-------+--------------------------------+------+",
        "| Boston | MA    | 1970-01-01T00:00:00.000000200Z | 70   |",
        "| Boston | MA    | 1970-01-01T00:00:00.000000400Z | 71.5 |",
        "| Boston | MA    | 1970-01-01T00:00:00.000000600Z | 73   |",
        "| LA     | CA    | 1970-01-01T00:00:00.000000200Z | 90   |",
        "| LA     | CA    | 1970-01-01T00:00:00.000000400Z | 91.5 |",
        "| LA     | CA    | 1970-01-01T00:00:00.000000600Z | 93   |",
        "+--------+-------+--------------------------------+------+",
    ];

    run_read_window_aggregate_test_case(
        MeasurementForWindowAggregate {},
        predicate,
        agg,
        every,
        offset,
        expected_results,
    )
    .await;
}

struct MeasurementForWindowAggregateMonths {}
#[async_trait]
impl DbSetup for MeasurementForWindowAggregateMonths {

@@ -73,6 +73,10 @@ async fn list_table_names_data_pred_250_300() {
    run_table_names_test_case(TwoMeasurements {}, tsp(250, 300), vec![]).await;
}

// Note: when table names supports general purpose predicates, add a
// test here with a `_measurement` predicate
// https://github.com/influxdata/influxdb_iox/issues/762

// make a single timestamp predicate between r1 and r2
fn tsp(r1: i64, r2: i64) -> Predicate {
    PredicateBuilder::default().timestamp_range(r1, r2).build()

@@ -61,6 +61,7 @@ async fn list_tag_columns_timestamp() {
    let expected_tag_keys = vec!["city", "state"];
    run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await;
}

#[tokio::test]
async fn list_tag_columns_predicate() {
    let predicate = PredicateBuilder::default()

@@ -70,6 +71,19 @@ async fn list_tag_columns_predicate() {
    run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await;
}

#[tokio::test]
async fn list_tag_columns_measurement_pred() {
    // Select only the following line using a _measurement predicate
    //
    // "o2,state=NY,city=NYC temp=61.0 500",
    let predicate = PredicateBuilder::default()
        .timestamp_range(450, 550)
        .add_expr(col("_measurement").eq(lit("o2"))) // _measurement=o2
        .build();
    let expected_tag_keys = vec!["city", "state"];
    run_tag_keys_test_case(TwoMeasurementsManyNulls {}, predicate, expected_tag_keys).await;
}

#[tokio::test]
async fn list_tag_columns_timestamp_and_predicate() {
    let predicate = PredicateBuilder::default()

@@ -236,6 +236,49 @@ async fn list_tag_values_table_and_timestamp_and_state_pred_state_col_no_rows()
    .await;
}

#[tokio::test]
async fn list_tag_values_measurement_pred() {
    let tag_name = "state";
    let predicate = PredicateBuilder::default()
        .timestamp_range(1, 600) // filters out the NY row
        .add_expr(col("_measurement").not_eq(lit("o2")))
        .build();
    let expected_tag_keys = vec!["CA", "MA"];

    run_tag_values_test_case(
        TwoMeasurementsManyNulls {},
        tag_name,
        predicate,
        expected_tag_keys,
    )
    .await;
}

#[tokio::test]
async fn list_tag_values_measurement_pred_and_or() {
    let tag_name = "city";
    let predicate = PredicateBuilder::default()
        .timestamp_range(1, 600) // filters out the NY row
        // since there is an 'OR' in this predicate, can't answer
        // with metadata alone
        // _measurement = 'o2' OR temp > 70.0
        .add_expr(
            col("_measurement")
                .eq(lit("o2"))
                .or(col("temp").gt(lit(70.0))),
        )
        .build();
    let expected_tag_keys = vec!["Boston", "LA", "NYC"];

    run_tag_values_test_case(
        TwoMeasurementsManyNulls {},
        tag_name,
        predicate,
        expected_tag_keys,
    )
    .await;
}

#[tokio::test]
async fn list_tag_values_field_col() {
    let db_setup = TwoMeasurementsManyNulls {};

@@ -496,23 +496,18 @@ impl Column {
            }
        }

        // When the predicate is one of {<, <=, >, >=} and the column doesn't
        // contain any null values, and the entire range of values satisfies the
        // predicate then the column doesn't need to be read.
        cmp::Operator::GT | cmp::Operator::GTE | cmp::Operator::LT | cmp::Operator::LTE => {
        // When the predicate is one of {!=, <, <=, >, >=} and the column
        // doesn't contain any null values, and the entire range of values
        // satisfies the predicate then the column doesn't need to be read.
        cmp::Operator::NotEqual
        | cmp::Operator::GT
        | cmp::Operator::GTE
        | cmp::Operator::LT
        | cmp::Operator::LTE => {
            if self.predicate_matches_all_values(op, value) {
                return PredicateMatch::All;
            }
        }

        // When the predicate is != and the metadata range indicates that the
        // column can't possibly contain `value` then the predicate must
        // match all rows on the column.
        cmp::Operator::NotEqual => {
            if !self.might_contain_value(value) {
                return PredicateMatch::All; // all rows are going to match.
            }
        }
    }

    if self.predicate_matches_no_values(op, value) {
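The folded `NotEqual` arm appears to rely on `predicate_matches_all_values` subsuming the old `might_contain_value` check; an illustrative expectation, mirroring the updated tests further down:

```rust
// On a non-null i64 column with min/max [2, 300], `!= 301` is decidable
// from metadata alone: no stored value can equal 301, so all rows match.
let col = Column::from(&[100_i64, 200, 300, 2, 200, 22, 30][..]);
assert_eq!(
    col.evaluate_predicate_on_meta(&cmp::Operator::NotEqual, &Value::Scalar(Scalar::I64(301))),
    PredicateMatch::All
);
```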

@@ -1429,6 +1424,7 @@ pub(crate) struct Statistics {
mod test {
    use super::*;
    use arrow::array::{Int64Array, StringArray};
    use cmp::Operator;
    use encoding::string::NULL_ID;

    #[test]

@@ -2020,7 +2016,7 @@ mod test {

        row_ids = col.row_ids_filter(
            &cmp::Operator::NotEqual,
            &Value::from(-1257_i64),
            &Value::from(1257_i64),
            RowIDs::new_bitmap(),
        );
        assert!(matches!(row_ids, RowIDsOption::All(_)));

@@ -2256,63 +2252,90 @@ mod test {

    #[test]
    fn evaluate_predicate_on_meta() {
        let input = &[100_i64, 200, 300, 2, 200, 22, 30];
        let col = Column::from(&input[..]);

        let col = Column::from(&[100_i64, 200, 300, 2, 200, 22, 30][..]);
        let cases: Vec<(cmp::Operator, Scalar, PredicateMatch)> = vec![
            (Operator::GT, Scalar::U64(100), PredicateMatch::SomeMaybe),
            (Operator::GT, Scalar::I64(100), PredicateMatch::SomeMaybe),
            (Operator::GT, Scalar::I64(-99), PredicateMatch::All),
            (Operator::GT, Scalar::I64(100), PredicateMatch::SomeMaybe),
            (Operator::LT, Scalar::I64(300), PredicateMatch::SomeMaybe),
            (Operator::LTE, Scalar::I64(300), PredicateMatch::All),
            (Operator::Equal, Scalar::I64(2), PredicateMatch::SomeMaybe),
            (
                cmp::Operator::GT,
                Scalar::U64(100),
                PredicateMatch::SomeMaybe,
            ),
            (
                cmp::Operator::GT,
                Scalar::I64(100),
                PredicateMatch::SomeMaybe,
            ),
            (cmp::Operator::GT, Scalar::I64(-99), PredicateMatch::All),
            (
                cmp::Operator::GT,
                Scalar::I64(100),
                PredicateMatch::SomeMaybe,
            ),
            (
                cmp::Operator::LT,
                Scalar::I64(300),
                PredicateMatch::SomeMaybe,
            ),
            (cmp::Operator::LTE, Scalar::I64(300), PredicateMatch::All),
            (
                cmp::Operator::Equal,
                Operator::NotEqual,
                Scalar::I64(2),
                PredicateMatch::SomeMaybe,
            ),
            (
                cmp::Operator::NotEqual,
                Scalar::I64(2),
                PredicateMatch::SomeMaybe,
            ),
            (cmp::Operator::NotEqual, Scalar::I64(1), PredicateMatch::All),
            (
                cmp::Operator::NotEqual,
                Scalar::I64(301),
                PredicateMatch::All,
            ),
            (cmp::Operator::GT, Scalar::I64(100000), PredicateMatch::None),
            (cmp::Operator::GTE, Scalar::I64(301), PredicateMatch::None),
            (cmp::Operator::LT, Scalar::I64(2), PredicateMatch::None),
            (cmp::Operator::LTE, Scalar::I64(-100), PredicateMatch::None),
            (
                cmp::Operator::Equal,
                Scalar::I64(100000),
                PredicateMatch::None,
            ),
            (Operator::NotEqual, Scalar::I64(1), PredicateMatch::All),
            (Operator::NotEqual, Scalar::I64(301), PredicateMatch::All),
            (Operator::GT, Scalar::I64(100000), PredicateMatch::None),
            (Operator::GTE, Scalar::I64(301), PredicateMatch::None),
            (Operator::LT, Scalar::I64(2), PredicateMatch::None),
            (Operator::LTE, Scalar::I64(-100), PredicateMatch::None),
            (Operator::Equal, Scalar::I64(100000), PredicateMatch::None),
        ];

        for (op, scalar, result) in cases {
        for (i, (op, scalar, result)) in cases.into_iter().enumerate() {
            assert_eq!(
                col.evaluate_predicate_on_meta(&op, &Value::Scalar(scalar)),
                result
                result,
                "case {:?} failed",
                i
            );
        }

        // Now with a column containing at least one NULL value.
        let col = Column::from(Int64Array::from(vec![
            Some(100_i64),
            Some(200),
            Some(300),
            Some(2),
            None,
            Some(200),
            Some(22),
            Some(30),
        ]));

        let cases: Vec<(cmp::Operator, Scalar, PredicateMatch)> = vec![
            (Operator::GT, Scalar::U64(100), PredicateMatch::SomeMaybe),
            (Operator::GT, Scalar::I64(100), PredicateMatch::SomeMaybe),
            // Not all values will match > -99 because of NULL
            (Operator::GT, Scalar::I64(-99), PredicateMatch::SomeMaybe),
            (Operator::GT, Scalar::I64(100), PredicateMatch::SomeMaybe),
            (Operator::LT, Scalar::I64(300), PredicateMatch::SomeMaybe),
            // Not all values will match < 300 because of NULL
            (Operator::LTE, Scalar::I64(300), PredicateMatch::SomeMaybe),
            (Operator::Equal, Scalar::I64(2), PredicateMatch::SomeMaybe),
            (
                Operator::NotEqual,
                Scalar::I64(2),
                PredicateMatch::SomeMaybe,
            ),
            // Not all values will match != 1 because of NULL
            (
                Operator::NotEqual,
                Scalar::I64(1),
                PredicateMatch::SomeMaybe,
            ),
            // Not all values will match != 301 because of NULL
            (
                Operator::NotEqual,
                Scalar::I64(301),
                PredicateMatch::SomeMaybe,
            ),
            (Operator::GT, Scalar::I64(100000), PredicateMatch::None),
            (Operator::GTE, Scalar::I64(301), PredicateMatch::None),
            (Operator::LT, Scalar::I64(2), PredicateMatch::None),
            (Operator::LTE, Scalar::I64(-100), PredicateMatch::None),
            (Operator::Equal, Scalar::I64(100000), PredicateMatch::None),
        ];

        for (i, (op, scalar, result)) in cases.into_iter().enumerate() {
            assert_eq!(
                col.evaluate_predicate_on_meta(&op, &Value::Scalar(scalar)),
                result,
                "case {:?} failed",
                i
            );
        }
    }

@@ -2638,23 +2638,31 @@ mod test {

    fn _read_filter_setup() -> RowGroup {
        let mut columns = vec![];
        let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6][..]));
        let tc = ColumnType::Time(Column::from(&[1_i64, 2, 3, 4, 5, 6, 8][..]));
        columns.push(("time".to_string(), tc));

        let rc = ColumnType::Tag(Column::from(
            &["west", "west", "east", "west", "south", "north"][..],
            &[
                Some("west"),
                Some("west"),
                Some("east"),
                Some("west"),
                Some("south"),
                Some("north"),
                None,
            ][..],
        ));
        columns.push(("region".to_string(), rc));

        let mc = ColumnType::Tag(Column::from(
            &["GET", "POST", "POST", "POST", "PUT", "GET"][..],
            &["GET", "POST", "POST", "POST", "PUT", "GET", "PATCH"][..],
        ));
        columns.push(("method".to_string(), mc));

        let fc = ColumnType::Field(Column::from(&[100_u64, 101, 200, 203, 203, 10][..]));
        let fc = ColumnType::Field(Column::from(&[100_u64, 101, 200, 203, 203, 10, 29][..]));
        columns.push(("count".to_string(), fc));

        RowGroup::new(6, columns)
        RowGroup::new(7, columns)
    }

    #[test]

@@ -2711,6 +2719,28 @@ POST
west,2
east,3
west,4
",
        ),
        (
            vec!["region", "time"],
            Predicate::with_time_range(&[BinaryExpr::from(("method", "!=", "HEAD"))], 0, 10),
            "region,time
west,1
west,2
east,3
west,4
south,5
north,6
NULL,8
",
        ),
        (
            vec!["region", "time"],
            Predicate::with_time_range(&[BinaryExpr::from(("region", "!=", "west"))], 0, 10),
            "region,time
east,3
south,5
north,6
",
        ),
    ];

@@ -2744,6 +2774,7 @@ POST,east,3
POST,west,4
PUT,south,5
GET,north,6
PATCH,NULL,8
",
        ),
        (
@@ -22,7 +22,10 @@ use generated_types::{
use super::{TAG_KEY_FIELD, TAG_KEY_MEASUREMENT};
use observability_deps::tracing::warn;
use predicate::{predicate::PredicateBuilder, regex::regex_match_expr};
use query::group_by::{Aggregate as QueryAggregate, WindowDuration};
use query::{
    frontend::influxrpc::MEASUREMENT_COLUMN_NAME,
    group_by::{Aggregate as QueryAggregate, WindowDuration},
};
use snafu::{OptionExt, ResultExt, Snafu};

#[derive(Debug, Snafu)]

@@ -61,9 +64,6 @@ pub enum Error {
    ))]
    InvalidWindowOffsetDuration { description: String },

    #[snafu(display("Internal error: found measurement tag reference in unexpected location"))]
    InternalInvalidMeasurementReference {},

    #[snafu(display("Internal error: found field tag reference in unexpected location"))]
    InternalInvalidFieldReference {},

@@ -422,11 +422,12 @@ fn convert_node_to_expr(node: RPCNode) -> Result<Expr> {
}

fn make_tag_name(tag_name: Vec<u8>) -> Result<String> {
    if tag_name.is_measurement() {
        // convert to "_measurement" which is handled specially in grpc planner
        Ok(MEASUREMENT_COLUMN_NAME.to_string())
    } else if tag_name.is_field() {
    // These should have been handled at a higher level -- if we get
    // here it is too late
    if tag_name.is_measurement() {
        InternalInvalidMeasurementReference.fail()
    } else if tag_name.is_field() {
        InternalInvalidFieldReference.fail()
    } else {
        String::from_utf8(tag_name).context(ConvertingTagName)
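A hypothetical behavior check for the rewritten function (`TAG_KEY_MEASUREMENT` is the wire-protocol sentinel imported at the top of this file):

```rust
// The measurement sentinel now maps to the planner's `_measurement`
// column instead of failing; ordinary tag keys pass through unchanged.
assert_eq!(
    make_tag_name(TAG_KEY_MEASUREMENT.to_vec()).unwrap(),
    MEASUREMENT_COLUMN_NAME
);
assert_eq!(make_tag_name(b"host".to_vec()).unwrap(), "host");
```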

@@ -808,7 +809,7 @@ mod tests {
        let res = PredicateBuilder::default().rpc_predicate(Some(rpc_predicate));

        let expected_error = "Unexpected empty predicate: Node";
        let actual_error = error_result_to_string(res);
        let actual_error = res.unwrap_err().to_string();
        assert!(
            actual_error.contains(expected_error),
            "expected '{}' not found in '{}'",

@@ -830,21 +831,52 @@ mod tests {
            .expect("successfully converting predicate")
            .build();

        assert_eq!(predicate.exprs.len(), 1);
        let converted_expr = &predicate.exprs[0];

        // compare the expression using their string representations
        // as Expr can't be compared directly.
        let converted_expr = format!("{:?}", converted_expr);
        let expected_expr = format!("{:?}", expected_expr);
        let converted_expr = &predicate.exprs;

        assert_eq!(
            expected_expr, converted_expr,
            &expected_expr, converted_expr,
            "expected '{:#?}' doesn't match actual '{:#?}'",
            expected_expr, converted_expr
        );
    }

    #[test]
    fn test_convert_predicate_measurement() {
        // _measurement != "foo"
        let field_ref = RPCNode {
            node_type: RPCNodeType::TagRef as i32,
            children: vec![],
            value: Some(RPCValue::TagRefValue(TAG_KEY_MEASUREMENT.to_vec())),
        };
        let iconst = RPCNode {
            node_type: RPCNodeType::Literal as i32,
            children: vec![],
            value: Some(RPCValue::StringValue("foo".into())),
        };
        let comparison = RPCNode {
            node_type: RPCNodeType::ComparisonExpression as i32,
            children: vec![field_ref, iconst],
            value: Some(RPCValue::Comparison(RPCComparison::NotEqual as i32)),
        };

        let rpc_predicate = RPCPredicate {
            root: Some(comparison),
        };

        let predicate = PredicateBuilder::default()
            .rpc_predicate(Some(rpc_predicate))
            .expect("successfully converting predicate")
            .build();

        let expected_exprs = vec![col("_measurement").not_eq(lit("foo"))];

        assert_eq!(
            &expected_exprs, &predicate.exprs,
            "expected '{:#?}' doesn't match actual '{:#?}'",
            expected_exprs, predicate.exprs,
        );
    }

    #[test]
    fn test_convert_predicate_no_children() {
        let comparison = RPCNode {

@@ -860,7 +892,7 @@ mod tests {
        let res = PredicateBuilder::default().rpc_predicate(Some(rpc_predicate));

        let expected_error = "Error creating predicate: Unsupported number of children in binary operator Gt: 0 (must be 2)";
        let actual_error = error_result_to_string(res);
        let actual_error = res.unwrap_err().to_string();
        assert!(
            actual_error.contains(expected_error),
            "expected '{}' not found in '{}'",

@@ -891,7 +923,7 @@ mod tests {
        let res = PredicateBuilder::default().rpc_predicate(Some(rpc_predicate));

        let expected_error = "Error creating predicate: Unknown comparison node type: 42";
        let actual_error = error_result_to_string(res);
        let actual_error = res.unwrap_err().to_string();
        assert!(
            actual_error.contains(expected_error),
            "expected '{}' not found in '{}'",

@@ -922,7 +954,7 @@ mod tests {
        let res = PredicateBuilder::default().rpc_predicate(Some(rpc_predicate));

        let expected_error = "Error creating predicate: Unknown logical node type: 42";
        let actual_error = error_result_to_string(res);
        let actual_error = res.unwrap_err().to_string();
        assert!(
            actual_error.contains(expected_error),
            "expected '{}' not found in '{}'",

@@ -1017,14 +1049,10 @@ mod tests {
            .unwrap()
            .build();

        // compare the expression using their string representations
        // as Expr can't be compared directly.
        assert_eq!(predicate.exprs.len(), 1);
        let converted_expr = format!("{:?}", predicate.exprs[0]);
        let expected_expr = format!("{:?}", expected_expr);
        let converted_expr = &predicate.exprs;

        assert_eq!(
            expected_expr, converted_expr,
            &expected_expr, converted_expr,
            "expected '{:#?}' doesn't match actual '{:#?}'",
            expected_expr, converted_expr
        );

@@ -1072,7 +1100,7 @@ mod tests {
        let res = PredicateBuilder::default().rpc_predicate(Some(rpc_predicate));

        let expected_error = "Internal error: found field tag reference in unexpected location";
        let actual_error = error_result_to_string(res);
        let actual_error = res.unwrap_err().to_string();
        assert!(
            actual_error.contains(expected_error),
            "expected '{}' not found in '{}'",

@@ -1092,7 +1120,7 @@ mod tests {
    }

    /// returns the (RPCNode, expected_expr) pair for "host > 5.0"
    fn make_host_comparison() -> (RPCNode, Expr) {
    fn make_host_comparison() -> (RPCNode, Vec<Expr>) {
        // host > 5.0
        let field_ref = RPCNode {
            node_type: RPCNodeType::FieldRef as i32,

@@ -1112,7 +1140,7 @@ mod tests {

        let expected_expr = col("host").gt(lit(5.0));

        (comparison, expected_expr)
        (comparison, vec![expected_expr])
    }

    fn make_tag_ref_node(tag_name: &[u8], field_name: impl Into<String>) -> RPCNode {

@@ -1157,15 +1185,6 @@ mod tests {
        v.iter().map(|s| s.to_string()).collect::<BTreeSet<_>>()
    }

    /// Return the display format of the resulting error, or
    /// 'UNEXPECTED SUCCESS' if `res` is not an error.
    fn error_result_to_string<R>(res: Result<R>) -> String {
        match res {
            Ok(_) => "UNEXPECTED SUCCESS".into(),
            Err(e) => format!("{}", e),
        }
    }

    #[test]
    fn test_make_read_group_aggregate() {
        assert_eq!(

@@ -1220,17 +1239,17 @@ mod tests {
        let agg = make_read_window_aggregate(vec![], 5, 10, None);
        let expected =
            "Error creating aggregate: Exactly one aggregate is supported, but 0 were supplied: []";
        assert_eq!(error_result_to_string(agg), expected);
        assert_eq!(agg.unwrap_err().to_string(), expected);

        let agg =
            make_read_window_aggregate(vec![make_aggregate(1), make_aggregate(2)], 5, 10, None);
        let expected = "Error creating aggregate: Exactly one aggregate is supported, but 2 were supplied: [Aggregate { r#type: Sum }, Aggregate { r#type: Count }]";
        assert_eq!(error_result_to_string(agg), expected);
        assert_eq!(agg.unwrap_err().to_string(), expected);

        // now window specified
        let agg = make_read_window_aggregate(vec![make_aggregate(1)], 0, 0, None);
        let expected = "Error parsing window bounds: No window specified";
        assert_eq!(error_result_to_string(agg), expected);
        assert_eq!(agg.unwrap_err().to_string(), expected);

        // correct window + window_every
        let agg = make_read_window_aggregate(vec![make_aggregate(1)], 5, 10, None).unwrap();

@@ -1302,7 +1321,7 @@ mod tests {
            Some(make_rpc_window(5, 1, false, 10, 0, false)),
        );
        let expected = "Error parsing window bounds duration \'window.every\': duration used as an interval cannot mix month and nanosecond units";
        assert_eq!(error_result_to_string(agg), expected);
        assert_eq!(agg.unwrap_err().to_string(), expected);

        // invalid durations
        let agg = make_read_window_aggregate(

@@ -1312,7 +1331,7 @@ mod tests {
            Some(make_rpc_window(5, 0, false, 10, 1, false)),
        );
        let expected = "Error parsing window bounds duration \'window.offset\': duration used as an interval cannot mix month and nanosecond units";
        assert_eq!(error_result_to_string(agg), expected);
        assert_eq!(agg.unwrap_err().to_string(), expected);

        // invalid durations
        let agg = make_read_window_aggregate(

@@ -1322,7 +1341,7 @@ mod tests {
            Some(make_rpc_window(0, 0, false, 5, 0, false)),
        );
        let expected = "Error parsing window bounds duration \'window.every\': duration used as an interval cannot be zero";
        assert_eq!(error_result_to_string(agg), expected);
        assert_eq!(agg.unwrap_err().to_string(), expected);
    }

    #[test]

@@ -1330,7 +1349,7 @@ mod tests {
        assert_eq!(convert_group_type(0).unwrap(), RPCGroup::None);
        assert_eq!(convert_group_type(2).unwrap(), RPCGroup::By);
        assert_eq!(
            error_result_to_string(convert_group_type(1)),
            convert_group_type(1).unwrap_err().to_string(),
            "Error creating aggregate: Unknown group type: 1"
        );
    }

@@ -1371,7 +1390,9 @@ mod tests {
            QueryAggregate::Mean
        );
        assert_eq!(
            error_result_to_string(convert_aggregate(Some(make_aggregate(100)))),
            convert_aggregate(Some(make_aggregate(100)))
                .unwrap_err()
                .to_string(),
            "Error creating aggregate: Unknown aggregate type 100"
        );
    }