fix: better read_group input validation checking: group and hints fields (#539)
* fix: Error if hint argument is provided to read_groupg * fix: Verify compatible group and group_keys settings * docs: Add clarifying comments on validation * refactor: use into() rather than String::from for consistencypull/24376/head
parent
ea6b2f6bc8
commit
d47acfa3b5
|
@ -38,6 +38,17 @@ pub enum Error {
|
|||
#[snafu(display("Error creating aggregate: Unknown group type: {}", group_type))]
|
||||
UnknownGroup { group_type: i32 },
|
||||
|
||||
#[snafu(display(
|
||||
"Incompatible read_group request: Group::None had {} group keys (expected 0)",
|
||||
num_group_keys
|
||||
))]
|
||||
InvalidGroupNone { num_group_keys: usize },
|
||||
|
||||
#[snafu(display(
|
||||
"Incompatible read_group request: Group::By had no group keys (expected at least 1)"
|
||||
))]
|
||||
InvalidGroupBy {},
|
||||
|
||||
#[snafu(display("Error creating predicate: Unexpected empty predicate: Node"))]
|
||||
EmptyPredicateNode {},
|
||||
|
||||
|
@ -473,9 +484,21 @@ fn build_binary_expr(op: Operator, inputs: Vec<Expr>) -> Result<Expr> {
|
|||
|
||||
pub fn make_read_group_aggregate(
|
||||
aggregate: Option<RPCAggregate>,
|
||||
_group: RPCGroup,
|
||||
group: RPCGroup,
|
||||
group_keys: Vec<String>,
|
||||
) -> Result<GroupByAndAggregate> {
|
||||
// validate Group setting
|
||||
match group {
|
||||
// Group:None is invalid if grouping keys are specified
|
||||
RPCGroup::None if !group_keys.is_empty() => InvalidGroupNone {
|
||||
num_group_keys: group_keys.len(),
|
||||
}
|
||||
.fail(),
|
||||
// Group:By is invalid if no grouping keys are specified
|
||||
RPCGroup::By if group_keys.is_empty() => InvalidGroupBy {}.fail(),
|
||||
_ => Ok(()),
|
||||
}?;
|
||||
|
||||
let gby_agg = GroupByAndAggregate::Columns {
|
||||
agg: convert_aggregate(aggregate)?,
|
||||
group_columns: group_keys,
|
||||
|
@ -1131,6 +1154,47 @@ mod tests {
|
|||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_make_read_group_aggregate() {
|
||||
assert_eq!(
|
||||
make_read_group_aggregate(Some(make_aggregate(1)), RPCGroup::None, vec![]).unwrap(),
|
||||
GroupByAndAggregate::Columns {
|
||||
agg: QueryAggregate::Sum,
|
||||
group_columns: vec![]
|
||||
}
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
make_read_group_aggregate(Some(make_aggregate(1)), RPCGroup::By, vec!["gcol".into()])
|
||||
.unwrap(),
|
||||
GroupByAndAggregate::Columns {
|
||||
agg: QueryAggregate::Sum,
|
||||
group_columns: vec!["gcol".into()]
|
||||
}
|
||||
);
|
||||
|
||||
// error cases
|
||||
assert_eq!(
|
||||
make_read_group_aggregate(None, RPCGroup::None, vec![])
|
||||
.unwrap_err()
|
||||
.to_string(),
|
||||
"Error creating aggregate: Unexpected empty aggregate"
|
||||
);
|
||||
|
||||
assert_eq!(
|
||||
make_read_group_aggregate(Some(make_aggregate(1)), RPCGroup::None, vec!["gcol".into()])
|
||||
.unwrap_err()
|
||||
.to_string(),
|
||||
"Incompatible read_group request: Group::None had 1 group keys (expected 0)"
|
||||
);
|
||||
assert_eq!(
|
||||
make_read_group_aggregate(Some(make_aggregate(1)), RPCGroup::By, vec![])
|
||||
.unwrap_err()
|
||||
.to_string(),
|
||||
"Incompatible read_group request: Group::By had no group keys (expected at least 1)"
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_make_read_window_aggregate() {
|
||||
let pos_5_ns = WindowDuration::from_nanoseconds(5);
|
||||
|
|
|
@ -166,6 +166,12 @@ pub enum Error {
|
|||
source: Box<dyn std::error::Error + Send + Sync>,
|
||||
},
|
||||
|
||||
#[snafu(display(
|
||||
"Unexpected hint value on read_group request. Expected 0, got {}",
|
||||
hints
|
||||
))]
|
||||
InternalHintsFieldNotSupported { hints: u32 },
|
||||
|
||||
#[snafu(display("Operation not yet implemented: {}", operation))]
|
||||
NotYetImplemented { operation: String },
|
||||
}
|
||||
|
@ -210,6 +216,7 @@ impl Error {
|
|||
Self::ConvertingSeriesSet { .. } => Status::invalid_argument(self.to_string()),
|
||||
Self::ConvertingFieldList { .. } => Status::invalid_argument(self.to_string()),
|
||||
Self::SendingResults { .. } => Status::internal(self.to_string()),
|
||||
Self::InternalHintsFieldNotSupported { .. } => Status::internal(self.to_string()),
|
||||
Self::NotYetImplemented { .. } => Status::internal(self.to_string()),
|
||||
}
|
||||
}
|
||||
|
@ -310,7 +317,7 @@ where
|
|||
group_keys,
|
||||
group,
|
||||
aggregate,
|
||||
hints: _,
|
||||
hints,
|
||||
} = read_group_request;
|
||||
|
||||
info!(
|
||||
|
@ -319,6 +326,10 @@ where
|
|||
predicate.loggable()
|
||||
);
|
||||
|
||||
if hints != 0 {
|
||||
InternalHintsFieldNotSupported { hints }.fail()?
|
||||
}
|
||||
|
||||
warn!("read_group implementation not yet complete: https://github.com/influxdata/influxdb_iox/issues/448");
|
||||
|
||||
let aggregate_string = format!(
|
||||
|
@ -1858,13 +1869,13 @@ mod tests {
|
|||
partition_id,
|
||||
));
|
||||
|
||||
let group = generated_types::read_group_request::Group::None as i32;
|
||||
let group = generated_types::read_group_request::Group::By as i32;
|
||||
|
||||
let request = ReadGroupRequest {
|
||||
read_source: source.clone(),
|
||||
range: make_timestamp_range(150, 200),
|
||||
predicate: make_state_ma_predicate(),
|
||||
group_keys: vec![String::from("tag1")],
|
||||
group_keys: vec!["tag1".into()],
|
||||
group,
|
||||
aggregate: Some(RPCAggregate {
|
||||
r#type: AggregateType::Sum as i32,
|
||||
|
@ -1876,7 +1887,7 @@ mod tests {
|
|||
predicate: "Predicate { exprs: [#state Eq Utf8(\"MA\")] range: TimestampRange { start: 150, end: 200 }}".into(),
|
||||
gby_agg: GroupByAndAggregate::Columns {
|
||||
agg: QueryAggregate::Sum,
|
||||
group_columns: vec![String::from("tag1")],
|
||||
group_columns: vec!["tag1".into()],
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -1898,13 +1909,43 @@ mod tests {
|
|||
);
|
||||
|
||||
// ---
|
||||
// test error
|
||||
// test error hit in request processing
|
||||
// ---
|
||||
let request = ReadGroupRequest {
|
||||
read_source: source.clone(),
|
||||
range: None,
|
||||
predicate: None,
|
||||
group_keys: vec![],
|
||||
group_keys: vec!["tag1".into()],
|
||||
group,
|
||||
aggregate: Some(RPCAggregate {
|
||||
r#type: AggregateType::Sum as i32,
|
||||
}),
|
||||
hints: 42,
|
||||
};
|
||||
|
||||
let response = fixture.storage_client.read_group(request).await;
|
||||
assert!(response.is_err());
|
||||
let response_string = format!("{:?}", response);
|
||||
let expected_error = "Unexpected hint value on read_group request. Expected 0, got 42";
|
||||
assert!(
|
||||
response_string.contains(expected_error),
|
||||
"'{}' did not contain expected content '{}'",
|
||||
response_string,
|
||||
expected_error
|
||||
);
|
||||
|
||||
// Errored out in gRPC and never got to database layer
|
||||
let expected_request: Option<QueryGroupsRequest> = None;
|
||||
assert_eq!(test_db.get_query_groups_request().await, expected_request);
|
||||
|
||||
// ---
|
||||
// test error returned in database processing
|
||||
// ---
|
||||
let request = ReadGroupRequest {
|
||||
read_source: source.clone(),
|
||||
range: None,
|
||||
predicate: None,
|
||||
group_keys: vec!["tag1".into()],
|
||||
group,
|
||||
aggregate: Some(RPCAggregate {
|
||||
r#type: AggregateType::Sum as i32,
|
||||
|
@ -1928,7 +1969,7 @@ mod tests {
|
|||
predicate: "Predicate {}".into(),
|
||||
gby_agg: GroupByAndAggregate::Columns {
|
||||
agg: QueryAggregate::Sum,
|
||||
group_columns: vec![],
|
||||
group_columns: vec!["tag1".into()],
|
||||
},
|
||||
});
|
||||
assert_eq!(test_db.get_query_groups_request().await, expected_request);
|
||||
|
@ -2117,7 +2158,7 @@ mod tests {
|
|||
|
||||
let request = MeasurementFieldsRequest {
|
||||
source: source.clone(),
|
||||
measurement: String::from("TheMeasurement"),
|
||||
measurement: "TheMeasurement".into(),
|
||||
range: make_timestamp_range(150, 200),
|
||||
predicate: make_state_ma_predicate(),
|
||||
};
|
||||
|
@ -2155,7 +2196,7 @@ mod tests {
|
|||
// ---
|
||||
let request = MeasurementFieldsRequest {
|
||||
source: source.clone(),
|
||||
measurement: String::from("TheMeasurement"),
|
||||
measurement: "TheMeasurement".into(),
|
||||
range: None,
|
||||
predicate: None,
|
||||
};
|
||||
|
|
Loading…
Reference in New Issue