fix: gRPC errors regarding group cols (#6314)

* fix: gRPC errors regarding group cols

- a missing group column previously produced an "internal error" but should
  produce "invalid argument"
- duplicate group columns produced a panic but should also produce "invalid
  argument"

* docs: clarify
pull/24376/head
Marco Neumann 2022-12-06 07:36:32 +00:00 committed by GitHub
parent cc9d4d302f
commit f62b270852
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 187 additions and 35 deletions

View File

@ -207,6 +207,90 @@ async fn test_read_group_periods() {
.await
}
/// A `read_group` request naming a group column that does not exist in the
/// schema must be rejected with `InvalidArgument`, not an internal error.
#[tokio::test]
async fn test_group_key_not_found() {
do_test_invalid_group_key(InvalidGroupKey::ColNotFound).await;
}
/// A `read_group` request naming a field column (not a tag) as a group key
/// must be rejected with `InvalidArgument`.
#[tokio::test]
async fn test_not_a_tag() {
do_test_invalid_group_key(InvalidGroupKey::NotATag).await;
}
/// A `read_group` request repeating the same group key must be rejected with
/// `InvalidArgument` (this previously panicked the server).
#[tokio::test]
async fn test_duplicate_group_keys() {
do_test_invalid_group_key(InvalidGroupKey::DuplicateKeys).await;
}
/// A `read_group` request using the `time` column as a group key must be
/// rejected with `InvalidArgument`.
#[tokio::test]
async fn test_group_by_time() {
do_test_invalid_group_key(InvalidGroupKey::Time).await;
}
/// The different ways a `read_group` request's group keys can be invalid,
/// exercised by [`do_test_invalid_group_key`].
enum InvalidGroupKey {
// Group column that is not present in the schema at all.
ColNotFound,
// Group column that exists, but as a field rather than a tag.
NotATag,
// The same group column listed twice.
DuplicateKeys,
// The special `time` column, which is not a valid group key.
Time,
}
/// Drives a `read_group` gRPC request whose group keys are invalid in the way
/// described by `variant`, and asserts that the querier rejects it with
/// status code `InvalidArgument` (rather than an internal error or a panic).
async fn do_test_invalid_group_key(variant: InvalidGroupKey) {
    test_helpers::maybe_start_logging();
    let database_url = maybe_skip_integration!();

    // Set up the cluster ====================================
    let mut cluster = MiniCluster::create_shared(database_url).await;

    let steps = vec![
        Step::WriteLineProtocol("measurement,tag=foo field=1 1000".to_string()),
        Step::WaitForReadable,
        Step::Custom(Box::new(move |state: &mut StepTestState| {
            async move {
                let conn = state
                    .cluster()
                    .querier()
                    .querier_grpc_connection()
                    .into_grpc_connection();
                let mut client = StorageClient::new(conn);

                // Pick the offending pair of group keys for this variant.
                let keys = match variant {
                    InvalidGroupKey::ColNotFound => ["tag", "unknown_tag"],
                    InvalidGroupKey::NotATag => ["tag", "field"],
                    InvalidGroupKey::DuplicateKeys => ["tag", "tag"],
                    InvalidGroupKey::Time => ["tag", "time"],
                };

                let request = GrpcRequestBuilder::new()
                    .timestamp_range(0, 2000)
                    .field_predicate("field")
                    .group_keys(keys)
                    .group(Group::By)
                    .aggregate_type(AggregateType::Last)
                    .source(state.cluster())
                    .build_read_group();

                // The request must fail, and fail with the client-error code.
                let err = client.read_group(request).await.unwrap_err();
                assert_eq!(
                    err.code(),
                    tonic::Code::InvalidArgument,
                    "Wrong status code: {}\n\nStatus:\n{}",
                    err.code(),
                    err,
                );
            }
            .boxed()
        })),
    ];

    StepTest::new(&mut cluster, steps).run().await
}
/// Sends the specified line protocol to a server, runs a read_group
/// gRPC request, and compares it against expected frames
async fn do_read_group_test(

View File

@ -12,6 +12,7 @@ use arrow::{
use datafusion::{error::DataFusionError, physical_plan::SendableRecordBatchStream};
use futures::{ready, Stream, StreamExt, TryStreamExt};
use predicate::rpc_predicate::{GROUP_KEY_SPECIAL_START, GROUP_KEY_SPECIAL_STOP};
use snafu::{OptionExt, Snafu};
use std::{
collections::VecDeque,
@ -515,8 +516,7 @@ impl GroupGenerator {
})
})
.try_collect::<Vec<_>>()
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
.await?;
// Potential optimization is to skip this sort if we are
// grouping by a prefix of the tags for a single measurement
@ -638,7 +638,9 @@ impl SortableSeries {
// treat these specially and use value "" to mirror what TSM does
// see https://github.com/influxdata/influxdb_iox/issues/2693#issuecomment-947695442
// for more details
if col.as_ref() == "_start" || col.as_ref() == "_stop" {
if col.as_ref() == GROUP_KEY_SPECIAL_START
|| col.as_ref() == GROUP_KEY_SPECIAL_STOP
{
Some(Arc::from(""))
} else {
None

View File

@ -25,7 +25,13 @@ use datafusion_util::AsExpr;
use futures::{Stream, StreamExt, TryStreamExt};
use hashbrown::HashSet;
use observability_deps::tracing::{debug, trace, warn};
use predicate::{rpc_predicate::InfluxRpcPredicate, Predicate, PredicateMatch};
use predicate::{
rpc_predicate::{
InfluxRpcPredicate, FIELD_COLUMN_NAME, GROUP_KEY_SPECIAL_START, GROUP_KEY_SPECIAL_STOP,
MEASUREMENT_COLUMN_NAME,
},
Predicate, PredicateMatch,
};
use query_functions::{
group_by::{Aggregate, WindowDuration},
make_window_bound_expr,
@ -104,13 +110,13 @@ pub enum Error {
DuplicateGroupColumn { column_name: String },
#[snafu(display(
"Group column '{}' not found in tag columns: {}",
"Group column '{}' not found in tag columns: {:?}",
column_name,
all_tag_column_names
))]
GroupColumnNotFound {
column_name: String,
all_tag_column_names: String,
all_tag_column_names: Vec<String>,
},
#[snafu(display("Error creating aggregate expression: {}", source))]
@ -160,12 +166,12 @@ impl Error {
| Self::CastingAggregates { source, .. } => {
DataFusionError::Context(format!("{method}: {msg}"), Box::new(source))
}
e @ (Self::CreatingStringSet { .. }
| Self::TableRemoved { .. }
Self::TableRemoved { .. }
| Self::InvalidTagColumn { .. }
| Self::InternalInvalidTagType { .. }
| Self::DuplicateGroupColumn { .. }
| Self::GroupColumnNotFound { .. }
| Self::GroupColumnNotFound { .. } => DataFusionError::Plan(msg),
e @ (Self::CreatingStringSet { .. }
| Self::InternalInvalidTagType { .. }
| Self::CreatingAggregates { .. }
| Self::CreatingScan { .. }
| Self::InternalUnexpectedNoneAggregate {}
@ -822,36 +828,78 @@ impl InfluxRpcPlanner {
.table_predicates(namespace.as_meta())
.context(CreatingPredicatesSnafu)?;
let plans = create_plans(
namespace,
&table_predicates,
ctx,
|ctx, table_name, predicate, chunks, schema| match agg {
Aggregate::None => Self::read_filter_plan(
ctx.child_ctx("read_filter plan"),
table_name,
Arc::clone(&schema),
predicate,
chunks,
),
_ => Self::read_group_plan(
ctx.child_ctx("read_group plan"),
table_name,
schema,
predicate,
agg,
chunks,
),
},
)
.await?;
// Note always group (which will resort the frames)
// by tag, even if there are 0 columns
let group_columns = group_columns
.iter()
.map(|s| Arc::from(s.as_ref()))
.collect();
.collect::<Vec<Arc<str>>>();
let mut group_columns_set: HashSet<Arc<str>> = HashSet::with_capacity(group_columns.len());
for group_col in &group_columns {
match group_columns_set.entry(Arc::clone(group_col)) {
hashbrown::hash_set::Entry::Occupied(_) => {
return Err(Error::DuplicateGroupColumn {
column_name: group_col.to_string(),
});
}
hashbrown::hash_set::Entry::Vacant(v) => {
v.insert();
}
}
}
let plans = create_plans(
namespace,
&table_predicates,
ctx,
|ctx, table_name, predicate, chunks, schema| {
// check group_columns for unknown columns
let known_tags_vec = schema
.tags_iter()
.map(|f| f.name().clone())
.collect::<Vec<_>>();
let known_tags_set = known_tags_vec
.iter()
.map(|s| s.as_str())
.collect::<HashSet<_>>();
for group_col in &group_columns {
if (group_col.as_ref() == FIELD_COLUMN_NAME)
|| (group_col.as_ref() == MEASUREMENT_COLUMN_NAME)
|| (group_col.as_ref() == GROUP_KEY_SPECIAL_START)
|| (group_col.as_ref() == GROUP_KEY_SPECIAL_STOP)
{
continue;
}
ensure!(
known_tags_set.contains(group_col.as_ref()),
GroupColumnNotFoundSnafu {
column_name: group_col.as_ref(),
all_tag_column_names: known_tags_vec.clone(),
}
);
}
match agg {
Aggregate::None => Self::read_filter_plan(
ctx.child_ctx("read_filter plan"),
table_name,
Arc::clone(&schema),
predicate,
chunks,
),
_ => Self::read_group_plan(
ctx.child_ctx("read_group plan"),
table_name,
schema,
predicate,
agg,
chunks,
),
}
},
)
.await?;
Ok(SeriesSetPlans::new(plans).grouped_by(group_columns))
}

View File

@ -57,6 +57,24 @@ pub const FIELD_COLUMN_NAME: &str = "_field";
/// into multiple expressions (one for each field column).
pub const VALUE_COLUMN_NAME: &str = "_value";
/// Special group key for `read_group` requests.
///
/// This key is treated specially: `""` is used as a placeholder value (instead of a real column) to mirror what TSM does.
/// See <https://github.com/influxdata/influxdb_iox/issues/2693#issuecomment-947695442>
/// for more details.
///
/// See also [`GROUP_KEY_SPECIAL_STOP`].
pub const GROUP_KEY_SPECIAL_START: &str = "_start";
/// Special group key for `read_group` requests.
///
/// This key is treated specially: `""` is used as a placeholder value (instead of a real column) to mirror what TSM does.
/// See <https://github.com/influxdata/influxdb_iox/issues/2693#issuecomment-947695442>
/// for more details.
///
/// See also [`GROUP_KEY_SPECIAL_START`].
pub const GROUP_KEY_SPECIAL_STOP: &str = "_stop";
/// [`InfluxRpcPredicate`] implements the semantics of the InfluxDB
/// Storage gRPC and handles mapping details such as `_field` and
/// `_measurement` predicates into the corresponding IOx structures.