feat: basic InfluxQL `SHOW MEASUREMENTS` (#7504)

No WHERE-clauses hat result in actual data checks yet, just pure metdata
queries.

Ref https://github.com/influxdata/idpe/issues/17358 .
pull/24376/head
Marco Neumann 2023-04-17 13:28:01 +02:00 committed by GitHub
parent bda9c07c0b
commit e7511c0f33
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 281 additions and 10 deletions

View File

@ -1,5 +1,27 @@
-- IOX_SETUP: InfluxQLSelectSupport
-- SHOW MEASUREMENTS
SHOW MEASUREMENTS;
SHOW MEASUREMENTS LIMIT 2;
SHOW MEASUREMENTS OFFSET 1;
SHOW MEASUREMENTS LIMIT 1 OFFSET 2;
SHOW MEASUREMENTS WITH MEASUREMENT =~ /m.*/;
SHOW MEASUREMENTS WITH MEASUREMENT =~ /d\isk/;
SHOW MEASUREMENTS WITH MEASUREMENT = disk;
SHOW MEASUREMENTS WITH MEASUREMENT = does_not_exist;
-- invalid queries for `SHOW MEASUREMENTS`
SHOW MEASUREMENTS WITH MEASUREMENT = /my_db/;
SHOW MEASUREMENTS WITH MEASUREMENT =~ my_db;
-- unimplemented features in `SHOW MEASUREMENTS`
SHOW MEASUREMENTS ON my_db;
SHOW MEASUREMENTS WITH MEASUREMENT = x.my_db;
SHOW MEASUREMENTS WITH MEASUREMENT = x.y.my_db;
SHOW MEASUREMENTS WITH MEASUREMENT =~ x./my_db/;
SHOW MEASUREMENTS WITH MEASUREMENT =~ x.y./my_db/;
SHOW MEASUREMENTS WHERE "tag0" = 'x';
-- SHOW FIELD KEYS
SHOW FIELD KEYS;
SHOW FIELD KEYS LIMIT 2;

View File

@ -1,4 +1,86 @@
-- Test Setup: InfluxQLSelectSupport
-- InfluxQL: SHOW MEASUREMENTS;
name: measurements
+------+
| name |
+------+
| cpu |
| disk |
| m0 |
| m1 |
| m2 |
| m3 |
| m4 |
+------+
-- InfluxQL: SHOW MEASUREMENTS LIMIT 2;
name: measurements
+------+
| name |
+------+
| cpu |
| disk |
+------+
-- InfluxQL: SHOW MEASUREMENTS OFFSET 1;
name: measurements
+------+
| name |
+------+
| disk |
| m0 |
| m1 |
| m2 |
| m3 |
| m4 |
+------+
-- InfluxQL: SHOW MEASUREMENTS LIMIT 1 OFFSET 2;
name: measurements
+------+
| name |
+------+
| m0 |
+------+
-- InfluxQL: SHOW MEASUREMENTS WITH MEASUREMENT =~ /m.*/;
name: measurements
+------+
| name |
+------+
| m0 |
| m1 |
| m2 |
| m3 |
| m4 |
+------+
-- InfluxQL: SHOW MEASUREMENTS WITH MEASUREMENT =~ /d\isk/;
name: measurements
+------+
| name |
+------+
| disk |
+------+
-- InfluxQL: SHOW MEASUREMENTS WITH MEASUREMENT = disk;
name: measurements
+------+
| name |
+------+
| disk |
+------+
-- InfluxQL: SHOW MEASUREMENTS WITH MEASUREMENT = does_not_exist;
-- InfluxQL: SHOW MEASUREMENTS WITH MEASUREMENT = /my_db/;
Error while planning query: Error during planning: expected string but got regex
-- InfluxQL: SHOW MEASUREMENTS WITH MEASUREMENT =~ my_db;
Error while planning query: Error during planning: expected regex but got string
-- InfluxQL: SHOW MEASUREMENTS ON my_db;
Error while planning query: This feature is not implemented: SHOW MEASUREMENTS ON <database>
-- InfluxQL: SHOW MEASUREMENTS WITH MEASUREMENT = x.my_db;
Error while planning query: This feature is not implemented: retention policy in from clause
-- InfluxQL: SHOW MEASUREMENTS WITH MEASUREMENT = x.y.my_db;
Error while planning query: This feature is not implemented: database name in from clause
-- InfluxQL: SHOW MEASUREMENTS WITH MEASUREMENT =~ x./my_db/;
Error while planning query: This feature is not implemented: retention policy in from clause
-- InfluxQL: SHOW MEASUREMENTS WITH MEASUREMENT =~ x.y./my_db/;
Error while planning query: This feature is not implemented: database name in from clause
-- InfluxQL: SHOW MEASUREMENTS WHERE "tag0" = 'x';
Error while planning query: This feature is not implemented: SHOW MEASUREMENTS WHERE <condition>
-- InfluxQL: SHOW FIELD KEYS;
name: cpu
+--------------+-----------+

View File

@ -1,5 +1,6 @@
use arrow::datatypes::SchemaRef;
use influxdb_influxql_parser::show_field_keys::ShowFieldKeysStatement;
use influxdb_influxql_parser::show_measurements::ShowMeasurementsStatement;
use influxdb_influxql_parser::show_tag_values::ShowTagValuesStatement;
use std::any::Any;
use std::collections::{HashMap, HashSet};
@ -249,6 +250,17 @@ fn find_all_measurements(stmt: &Statement, tables: &[String]) -> Result<HashSet<
Ok(self)
}
fn post_visit_show_measurements_statement(
self,
sm: &ShowMeasurementsStatement,
) -> Result<Self, Self::Error> {
if sm.with_measurement.is_none() {
self.0.extend(self.1.iter().cloned());
}
Ok(self)
}
fn post_visit_show_field_keys_statement(
self,
sfk: &ShowFieldKeysStatement,
@ -328,6 +340,17 @@ mod test {
vec!["bar", "foo", "foobar"]
);
// Find all measurements in `SHOW MEASUREMENTS`
assert_eq!(find("SHOW MEASUREMENTS"), vec!["bar", "foo", "foobar"]);
assert_eq!(
find("SHOW MEASUREMENTS WITH MEASUREMENT = foo"),
vec!["foo"]
);
assert_eq!(
find("SHOW MEASUREMENTS WITH MEASUREMENT =~ /^foo/"),
vec!["foo", "foobar"]
);
// Find all measurements in `SHOW FIELD KEYS`
assert_eq!(find("SHOW FIELD KEYS"), vec!["bar", "foo", "foobar"]);
assert_eq!(find("SHOW FIELD KEYS FROM /^foo/"), vec!["foo", "foobar"]);

View File

@ -11,8 +11,8 @@ use crate::plan::planner_time_range_expression::{
use crate::plan::rewriter::rewrite_statement;
use crate::plan::util::{binary_operator_to_df_operator, rebase_expr, Schemas};
use crate::plan::var_ref::{column_type_to_var_ref_data_type, var_ref_data_type_to_data_type};
use arrow::array::StringBuilder;
use arrow::datatypes::{DataType, Field as ArrowField, Schema as ArrowSchema};
use arrow::array::{StringBuilder, StringDictionaryBuilder};
use arrow::datatypes::{DataType, Field as ArrowField, Int32Type, Schema as ArrowSchema};
use arrow::record_batch::RecordBatch;
use chrono_tz::Tz;
use datafusion::catalog::TableReference;
@ -44,6 +44,9 @@ use influxdb_influxql_parser::select::{
FillClause, GroupByClause, SLimitClause, SOffsetClause, TimeZoneClause,
};
use influxdb_influxql_parser::show_field_keys::ShowFieldKeysStatement;
use influxdb_influxql_parser::show_measurements::{
ShowMeasurementsStatement, WithMeasurementClause,
};
use influxdb_influxql_parser::show_tag_values::{ShowTagValuesStatement, WithKeyClause};
use influxdb_influxql_parser::simple_from_clause::ShowFromClause;
use influxdb_influxql_parser::{
@ -194,8 +197,8 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
Statement::ShowDatabases(_) => {
Err(DataFusionError::NotImplemented("SHOW DATABASES".into()))
}
Statement::ShowMeasurements(_) => {
Err(DataFusionError::NotImplemented("SHOW MEASUREMENTS".into()))
Statement::ShowMeasurements(show_measurements) => {
self.show_measurements_to_plan(*show_measurements)
}
Statement::ShowRetentionPolicies(_) => Err(DataFusionError::NotImplemented(
"SHOW RETENTION POLICIES".into(),
@ -1341,7 +1344,7 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
vec![Expr::Column(Column::new_unqualified(field_key_col)).sort(true, false)],
true,
&[],
&[field_key_col],
&[],
)?;
Ok(plan)
@ -1422,7 +1425,6 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
schema: output_schema.to_dfschema_ref()?,
}),
};
let plan = plan_with_metadata(
plan,
&InfluxQlMetadata {
@ -1440,7 +1442,138 @@ impl<'a> InfluxQLToLogicalPlan<'a> {
],
true,
&[],
&[key_col, value_col],
&[],
)?;
Ok(plan)
}
fn show_measurements_to_plan(
&self,
show_measurements: ShowMeasurementsStatement,
) -> Result<LogicalPlan> {
if show_measurements.on.is_some() {
// How do we handle this? Do we need to perform cross-namespace queries here?
return Err(DataFusionError::NotImplemented(
"SHOW MEASUREMENTS ON <database>".into(),
));
}
if show_measurements.condition.is_some() {
return Err(DataFusionError::NotImplemented(
"SHOW MEASUREMENTS WHERE <condition>".into(),
));
}
let tables = match show_measurements.with_measurement {
Some(
WithMeasurementClause::Equals(qualified_name)
| WithMeasurementClause::Regex(qualified_name),
) if qualified_name.database.is_some() => {
return Err(DataFusionError::NotImplemented(
"database name in from clause".into(),
));
}
Some(
WithMeasurementClause::Equals(qualified_name)
| WithMeasurementClause::Regex(qualified_name),
) if qualified_name.retention_policy.is_some() => {
return Err(DataFusionError::NotImplemented(
"retention policy in from clause".into(),
));
}
Some(WithMeasurementClause::Equals(qualified_name)) => match qualified_name.name {
MeasurementName::Name(n) => {
let names = self.s.table_names();
if names.into_iter().any(|table| table == n.as_str()) {
vec![n.as_str().to_owned()]
} else {
vec![]
}
}
MeasurementName::Regex(_) => {
return Err(DataFusionError::Plan(String::from(
"expected string but got regex",
)));
}
},
Some(WithMeasurementClause::Regex(qualified_name)) => match &qualified_name.name {
MeasurementName::Name(_) => {
return Err(DataFusionError::Plan(String::from(
"expected regex but got string",
)));
}
MeasurementName::Regex(regex) => {
let regex = parse_regex(regex)?;
let mut tables = self
.s
.table_names()
.into_iter()
.filter(|s| regex.is_match(s))
.map(|s| s.to_owned())
.collect::<Vec<_>>();
tables.sort();
tables
}
},
None => {
let mut tables = self
.s
.table_names()
.into_iter()
.map(|s| s.to_owned())
.collect::<Vec<_>>();
tables.sort();
tables
}
};
let name_col = "name";
let output_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new(
INFLUXQL_MEASUREMENT_COLUMN_NAME,
(&InfluxColumnType::Tag).into(),
false,
),
ArrowField::new(name_col, (&InfluxColumnType::Tag).into(), false),
]));
let mut dummy_measurement_names_builder = StringDictionaryBuilder::<Int32Type>::new();
let mut name_builder = StringDictionaryBuilder::<Int32Type>::new();
for table in tables {
dummy_measurement_names_builder.append_value("measurements");
name_builder.append_value(table);
}
let plan = LogicalPlanBuilder::scan(
"measurements",
provider_as_source(Arc::new(MemTable::try_new(
Arc::clone(&output_schema),
vec![vec![RecordBatch::try_new(
Arc::clone(&output_schema),
vec![
Arc::new(dummy_measurement_names_builder.finish()),
Arc::new(name_builder.finish()),
],
)?]],
)?)),
None,
)?
.build()?;
let plan = plan_with_metadata(
plan,
&InfluxQlMetadata {
measurement_column_index: MEASUREMENT_COLUMN_INDEX,
tag_key_columns: vec![],
},
)?;
let plan = self.limit(
plan,
show_measurements.offset,
show_measurements.limit,
vec![Expr::Column(Column::new_unqualified(name_col)).sort(true, false)],
true,
&[],
&[],
)?;
Ok(plan)
@ -1985,7 +2118,6 @@ mod test {
assert_snapshot!(plan("DELETE FROM foo"), @"This feature is not implemented: DELETE");
assert_snapshot!(plan("DROP MEASUREMENT foo"), @"This feature is not implemented: DROP MEASUREMENT");
assert_snapshot!(plan("SHOW DATABASES"), @"This feature is not implemented: SHOW DATABASES");
assert_snapshot!(plan("SHOW MEASUREMENTS"), @"This feature is not implemented: SHOW MEASUREMENTS");
assert_snapshot!(plan("SHOW RETENTION POLICIES"), @"This feature is not implemented: SHOW RETENTION POLICIES");
assert_snapshot!(plan("SHOW TAG KEYS"), @"This feature is not implemented: SHOW TAG KEYS");
}
@ -1997,7 +2129,7 @@ mod test {
fn test_show_field_keys() {
assert_snapshot!(plan("SHOW FIELD KEYS"), @"TableScan: field_keys [iox::measurement:Utf8, fieldKey:Utf8, fieldType:Utf8]");
assert_snapshot!(plan("SHOW FIELD KEYS LIMIT 1 OFFSET 2"), @r###"
Sort: field_keys.iox::measurement ASC NULLS LAST, field_keys.fieldKey ASC NULLS LAST, field_keys.fieldKey ASC NULLS LAST [iox::measurement:Utf8, fieldKey:Utf8, fieldType:Utf8]
Sort: field_keys.iox::measurement ASC NULLS LAST, field_keys.fieldKey ASC NULLS LAST [iox::measurement:Utf8, fieldKey:Utf8, fieldType:Utf8]
Projection: field_keys.iox::measurement, field_keys.fieldKey, field_keys.fieldType [iox::measurement:Utf8, fieldKey:Utf8, fieldType:Utf8]
Filter: iox::row BETWEEN Int64(3) AND Int64(3) [iox::measurement:Utf8, fieldKey:Utf8, fieldType:Utf8, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [field_keys.iox::measurement] ORDER BY [field_keys.fieldKey ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Utf8, fieldKey:Utf8, fieldType:Utf8, iox::row:UInt64;N]
@ -2005,6 +2137,18 @@ mod test {
"###);
}
#[test]
fn test_snow_measurements() {
assert_snapshot!(plan("SHOW MEASUREMENTS"), @"TableScan: measurements [iox::measurement:Dictionary(Int32, Utf8), name:Dictionary(Int32, Utf8)]");
assert_snapshot!(plan("SHOW MEASUREMENTS LIMIT 1 OFFSET 2"), @r###"
Sort: measurements.iox::measurement ASC NULLS LAST, measurements.name ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), name:Dictionary(Int32, Utf8)]
Projection: measurements.iox::measurement, measurements.name [iox::measurement:Dictionary(Int32, Utf8), name:Dictionary(Int32, Utf8)]
Filter: iox::row BETWEEN Int64(3) AND Int64(3) [iox::measurement:Dictionary(Int32, Utf8), name:Dictionary(Int32, Utf8), iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [measurements.iox::measurement] ORDER BY [measurements.name ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), name:Dictionary(Int32, Utf8), iox::row:UInt64;N]
TableScan: measurements [iox::measurement:Dictionary(Int32, Utf8), name:Dictionary(Int32, Utf8)]
"###);
}
#[test]
fn test_show_tag_values() {
assert_snapshot!(plan("SHOW TAG VALUES WITH KEY = bar"), @r###"
@ -2015,7 +2159,7 @@ mod test {
TableScan: data [TIME:Boolean;N, bar:Dictionary(Int32, Utf8);N, bool_field:Boolean;N, f64_field:Float64;N, foo:Dictionary(Int32, Utf8);N, i64_field:Int64;N, mixedCase:Float64;N, str_field:Utf8;N, time:Timestamp(Nanosecond, None), with space:Float64;N]
"###);
assert_snapshot!(plan("SHOW TAG VALUES WITH KEY = bar LIMIT 1 OFFSET 2"), @r###"
Sort: iox::measurement ASC NULLS LAST, key ASC NULLS LAST, value ASC NULLS LAST, key ASC NULLS LAST, value ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), key:Dictionary(Int32, Utf8), value:Dictionary(Int32, Utf8);N]
Sort: iox::measurement ASC NULLS LAST, key ASC NULLS LAST, value ASC NULLS LAST [iox::measurement:Dictionary(Int32, Utf8), key:Dictionary(Int32, Utf8), value:Dictionary(Int32, Utf8);N]
Projection: iox::measurement, key, value [iox::measurement:Dictionary(Int32, Utf8), key:Dictionary(Int32, Utf8), value:Dictionary(Int32, Utf8);N]
Filter: iox::row BETWEEN Int64(3) AND Int64(3) [iox::measurement:Dictionary(Int32, Utf8), key:Dictionary(Int32, Utf8), value:Dictionary(Int32, Utf8);N, iox::row:UInt64;N]
WindowAggr: windowExpr=[[ROW_NUMBER() PARTITION BY [iox::measurement] ORDER BY [key ASC NULLS LAST, value ASC NULLS LAST] ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS iox::row]] [iox::measurement:Dictionary(Int32, Utf8), key:Dictionary(Int32, Utf8), value:Dictionary(Int32, Utf8);N, iox::row:UInt64;N]