fix: optimize field columns for all-time predicates (#5046)

* fix: optimize field columns for all-time predicates

Also fix timestamp range to allow selecting points at MAX_NANO_TIME

* fix: clamp end to MIN_NANO_TIME for safety

* refactor: add contains_all method to TimestampRange
pull/24376/head
Sam Arnold 2022-07-06 08:01:28 -04:00 committed by GitHub
parent 2b527bbf64
commit e193913ed3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 217 additions and 95 deletions

View File

@ -2085,17 +2085,23 @@ impl TimestampRange {
///
/// Takes an inclusive start and an exclusive end. You may create an empty range by setting `start = end`.
///
/// Clamps to [`MIN_NANO_TIME`]/[`MAX_NANO_TIME`].
/// Clamps start to [`MIN_NANO_TIME`].
/// end is unclamped - end may be set to i64:MAX == MAX_NANO_TIME+1 to indicate no restriction on time.
///
/// # Panic
/// Panics if `start > end`.
pub fn new(start: i64, end: i64) -> Self {
assert!(end >= start, "start ({start}) > end ({end})");
let start = start.max(MIN_NANO_TIME);
let end = end.min(MAX_NANO_TIME);
let end = end.max(MIN_NANO_TIME);
Self { start, end }
}
/// Returns true if this range contains all representable timestamps
pub fn contains_all(&self) -> bool {
self.start <= MIN_NANO_TIME && self.end > MAX_NANO_TIME
}
#[inline]
/// Returns true if this range contains the value v
pub fn contains(&self, v: i64) -> bool {
@ -3104,8 +3110,8 @@ mod tests {
fn test_timestamp_nano_min_max() {
let cases = vec![
(
"MIN/MAX Nanos",
TimestampRange::new(MIN_NANO_TIME, MAX_NANO_TIME),
"MIN / MAX Nanos",
TimestampRange::new(MIN_NANO_TIME, MAX_NANO_TIME + 1),
),
("MIN/MAX i64", TimestampRange::new(i64::MIN, i64::MAX)),
];
@ -3113,11 +3119,13 @@ mod tests {
for (name, range) in cases {
println!("case: {}", name);
assert!(!range.contains(i64::MIN));
assert!(!range.contains(i64::MIN + 1));
assert!(range.contains(MIN_NANO_TIME));
assert!(range.contains(MIN_NANO_TIME + 1));
assert!(range.contains(MAX_NANO_TIME - 1));
assert!(!range.contains(MAX_NANO_TIME));
assert!(range.contains(MAX_NANO_TIME));
assert!(!range.contains(i64::MAX));
assert!(range.contains_all());
}
}
@ -3132,6 +3140,7 @@ mod tests {
assert!(!range.contains(MAX_NANO_TIME - 1));
assert!(!range.contains(MAX_NANO_TIME));
assert!(!range.contains(i64::MAX));
assert!(!range.contains_all());
}
#[test]

View File

@ -466,10 +466,13 @@ impl IOxSessionContext {
/// Executes `plan` and return the resulting FieldList on the query executor
pub async fn to_field_list(&self, plan: FieldListPlan) -> Result<FieldList> {
let FieldListPlan { plans } = plan;
let FieldListPlan {
known_values,
extra_plans,
} = plan;
// Run the plans in parallel
let handles = plans
let handles = extra_plans
.into_iter()
.map(|plan| {
let ctx = self.child_ctx("to_field_list");
@ -492,6 +495,12 @@ impl IOxSessionContext {
// collect them all up and combine them
let mut results = Vec::new();
if !known_values.is_empty() {
let list = known_values.into_iter().map(|f| f.1).collect();
results.push(FieldList { fields: list })
}
for join_handle in handles {
let fieldlist = join_handle.await?;

View File

@ -1,7 +1,10 @@
//! Query frontend for InfluxDB Storage gRPC requests
use crate::{
exec::{field::FieldColumns, make_non_null_checker, make_schema_pivot, IOxSessionContext},
exec::{
field::FieldColumns, fieldlist::Field, make_non_null_checker, make_schema_pivot,
IOxSessionContext,
},
frontend::common::ScanPlanBuilder,
plan::{
fieldlist::FieldListPlan,
@ -49,9 +52,6 @@ pub enum Error {
source: Box<dyn std::error::Error + Send + Sync>,
},
#[snafu(display("gRPC planner table {} not found", table_name))]
GettingTableSchema { table_name: String },
#[snafu(display("gRPC planner got error finding column values: {}", source))]
FindingColumnValues {
source: Box<dyn std::error::Error + Send + Sync>,
@ -354,7 +354,7 @@ impl InfluxRpcPlanner {
known_columns.extend(
database
.table_schema(table_name)
.context(GettingTableSchemaSnafu { table_name })?
.context(TableRemovedSnafu { table_name })?
.tags_iter()
.map(|f| f.name().clone()),
);
@ -670,9 +670,25 @@ impl InfluxRpcPlanner {
let table_predicates = rpc_predicate
.table_predicates(database.as_meta())
.context(CreatingPredicatesSnafu)?;
let mut field_list_plan = FieldListPlan::with_capacity(table_predicates.len());
let mut field_list_plan = FieldListPlan::new();
for (table_name, predicate) in &table_predicates {
if predicate.is_empty() {
// optimization: just get the field columns from metadata.
// note this both ignores field keys, and sets the timestamp data 'incorrectly'.
let schema = database
.table_schema(table_name)
.context(TableRemovedSnafu { table_name })?;
let fields = schema.fields_iter().map(|f| Field {
name: f.name().clone(),
data_type: f.data_type().clone(),
last_timestamp: 0,
});
for field in fields {
field_list_plan.append_field(field);
}
continue;
}
let chunks = database
.chunks(table_name, predicate)
.await
@ -694,7 +710,7 @@ impl InfluxRpcPlanner {
chunks,
)?;
field_list_plan = field_list_plan.append(plan);
field_list_plan = field_list_plan.append_other(plan.into());
}
Ok(field_list_plan)

View File

@ -1,10 +1,40 @@
use crate::exec::fieldlist::Field;
use datafusion::logical_plan::LogicalPlan;
use std::collections::BTreeMap;
pub type FieldSet = BTreeMap<String, Field>;
/// A plan which produces a logical set of Fields (e.g. InfluxDB
/// Fields with name, and data type, and last_timestamp).
///
/// known_values has a set of pre-computed values to be merged with
/// the extra_plans.
#[derive(Debug, Default)]
pub struct FieldListPlan {
pub plans: Vec<LogicalPlan>,
/// Known values
pub known_values: FieldSet,
/// General plans
pub extra_plans: Vec<LogicalPlan>,
}
impl From<Vec<LogicalPlan>> for FieldListPlan {
/// Create FieldList plan from a DataFusion LogicalPlan node, each
/// of which must produce fields in the correct format. The output
/// of each plan will be included into the final set.
fn from(plans: Vec<LogicalPlan>) -> Self {
Self {
known_values: FieldSet::new(),
extra_plans: plans,
}
}
}
impl From<LogicalPlan> for FieldListPlan {
/// Create a StringSet plan from a single DataFusion LogicalPlan
/// node, which must produce fields in the correct format
fn from(plan: LogicalPlan) -> Self {
Self::from(vec![plan])
}
}
impl FieldListPlan {
@ -12,15 +42,15 @@ impl FieldListPlan {
Self::default()
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
plans: Vec::with_capacity(capacity),
}
}
/// Append a new plan to this list of plans
pub fn append(mut self, plan: LogicalPlan) -> Self {
self.plans.push(plan);
/// Append the other plan to ourselves
pub fn append_other(mut self, other: Self) -> Self {
self.extra_plans.extend(other.extra_plans.into_iter());
self.known_values.extend(other.known_values.into_iter());
self
}
/// Append a single field to the known set of fields in this builder
pub fn append_field(&mut self, s: Field) {
self.known_values.insert(s.name.clone(), s);
}
}

View File

@ -19,7 +19,7 @@ use arrow::{
},
datatypes::SchemaRef,
};
use data_types::{InfluxDbType, TableSummary, TimestampRange, MAX_NANO_TIME, MIN_NANO_TIME};
use data_types::{InfluxDbType, TableSummary, TimestampRange};
use datafusion::{
error::DataFusionError,
logical_expr::{binary_expr, utils::expr_to_columns},
@ -220,7 +220,7 @@ impl Predicate {
/// existing storage engine
pub(crate) fn with_clear_timestamp_if_max_range(mut self) -> Self {
self.range = self.range.take().and_then(|range| {
if range.start() <= MIN_NANO_TIME && range.end() >= MAX_NANO_TIME {
if range.contains_all() {
None
} else {
Some(range)
@ -604,7 +604,7 @@ impl From<ValueExpr> for Expr {
mod tests {
use super::*;
use arrow::datatypes::DataType as ArrowDataType;
use data_types::{ColumnSummary, InfluxDbType, StatValues};
use data_types::{ColumnSummary, InfluxDbType, StatValues, MAX_NANO_TIME, MIN_NANO_TIME};
use datafusion::logical_plan::{col, lit};
use schema::builder::SchemaBuilder;
use test_helpers::maybe_start_logging;
@ -770,7 +770,7 @@ mod tests {
#[test]
fn test_clear_timestamp_if_max_range_out_of_range_high() {
let p = Predicate::new()
.with_range(0, MAX_NANO_TIME)
.with_range(0, MAX_NANO_TIME + 1)
.with_expr(col("foo").eq(lit(42)));
let expected = p.clone();
@ -782,7 +782,7 @@ mod tests {
#[test]
fn test_clear_timestamp_if_max_range_in_range() {
let p = Predicate::new()
.with_range(MIN_NANO_TIME, MAX_NANO_TIME)
.with_range(MIN_NANO_TIME, MAX_NANO_TIME + 1)
.with_expr(col("foo").eq(lit(42)));
let expected = Predicate::new().with_expr(col("foo").eq(lit(42)));

View File

@ -90,7 +90,9 @@ async fn test_field_columns_with_pred() {
#[tokio::test]
async fn test_field_columns_measurement_pred() {
// get only fields from h2o using a _measurement predicate
let predicate = Predicate::default().with_expr(col("_measurement").eq(lit("h2o")));
let predicate = Predicate::default()
.with_expr(col("_measurement").eq(lit("h2o")))
.with_range(i64::MIN, i64::MAX - 1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList {
@ -116,6 +118,36 @@ async fn test_field_columns_measurement_pred() {
run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await;
}
#[tokio::test]
async fn test_field_columns_measurement_pred_all_time() {
// get only fields from h2o using a _measurement predicate
let predicate = Predicate::default().with_expr(col("_measurement").eq(lit("h2o")));
let predicate = InfluxRpcPredicate::new(None, predicate);
// optimized all-time case returns zero last_timestamp
let expected_fields = FieldList {
fields: vec![
Field {
name: "moisture".into(),
data_type: DataType::Float64,
last_timestamp: 0,
},
Field {
name: "other_temp".into(),
data_type: DataType::Float64,
last_timestamp: 0,
},
Field {
name: "temp".into(),
data_type: DataType::Float64,
last_timestamp: 0,
},
],
};
run_field_columns_test_case(TwoMeasurementsManyFields {}, predicate, expected_fields).await;
}
#[tokio::test]
async fn test_field_columns_with_ts_pred() {
let predicate = Predicate::default()
@ -205,8 +237,72 @@ async fn test_field_name_plan_with_delete() {
}
#[tokio::test]
async fn list_field_columns_max_time() {
let predicate = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME);
async fn test_field_name_plan_with_delete_all_time() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default();
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList {
fields: vec![
Field {
name: "field1".into(),
data_type: DataType::Float64,
last_timestamp: 0, // all time queries are optimized but do not return timestamps
},
Field {
name: "field2".into(),
data_type: DataType::Utf8,
last_timestamp: 0,
},
Field {
name: "field3".into(),
data_type: DataType::Float64,
last_timestamp: 0,
},
Field {
name: "field4".into(),
data_type: DataType::Boolean,
last_timestamp: 0,
},
Field {
name: "field5".into(),
data_type: DataType::Boolean,
last_timestamp: 0,
},
],
};
run_field_columns_test_case(
OneMeasurementManyFieldsWithDelete {},
predicate,
expected_fields,
)
.await;
}
#[tokio::test]
async fn list_field_columns_all_time() {
let predicate = Predicate::default().with_range(MIN_NANO_TIME, i64::MAX);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList {
fields: vec![Field {
name: "value".into(),
data_type: DataType::Float64,
last_timestamp: 0, // we hit the optimized case that ignores timestamps
}],
};
run_field_columns_test_case(MeasurementWithMaxTime {}, predicate, expected_fields).await;
}
#[tokio::test]
async fn list_field_columns_max_time_included() {
// if the range started at i64:MIN, we would hit the optimized case for 'all time'
// and get the 'wrong' timestamp, since in the optimized case we don't check what timestamps
// exist
let predicate = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME + 1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList {
@ -221,36 +317,10 @@ async fn list_field_columns_max_time() {
}
#[tokio::test]
async fn list_field_columns_max_i64() {
let predicate = Predicate::default().with_range(i64::MIN, i64::MAX);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList {
fields: vec![Field {
name: "value".into(),
data_type: DataType::Float64,
last_timestamp: MAX_NANO_TIME,
}],
};
run_field_columns_test_case(MeasurementWithMaxTime {}, predicate, expected_fields).await;
}
#[tokio::test]
async fn list_field_columns_max_time_less_one() {
async fn list_field_columns_max_time_excluded() {
let predicate = Predicate::default()
// one less than max timestamp
.with_range(MIN_NANO_TIME, MAX_NANO_TIME - 1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList { fields: vec![] };
run_field_columns_test_case(MeasurementWithMaxTime {}, predicate, expected_fields).await;
}
#[tokio::test]
async fn list_field_columns_max_time_greater_one() {
let predicate = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME);
.with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_fields = FieldList { fields: vec![] };

View File

@ -201,47 +201,36 @@ async fn list_table_names_data_pred_250_300_with_delete_all() {
}
#[tokio::test]
async fn list_table_names_max_time() {
async fn list_table_names_max_time_included() {
run_table_names_test_case(
MeasurementWithMaxTime {},
tsp(MIN_NANO_TIME, MAX_NANO_TIME),
tsp(MIN_NANO_TIME + 1, MAX_NANO_TIME + 1),
vec!["cpu"],
)
.await;
}
#[tokio::test]
async fn list_table_names_max_i64() {
async fn list_table_names_max_time_excluded() {
run_table_names_test_case(
MeasurementWithMaxTime {},
// outside valid timestamp range
tsp(i64::MIN, i64::MAX),
vec!["cpu"],
)
.await;
}
#[tokio::test]
async fn list_table_names_time_less_one() {
run_table_names_test_case(
MeasurementWithMaxTime {},
tsp(MIN_NANO_TIME, MAX_NANO_TIME - 1),
vec![],
)
.await;
}
#[tokio::test]
async fn list_table_names_max_time_greater_one() {
run_table_names_test_case(
MeasurementWithMaxTime {},
// one more than max timestamp
tsp(MIN_NANO_TIME + 1, MAX_NANO_TIME),
vec![],
)
.await;
}
#[tokio::test]
async fn list_table_names_all_time() {
run_table_names_test_case(
MeasurementWithMaxTime {},
tsp(MIN_NANO_TIME, MAX_NANO_TIME + 1),
vec!["cpu"],
)
.await;
}
// Note when table names supports general purpose predicates, add a
// test here with a `_measurement` predicate
// https://github.com/influxdata/influxdb_iox/issues/762

View File

@ -190,38 +190,37 @@ async fn list_tag_name_end_to_end_with_delete() {
#[tokio::test]
async fn list_tag_name_max_time() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME);
let predicate = Predicate::default().with_range(MIN_NANO_TIME, i64::MAX);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["host"];
run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await;
}
#[tokio::test]
async fn list_tag_name_max_i64() {
async fn list_tag_name_all_time() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default()
// outside valid timestamp range
.with_range(i64::MIN, i64::MAX);
let predicate = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME + 1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec!["host"];
run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await;
}
#[tokio::test]
async fn list_tag_name_max_time_less_one() {
async fn list_tag_name_max_time_excluded() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default().with_range(MIN_NANO_TIME, MAX_NANO_TIME - 1); // one less than max timestamp
let predicate = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME); // exclusive end
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec![];
run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await;
}
#[tokio::test]
async fn list_tag_name_max_time_greater_one() {
async fn list_tag_name_max_time_included() {
test_helpers::maybe_start_logging();
let predicate = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME); // one more than min timestamp
// The predicate matters (since MIN_NANO_TIME would be filtered out) and so cannot be optimized away
let predicate = Predicate::default().with_range(MIN_NANO_TIME + 1, MAX_NANO_TIME + 1);
let predicate = InfluxRpcPredicate::new(None, predicate);
let expected_tag_keys = vec![];
let expected_tag_keys = vec!["host"];
run_tag_keys_test_case(MeasurementWithMaxTime {}, predicate, expected_tag_keys).await;
}

View File

@ -2908,7 +2908,7 @@ mod tests {
let request = MeasurementFieldsRequest {
source: source.clone(),
measurement: "TheMeasurement".into(),
range: None,
range: Some(make_timestamp_range(i64::MIN, i64::MAX - 1)),
predicate: None,
};