feat: InfluxQl `SHOW TAG KEYS` planner+exec (#7451)

Closes https://github.com/influxdata/idpe/issues/17363 .
pull/24376/head
Marco Neumann 2023-04-17 15:31:33 +02:00 committed by GitHub
parent 808a13cf40
commit 87ecdc5eaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 418 additions and 9 deletions

View File

@ -22,10 +22,10 @@ async fn influxql_returns_error() {
{table_name},tag1=A,tag2=C val=43i 123457"
)),
Step::InfluxQLExpectingError {
query: "SHOW TAG KEYS".into(),
query: "SHOW TAG KEYS ON foo".into(),
expected_error_code: tonic::Code::InvalidArgument,
expected_message:
"Error while planning query: This feature is not implemented: SHOW TAG KEYS"
"Error while planning query: This feature is not implemented: SHOW TAG KEYS ON <database>"
.into(),
},
Step::InfluxQLExpectingError {

View File

@ -79,3 +79,21 @@ SHOW TAG VALUES ON my_db WITH KEY = "tag0";
SHOW TAG VALUES FROM x.my_db WITH KEY = "tag0";
SHOW TAG VALUES FROM x.y.my_db WITH KEY = "tag0";
SHOW TAG VALUES WITH KEY = "tag0" WHERE tag1 = "foo";
-- SHOW TAG KEYS
SHOW TAG KEYS;
SHOW TAG KEYS LIMIT 1;
SHOW TAG KEYS OFFSET 1;
SHOW TAG KEYS LIMIT 1 OFFSET 1;
SHOW TAG KEYS FROM cpu;
SHOW TAG KEYS FROM disk,cpu,disk;
SHOW TAG KEYS FROM cpu,disk,cpu;
SHOW TAG KEYS FROM /m.*/;
SHOW TAG KEYS FROM /d\isk/;
SHOW TAG KEYS FROM does_not_exist;
-- unimplemented features in `SHOW TAG KEYS`
SHOW TAG KEYS ON my_db;
SHOW TAG KEYS FROM x.my_db;
SHOW TAG KEYS FROM x.y.my_db;
SHOW TAG KEYS WHERE tag1 = "foo";

View File

@ -337,6 +337,12 @@ name: m2
| tag0 | val09 |
| tag0 | val10 |
+------+-------+
name: m3
+------+-------+
| key | value |
+------+-------+
| tag0 | a |
+------+-------+
-- InfluxQL: SHOW TAG VALUES WITH KEY = "does_not_exist";
-- Results After Sorting
-- InfluxQL: SHOW TAG VALUES WITH KEY != "tag0";
@ -366,6 +372,14 @@ name: m0
| tag1 | val10 |
| tag1 | |
+------+-------+
name: m3
+------+-------+
| key | value |
+------+-------+
| tag1 | b |
| tag2 | c |
| tag3 | d |
+------+-------+
name: m4
+---------+-------+
| key | value |
@ -424,6 +438,15 @@ name: m2
| tag0 | val09 |
| tag0 | val10 |
+------+-------+
name: m3
+------+-------+
| key | value |
+------+-------+
| tag0 | a |
| tag1 | b |
| tag2 | c |
| tag3 | d |
+------+-------+
name: m4
+---------+-------+
| key | value |
@ -464,6 +487,15 @@ name: m2
| tag0 | val09 |
| tag0 | val10 |
+------+-------+
name: m3
+------+-------+
| key | value |
+------+-------+
| tag0 | a |
| tag1 | b |
| tag2 | c |
| tag3 | d |
+------+-------+
name: m4
+---------+-------+
| key | value |
@ -530,6 +562,12 @@ name: m2
| tag0 | val00 |
| tag0 | val01 |
+------+-------+
name: m3
+------+-------+
| key | value |
+------+-------+
| tag0 | a |
+------+-------+
-- InfluxQL: SHOW TAG VALUES WITH KEY = "tag0" OFFSET 1;
-- Results After Sorting
name: m0
@ -649,6 +687,12 @@ name: m2
| tag0 | val09 |
| tag0 | val10 |
+------+-------+
name: m3
+------+-------+
| key | value |
+------+-------+
| tag0 | a |
+------+-------+
-- InfluxQL: SHOW TAG VALUES FROM /d\isk/ WITH KEY = "device";
-- Results After Sorting
name: disk
@ -668,4 +712,238 @@ Error while planning query: This feature is not implemented: retention policy in
-- InfluxQL: SHOW TAG VALUES FROM x.y.my_db WITH KEY = "tag0";
Error while planning query: This feature is not implemented: database name in from clause
-- InfluxQL: SHOW TAG VALUES WITH KEY = "tag0" WHERE tag1 = "foo";
Error while planning query: This feature is not implemented: SHOW TAG VALUES WHERE <condition>
Error while planning query: This feature is not implemented: SHOW TAG VALUES WHERE <condition>
-- InfluxQL: SHOW TAG KEYS;
name: cpu
+--------+
| tagKey |
+--------+
| cpu |
| host |
+--------+
name: disk
+--------+
| tagKey |
+--------+
| device |
| host |
+--------+
name: m0
+--------+
| tagKey |
+--------+
| tag0 |
| tag1 |
+--------+
name: m1
+--------+
| tagKey |
+--------+
| tag0 |
+--------+
name: m2
+--------+
| tagKey |
+--------+
| tag0 |
+--------+
name: m3
+--------+
| tagKey |
+--------+
| tag0 |
| tag1 |
| tag2 |
| tag3 |
+--------+
name: m4
+---------+
| tagKey |
+---------+
| tag.one |
+---------+
-- InfluxQL: SHOW TAG KEYS LIMIT 1;
name: cpu
+--------+
| tagKey |
+--------+
| cpu |
+--------+
name: disk
+--------+
| tagKey |
+--------+
| device |
+--------+
name: m0
+--------+
| tagKey |
+--------+
| tag0 |
+--------+
name: m1
+--------+
| tagKey |
+--------+
| tag0 |
+--------+
name: m2
+--------+
| tagKey |
+--------+
| tag0 |
+--------+
name: m3
+--------+
| tagKey |
+--------+
| tag0 |
+--------+
name: m4
+---------+
| tagKey |
+---------+
| tag.one |
+---------+
-- InfluxQL: SHOW TAG KEYS OFFSET 1;
name: cpu
+--------+
| tagKey |
+--------+
| host |
+--------+
name: disk
+--------+
| tagKey |
+--------+
| host |
+--------+
name: m0
+--------+
| tagKey |
+--------+
| tag1 |
+--------+
name: m3
+--------+
| tagKey |
+--------+
| tag1 |
| tag2 |
| tag3 |
+--------+
-- InfluxQL: SHOW TAG KEYS LIMIT 1 OFFSET 1;
name: cpu
+--------+
| tagKey |
+--------+
| host |
+--------+
name: disk
+--------+
| tagKey |
+--------+
| host |
+--------+
name: m0
+--------+
| tagKey |
+--------+
| tag1 |
+--------+
name: m3
+--------+
| tagKey |
+--------+
| tag1 |
+--------+
-- InfluxQL: SHOW TAG KEYS FROM cpu;
name: cpu
+--------+
| tagKey |
+--------+
| cpu |
| host |
+--------+
-- InfluxQL: SHOW TAG KEYS FROM disk,cpu,disk;
name: cpu
+--------+
| tagKey |
+--------+
| cpu |
| host |
+--------+
name: disk
+--------+
| tagKey |
+--------+
| device |
| host |
+--------+
-- InfluxQL: SHOW TAG KEYS FROM cpu,disk,cpu;
name: cpu
+--------+
| tagKey |
+--------+
| cpu |
| host |
+--------+
name: disk
+--------+
| tagKey |
+--------+
| device |
| host |
+--------+
-- InfluxQL: SHOW TAG KEYS FROM /m.*/;
name: m0
+--------+
| tagKey |
+--------+
| tag0 |
| tag1 |
+--------+
name: m1
+--------+
| tagKey |
+--------+
| tag0 |
+--------+
name: m2
+--------+
| tagKey |
+--------+
| tag0 |
+--------+
name: m3
+--------+
| tagKey |
+--------+
| tag0 |
| tag1 |
| tag2 |
| tag3 |
+--------+
name: m4
+---------+
| tagKey |
+---------+
| tag.one |
+---------+
-- InfluxQL: SHOW TAG KEYS FROM /d\isk/;
name: disk
+--------+
| tagKey |
+--------+
| device |
| host |
+--------+
-- InfluxQL: SHOW TAG KEYS FROM does_not_exist;
-- InfluxQL: SHOW TAG KEYS ON my_db;
Error while planning query: This feature is not implemented: SHOW TAG KEYS ON <database>
-- InfluxQL: SHOW TAG KEYS FROM x.my_db;
Error while planning query: This feature is not implemented: retention policy in from clause
-- InfluxQL: SHOW TAG KEYS FROM x.y.my_db;
Error while planning query: This feature is not implemented: database name in from clause
-- InfluxQL: SHOW TAG KEYS WHERE tag1 = "foo";
Error while planning query: This feature is not implemented: SHOW TAG KEYS WHERE <condition>

View File

@ -1256,7 +1256,7 @@ pub static SETUPS: Lazy<HashMap<SetupName, SetupSteps>> = Lazy::new(|| {
m2,tag0=val08 f64=7.98 1667181600000000000
m2,tag0=val07 f64=8.98 1667181600000000000
m2,tag0=val04 f64=9.98 1667181600000000000
m3 u64=1u 1667181600000000000
m3,tag0=a,tag1=b,tag2=c,tag3=d u64=1u 1667181600000000000
m4,tag.one=foo field.one=1 1667181600000000000
"#
.to_string(),

View File

@ -1,6 +1,7 @@
use arrow::datatypes::SchemaRef;
use influxdb_influxql_parser::show_field_keys::ShowFieldKeysStatement;
use influxdb_influxql_parser::show_measurements::ShowMeasurementsStatement;
use influxdb_influxql_parser::show_tag_keys::ShowTagKeysStatement;
use influxdb_influxql_parser::show_tag_values::ShowTagValuesStatement;
use std::any::Any;
use std::collections::{HashMap, HashSet};
@ -282,6 +283,17 @@ fn find_all_measurements(stmt: &Statement, tables: &[String]) -> Result<HashSet<
Ok(self)
}
fn post_visit_show_tag_keys_statement(
self,
stk: &ShowTagKeysStatement,
) -> std::result::Result<Self, Self::Error> {
if stk.from.is_none() {
self.0.extend(self.1.iter().cloned());
}
Ok(self)
}
}
let mut m = HashSet::new();
@ -365,6 +377,10 @@ mod test {
vec!["foo", "foobar"]
);
// Find all measurements in `SHOW TAG KEYS`
assert_eq!(find("SHOW TAG KEYS"), vec!["bar", "foo", "foobar"]);
assert_eq!(find("SHOW TAG KEYS FROM /^foo/"), vec!["foo", "foobar"]);
// Finds no measurements
assert!(find("SELECT * FROM none").is_empty());
assert!(find("SELECT * FROM (SELECT * FROM none)").is_empty());

View File

@ -47,6 +47,7 @@ use influxdb_influxql_parser::show_field_keys::ShowFieldKeysStatement;
use influxdb_influxql_parser::show_measurements::{
ShowMeasurementsStatement, WithMeasurementClause,
};
use influxdb_influxql_parser::show_tag_keys::ShowTagKeysStatement;
use influxdb_influxql_parser::show_tag_values::{ShowTagValuesStatement, WithKeyClause};
use influxdb_influxql_parser::simple_from_clause::ShowFromClause;
use influxdb_influxql_parser::{
@ -203,9 +204,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
Statement::ShowRetentionPolicies(_) => Err(DataFusionError::NotImplemented(
"SHOW RETENTION POLICIES".into(),
)),
Statement::ShowTagKeys(_) => {
Err(DataFusionError::NotImplemented("SHOW TAG KEYS".into()))
}
Statement::ShowTagKeys(show_tag_keys) => self.show_tag_keys_to_plan(*show_tag_keys),
Statement::ShowTagValues(show_tag_values) => {
self.show_tag_values_to_plan(*show_tag_values)
}
@ -1270,6 +1269,82 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
}
}
fn show_tag_keys_to_plan(&self, show_tag_keys: ShowTagKeysStatement) -> Result<LogicalPlan> {
if show_tag_keys.database.is_some() {
// How do we handle this? Do we need to perform cross-namespace queries here?
return Err(DataFusionError::NotImplemented(
"SHOW TAG KEYS ON <database>".into(),
));
}
if show_tag_keys.condition.is_some() {
return Err(DataFusionError::NotImplemented(
"SHOW TAG KEYS WHERE <condition>".into(),
));
}
let tag_key_col = "tagKey";
let output_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(
INFLUXQL_MEASUREMENT_COLUMN_NAME,
(&InfluxColumnType::Tag).into(),
false,
),
ArrowField::new(tag_key_col, (&InfluxColumnType::Tag).into(), false),
]));
let tables = self.expand_tables(show_tag_keys.from)?;
let mut measurement_names_builder = StringDictionaryBuilder::<Int32Type>::new();
let mut tag_key_builder = StringDictionaryBuilder::<Int32Type>::new();
for table in tables {
let Some(table_schema) = self.s.table_schema(&table) else {continue};
for (t, f) in table_schema.iter() {
match t {
InfluxColumnType::Tag => {}
InfluxColumnType::Field(_) | InfluxColumnType::Timestamp => {
continue;
}
}
measurement_names_builder.append_value(&table);
tag_key_builder.append_value(f.name());
}
}
let plan = LogicalPlanBuilder::scan(
"tag_keys",
provider_as_source(Arc::new(MemTable::try_new(
Arc::clone(&output_schema),
vec![vec![RecordBatch::try_new(
Arc::clone(&output_schema),
vec![
Arc::new(measurement_names_builder.finish()),
Arc::new(tag_key_builder.finish()),
],
)?]],
)?)),
None,
)?
.build()?;
let plan = plan_with_metadata(
plan,
&InfluxQlMetadata {
measurement_column_index: MEASUREMENT_COLUMN_INDEX,
tag_key_columns: vec![],
},
)?;
let plan = self.limit(
plan,
show_tag_keys.offset,
show_tag_keys.limit,
vec![Expr::Column(Column::new_unqualified(tag_key_col)).sort(true, false)],
true,
&[],
&[],
)?;
Ok(plan)
}
fn show_field_keys_to_plan(
&self,
show_field_keys: ShowFieldKeysStatement,
@ -2119,7 +2194,6 @@ mod test {
assert_snapshot!(plan("DROP MEASUREMENT foo"), @"This feature is not implemented: DROP MEASUREMENT");
assert_snapshot!(plan("SHOW DATABASES"), @"This feature is not implemented: SHOW DATABASES");
assert_snapshot!(plan("SHOW RETENTION POLICIES"), @"This feature is not implemented: SHOW RETENTION POLICIES");
assert_snapshot!(plan("SHOW TAG KEYS"), @"This feature is not implemented: SHOW TAG KEYS");
}
mod metadata_queries {
@ -2149,6 +2223,18 @@ mod test {
"###);
}
#[test]
fn test_show_tag_keys() {
assert_snapshot!(plan("SHOW TAG KEYS"), @"TableScan: tag_keys [iox::measurement:Dictionary(Int32, Utf8), tagKey:Dictionary(Int32, Utf8)]");
assert_snapshot!(plan("SHOW TAG KEYS LIMIT 1 OFFSET 2"), @r###"
Sort: tag_keys.iox::measurement ASC NULLS LAST, tag_keys.tagKey ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), tagKey:Dictionary(Int32, Utf8)]
Projection: tag_keys.iox::measurement, tag_keys.tagKey [iox::measurement:Dictionary(Int32, Utf8), tagKey:Dictionary(Int32, Utf8)]
Filter: iox::row BETWEEN Int64(3) AND Int64(3) [iox::measurement:Dictionary(Int32, Utf8), tagKey:Dictionary(Int32, Utf8), iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [tag_keys.iox::measurement] ORDER BY [tag_keys.tagKey ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), tagKey:Dictionary(Int32, Utf8), iox::row:UInt64;N]
TableScan: tag_keys [iox::measurement:Dictionary(Int32, Utf8), tagKey:Dictionary(Int32, Utf8)]
"###);
}
#[test]
fn test_show_tag_values() {
assert_snapshot!(plan("SHOW TAG VALUES WITH KEY = bar"), @r###"

View File

@ -308,7 +308,18 @@ where
line_protocol
);
let response = state.cluster.write_to_router(line_protocol, None).await;
assert_eq!(response.status(), StatusCode::NO_CONTENT);
let status = response.status();
let body = hyper::body::to_bytes(response.into_body())
.await
.expect("reading response body");
assert!(
status == StatusCode::NO_CONTENT,
"Invalid response code while writing line protocol:\n\nLine Protocol:\n{}\n\nExpected Status: {}\nActual Status: {}\n\nBody:\n{:?}",
line_protocol,
StatusCode::NO_CONTENT,
status,
body,
);
info!("====Done writing line protocol");
}
Step::WriteLineProtocolExpectingError {