diff --git a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_group.rs b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_group.rs
index 434be2db89..713f1a9fc2 100644
--- a/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_group.rs
+++ b/influxdb_iox/tests/end_to_end_cases/querier/influxrpc/read_group.rs
@@ -207,6 +207,90 @@ async fn test_read_group_periods() {
     .await
 }
 
+#[tokio::test]
+async fn test_group_key_not_found() {
+    do_test_invalid_group_key(InvalidGroupKey::ColNotFound).await;
+}
+
+#[tokio::test]
+async fn test_not_a_tag() {
+    do_test_invalid_group_key(InvalidGroupKey::NotATag).await;
+}
+
+#[tokio::test]
+async fn test_duplicate_group_keys() {
+    do_test_invalid_group_key(InvalidGroupKey::DuplicateKeys).await;
+}
+
+#[tokio::test]
+async fn test_group_by_time() {
+    do_test_invalid_group_key(InvalidGroupKey::Time).await;
+}
+
+enum InvalidGroupKey {
+    ColNotFound,
+    NotATag,
+    DuplicateKeys,
+    Time,
+}
+
+async fn do_test_invalid_group_key(variant: InvalidGroupKey) {
+    test_helpers::maybe_start_logging();
+    let database_url = maybe_skip_integration!();
+
+    // Set up the cluster  ====================================
+    let mut cluster = MiniCluster::create_shared(database_url).await;
+
+    StepTest::new(
+        &mut cluster,
+        vec![
+            Step::WriteLineProtocol("measurement,tag=foo field=1 1000".to_string()),
+            Step::WaitForReadable,
+            Step::Custom(Box::new(move |state: &mut StepTestState| {
+                async move {
+                    let grpc_connection = state
+                        .cluster()
+                        .querier()
+                        .querier_grpc_connection()
+                        .into_grpc_connection();
+                    let mut storage_client = StorageClient::new(grpc_connection);
+
+                    let group_keys = match variant {
+                        InvalidGroupKey::ColNotFound => ["tag", "unknown_tag"],
+                        InvalidGroupKey::NotATag => ["tag", "field"],
+                        InvalidGroupKey::DuplicateKeys => ["tag", "tag"],
+                        InvalidGroupKey::Time => ["tag", "time"],
+                    };
+
+                    let read_group_request = GrpcRequestBuilder::new()
+                        .timestamp_range(0, 2000)
+                        .field_predicate("field")
+                        .group_keys(group_keys)
+                        .group(Group::By)
+                        .aggregate_type(AggregateType::Last)
+                        .source(state.cluster())
+                        .build_read_group();
+
+                    let status = storage_client
+                        .read_group(read_group_request)
+                        .await
+                        .unwrap_err();
+                    assert_eq!(
+                        status.code(),
+                        tonic::Code::InvalidArgument,
+                        "Wrong status code: {}\n\nStatus:\n{}",
+                        status.code(),
+                        status,
+                    );
+                }
+                .boxed()
+            })),
+        ],
+    )
+    .run()
+    .await
+}
+
 /// Sends the specified line protocol to a server, runs a read_group
 /// gRPC request, and compares it against expected frames
 async fn do_read_group_test(
diff --git a/iox_query/src/exec/seriesset/converter.rs b/iox_query/src/exec/seriesset/converter.rs
index 1fb7d89d8e..81a1c7a9a2 100644
--- a/iox_query/src/exec/seriesset/converter.rs
+++ b/iox_query/src/exec/seriesset/converter.rs
@@ -12,6 +12,7 @@ use arrow::{
 
 use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
 use futures::{ready, Stream, StreamExt, TryStreamExt};
+use predicate::rpc_predicate::{GROUP_KEY_SPECIAL_START, GROUP_KEY_SPECIAL_STOP};
 use snafu::{OptionExt, Snafu};
 use std::{
     collections::VecDeque,
@@ -515,8 +516,7 @@ impl GroupGenerator {
                 })
             })
             .try_collect::<Vec<_>>()
-            .await
-            .map_err(|e| DataFusionError::External(Box::new(e)))?;
+            .await?;
 
         // Potential optimization is to skip this sort if we are
         // grouping by a prefix of the tags for a single measurement
@@ -638,7 +638,9 @@ impl SortableSeries {
                        // treat these specially and use value "" to mirror what TSM does
                        // see https://github.com/influxdata/influxdb_iox/issues/2693#issuecomment-947695442
                        // for more details
-                        if col.as_ref() == "_start" || col.as_ref() == "_stop" {
+                        if col.as_ref() == GROUP_KEY_SPECIAL_START
+                            || col.as_ref() == GROUP_KEY_SPECIAL_STOP
+                        {
                            Some(Arc::from(""))
                        } else {
                            None
diff --git a/iox_query/src/frontend/influxrpc.rs b/iox_query/src/frontend/influxrpc.rs
index f17fd25485..21d793fbee 100644
--- a/iox_query/src/frontend/influxrpc.rs
+++ b/iox_query/src/frontend/influxrpc.rs
@@ -25,7 +25,13 @@ use datafusion_util::AsExpr;
 use futures::{Stream, StreamExt, TryStreamExt};
 use hashbrown::HashSet;
 use observability_deps::tracing::{debug, trace, warn};
-use predicate::{rpc_predicate::InfluxRpcPredicate, Predicate, PredicateMatch};
+use predicate::{
+    rpc_predicate::{
+        InfluxRpcPredicate, FIELD_COLUMN_NAME, GROUP_KEY_SPECIAL_START, GROUP_KEY_SPECIAL_STOP,
+        MEASUREMENT_COLUMN_NAME,
+    },
+    Predicate, PredicateMatch,
+};
 use query_functions::{
     group_by::{Aggregate, WindowDuration},
     make_window_bound_expr,
@@ -104,13 +110,13 @@ pub enum Error {
     DuplicateGroupColumn { column_name: String },
 
     #[snafu(display(
-        "Group column '{}' not found in tag columns: {}",
+        "Group column '{}' not found in tag columns: {:?}",
         column_name,
         all_tag_column_names
     ))]
     GroupColumnNotFound {
         column_name: String,
-        all_tag_column_names: String,
+        all_tag_column_names: Vec<String>,
     },
 
     #[snafu(display("Error creating aggregate expression: {}", source))]
@@ -160,12 +166,12 @@ impl Error {
             | Self::CastingAggregates { source, .. } => {
                 DataFusionError::Context(format!("{method}: {msg}"), Box::new(source))
             }
-            e @ (Self::CreatingStringSet { .. }
-            | Self::TableRemoved { .. }
+            Self::TableRemoved { .. }
             | Self::InvalidTagColumn { .. }
-            | Self::InternalInvalidTagType { .. }
             | Self::DuplicateGroupColumn { .. }
-            | Self::GroupColumnNotFound { .. }
+            | Self::GroupColumnNotFound { .. } => DataFusionError::Plan(msg),
+            e @ (Self::CreatingStringSet { .. }
+            | Self::InternalInvalidTagType { .. }
             | Self::CreatingAggregates { .. }
             | Self::CreatingScan { .. }
             | Self::InternalUnexpectedNoneAggregate {}
@@ -822,36 +828,78 @@ impl InfluxRpcPlanner {
             .table_predicates(namespace.as_meta())
             .context(CreatingPredicatesSnafu)?;
 
-        let plans = create_plans(
-            namespace,
-            &table_predicates,
-            ctx,
-            |ctx, table_name, predicate, chunks, schema| match agg {
-                Aggregate::None => Self::read_filter_plan(
-                    ctx.child_ctx("read_filter plan"),
-                    table_name,
-                    Arc::clone(&schema),
-                    predicate,
-                    chunks,
-                ),
-                _ => Self::read_group_plan(
-                    ctx.child_ctx("read_group plan"),
-                    table_name,
-                    schema,
-                    predicate,
-                    agg,
-                    chunks,
-                ),
-            },
-        )
-        .await?;
-
         // Note always group (which will resort the frames)
         // by tag, even if there are 0 columns
         let group_columns = group_columns
             .iter()
             .map(|s| Arc::from(s.as_ref()))
-            .collect();
+            .collect::<Vec<Arc<str>>>();
+        let mut group_columns_set: HashSet<Arc<str>> = HashSet::with_capacity(group_columns.len());
+        for group_col in &group_columns {
+            match group_columns_set.entry(Arc::clone(group_col)) {
+                hashbrown::hash_set::Entry::Occupied(_) => {
+                    return Err(Error::DuplicateGroupColumn {
+                        column_name: group_col.to_string(),
+                    });
+                }
+                hashbrown::hash_set::Entry::Vacant(v) => {
+                    v.insert();
+                }
+            }
+        }
+
+        let plans = create_plans(
+            namespace,
+            &table_predicates,
+            ctx,
+            |ctx, table_name, predicate, chunks, schema| {
+                // check group_columns for unknown columns
+                let known_tags_vec = schema
+                    .tags_iter()
+                    .map(|f| f.name().clone())
+                    .collect::<Vec<_>>();
+                let known_tags_set = known_tags_vec
+                    .iter()
+                    .map(|s| s.as_str())
+                    .collect::<HashSet<_>>();
+                for group_col in &group_columns {
+                    if (group_col.as_ref() == FIELD_COLUMN_NAME)
+                        || (group_col.as_ref() == MEASUREMENT_COLUMN_NAME)
+                        || (group_col.as_ref() == GROUP_KEY_SPECIAL_START)
+                        || (group_col.as_ref() == GROUP_KEY_SPECIAL_STOP)
+                    {
+                        continue;
+                    }
+
+                    ensure!(
+                        known_tags_set.contains(group_col.as_ref()),
+                        GroupColumnNotFoundSnafu {
+                            column_name: group_col.as_ref(),
+                            all_tag_column_names: known_tags_vec.clone(),
+                        }
+                    );
+                }
+
+                match agg {
+                    Aggregate::None => Self::read_filter_plan(
+                        ctx.child_ctx("read_filter plan"),
+                        table_name,
+                        Arc::clone(&schema),
+                        predicate,
+                        chunks,
+                    ),
+                    _ => Self::read_group_plan(
+                        ctx.child_ctx("read_group plan"),
+                        table_name,
+                        schema,
+                        predicate,
+                        agg,
+                        chunks,
+                    ),
+                }
+            },
+        )
+        .await?;
 
         Ok(SeriesSetPlans::new(plans).grouped_by(group_columns))
     }
diff --git a/predicate/src/rpc_predicate.rs b/predicate/src/rpc_predicate.rs
index d8a8959b1d..cf47cbd01e 100644
--- a/predicate/src/rpc_predicate.rs
+++ b/predicate/src/rpc_predicate.rs
@@ -57,6 +57,24 @@ pub const FIELD_COLUMN_NAME: &str = "_field";
 /// into multiple expressions (one for each field column).
 pub const VALUE_COLUMN_NAME: &str = "_value";
 
+/// Special group key for `read_group` requests.
+///
+/// Treat these specially and use `""` as a placeholder value (instead of a real column) to mirror what TSM does.
+/// See <https://github.com/influxdata/influxdb_iox/issues/2693#issuecomment-947695442>
+/// for more details.
+///
+/// See also [`GROUP_KEY_SPECIAL_STOP`].
+pub const GROUP_KEY_SPECIAL_START: &str = "_start";
+
+/// Special group key for `read_group` requests.
+///
+/// Treat these specially and use `""` as a placeholder value (instead of a real column) to mirror what TSM does.
+/// See <https://github.com/influxdata/influxdb_iox/issues/2693#issuecomment-947695442>
+/// for more details.
+///
+/// See also [`GROUP_KEY_SPECIAL_START`].
+pub const GROUP_KEY_SPECIAL_STOP: &str = "_stop";
+
 /// [`InfluxRpcPredicate`] implements the semantics of the InfluxDB
 /// Storage gRPC and handles mapping details such as `_field` and
 /// `_measurement` predicates into the corresponding IOx structures.
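
For context: the error variants routed into the `DataFusionError::Plan(msg)` arm above are the user-facing planning failures (an unknown, duplicate, or non-tag group key), and the new end-to-end tests assert that clients observe them as `tonic::Code::InvalidArgument`. A minimal sketch of the kind of status mapping this relies on, with a hypothetical function name rather than IOx's actual gRPC service code:

    use datafusion::error::DataFusionError;
    use tonic::Status;

    // Hypothetical mapping layer (an assumption for illustration, not the real
    // IOx service): plan-time errors are the caller's fault and surface as
    // InvalidArgument; anything else is reported as an internal server error.
    fn map_df_error(e: DataFusionError) -> Status {
        match e {
            DataFusionError::Plan(msg) => Status::invalid_argument(msg),
            other => Status::internal(other.to_string()),
        }
    }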